站内搜索: 请输入搜索关键词

当前页面: 开发资料首页Java 专题Java Thread实现读写同步

Java Thread实现读写同步

摘要: Java Thread实现读写同步

(wang hailong)
本文给出一个例子,说明如何使用java thread,处理读写者同步的问题。
文中的源代码能够编译运行。java thread.TestMain
本文可以和另一篇文章《Java Thread应该注意的问题》,进行比较阅读。
1.读写者同步问题 多个读者可以同时读取同一个缓冲区,但当有写者对缓冲区进行写操作时,具有排他性质,其他的读者都不能读取这个缓冲区,其他的写者也不能写这个缓冲区。
2.代码构成和思路 这个例子中包括四个类。
(1)读写资源类RWResource,包含了读写者的计数,共享资源,还有所有的同步代码。
(2)读者类RunnableReader。实现Runnable接口;进行读操作。
(3)写者类RunnableWriter。实现Runnable接口;进行写操作。
(4)测试类TestMain。生成并运行多个写线程和读线程,显示结果。
这个例子对共享资源进行“盒式封装”,把共享资源包含在一个“盒”内。并把所有的同步代码都集中在“盒”里面。读者类和写者类并不进行同步处理,只是申请资源,然后进行读写,读写完成之后,释放资源。
这种方法的优点是共享资源“盒”部分的代码直观易读,紧凑可控,读者类和写者类不用关心同步问题。缺点是共享资源“盒”规定了严格的调用顺序和调用规范,读者类和写者类必须严格遵守共享资源“盒”的调用规范,否则会造成线程死锁,或者资源操作冲突。
不过,即使由读者类和写者类来实现线程同步,如果不注意,也会造成线程死锁,或者资源操作冲突。这是线程的固有问题。:-)
下面给出这四个类的源代码和说明。
3.读写资源类RWResource package thread;
import java.util.List;
import java.util.ArrayList;
/**
* resource for reading and writing
*/
public class RWResource {
/**
* When readerNumber == 0, there is no one reading or writing.
* When readerNumber > 0, readerNumber means number of readers.
* When readerNumber < 0, it means that some writer is writing. */
private int readerNumber = 0;
/**
* the shared resource for writing or reading
*/
private List buffer = null;
public RWResource() {
buffer = new ArrayList(512);
readerNumber = 0;
}
/**
* get buffer for reading.
* should be called before reading
* @return the buffer
*/
public synchronized List getBufferForReading(){
// if some writer is writing, wait until no writer is writing
while(readerNumber < 0){
try{
this.wait();
}catch(InterruptedException e){
e.printStackTrace();
} }
// when readerNumber >= 0
readerNumber++;
return buffer;
}
/**
* should be called after reading
*/
public synchronized void finishReading(){
readerNumber--;
if(readerNumber == 0){
this.notifyAll(); // notify possible waiting writers
} }
/**
* get buffer for writing.
* should be called before writing.
* @return the buffer
*/
public synchronized List getBufferForWriting(){
// if some writer is writing or some reader is reading, wait until no one is writing or reading
while(readerNumber != 0){
try{
this.wait();
}catch(InterruptedException e){
e.printStackTrace(); } }
// when readerNumber == 0
readerNumber--; // now readderNumber == -1.
return buffer; }
/**
* should be called after writing */
public synchronized void finishWriting(){
readerNumber++; // readerNumber = -1 + 1 = 0;
// readerNumber must be 0 at this point
this.notifyAll(); // notify possible waiting writers or waiting readers } }
读写资源类RWResource提供了4个Synchronized方法,分成两组,供读者和写者调用。
阅读上面的代码,可以看到,读写资源类RWResource通过readerNumber计数控制对共享资源的读写访问。当readerNumber等于0时,说明资源空闲,可以读写;当readerNumber大于0时,说明资源正在被一些读者读取,其他线程可以读,不可以写;当readerNumber小于0时(-1),说明资源被某个写者占用,正在写入,其他线程不可以读,也不可以写。
读者首先调用getBufferForReading()获取共享资源,如果readerNumber大于等于0,表示没有写者占用资源,读者能够获取共享资源,此时,readerNumber加1,表示读者的个数增加;读取之后,必须调用finishReading()释放资源,此时,readerNumber减1,表示读者的个数减少。
写者首先调用getBufferForWriting()获取共享资源,如果readerNumber等于0,表示资源空闲,写者能够获取到共享资源,此时,readerNumber减1,readerNumber的值变为-1,表示资源正在被写入;写者写完资源之后,必须调用,必须调用finishWriting()释放资源,此时,readerNumber加1,readerNumber的值变为0,回到空闲状态。
另外,还请留意读写资源类RWResource代码里面的wait()和notifyAll()调用。
读者在readerNumber小于0的情况下等待,调用Wait();写者在readerNumber大于0的情况下等待,调用Wait()。
在释放资源时( finishReading()或finishWriting() ),如果readerNumber的值变为0,回到空闲状态,调用notifyAll(),通知潜在的等待者——读者或写者。
4.读者类RunnableReader package thread;
import java.util.List;
import java.util.Iterator;
public class RunnableReader implements Runnable{
private RWResource resource = null;
public RunnableReader() {
}
/**
* must be called before start running
* @param theResource
*/
public void setRWResource(RWResource theResource){
resource = theResource; }
public void run(){
while(true){
// get the reader's name
String readerName = "[" + Thread.currentThread().getName() + "] ";
// first, get buffer for reading
List buffer = resource.getBufferForReading();
// reading
for(Iterator iterator = buffer.iterator(); iterator.hasNext(); ){
System.out.println(readerName + iterator.next()); }
int articleNumber = buffer.size();
int thinkingTime = articleNumber * 1000;
for(int i = 0; i < thinkingTime; i++){
// thingking hard when reading }
// finish reading
resource.finishReading();
// rest
try{
Thread.sleep(articleNumber * 50);
}catch(InterruptedException e){
e.printStackTrace(); } } } }
上述代码中的setRWResource()方法传入共享资源——读写资源类。本例采用参数传递的方法,在读者和写者之间共享读写资源类。
Run()方法实现Runnable接口的Run()方法。首先,获取当前读者(线程)的名称,然后,试图获取读资源——resource.getBufferForReading(),获取资源之后,读取buffer里面的所有文章,边读边思考(注意代码里面的for(int i = 0; i < thinkingTime; i++)行,占用cpu时间,表示思考过程),最后,释放资源——resource.finishReading()。读完文章,读者休息一段时间——Thread.sleep(articleNumber * 50)。
注意,在以上的过程中,一定要严格遵守这样的规定,在resource.getBufferForReading()和resource.finishReading()之间,进行读取操作。
5.写者类RunnableWriter package thread;
import java.util.List;
import java.util.Iterator;
public class RunnableWriter implements Runnable{
private RWResource resource = null;
private int articleNumber = 0;
public RunnableWriter() {
articleNumber = 0;
}
/**
* must be called before start running
* @param theResource
*/
public void setRWResource(RWResource theResource){
resource = theResource; }
public void run(){
while(true){
// get the writer's name
String writerName = "[" + Thread.currentThread().getName() + "] ";
// first, get buffer for reading
List buffer = resource.getBufferForWriting();
int nWritten = 3; // write 4 articles one time
for(int n = 0; n< nWritten; n++){
// writing
articleNumber++;
String articleName = "article" + articleNumber;
buffer.add(articleName);
System.out.println(writerName + articleName);
int thinkingTime = 10000;
for (int i = 0; i < thinkingTime; i++) {
// thingking hard when writing }
} // finish writing
resource.finishWriting();
// rest
try{
Thread.sleep(500);
}catch(InterruptedException e){
e.printStackTrace(); } } } }
上述代码中的setRWResource()方法传入共享资源——读写资源类。本例采用参数传递的方法,在读者和写者之间共享读写资源类。
Run()方法实现Runnable接口的Run()方法。首先,获取当前写者(线程)的名称,然后,试图获取写资源——resource.getBufferForWriting(),获取资源之后,开始向buffer里面写3篇文章,边写边思考(注意代码里面的for(int i = 0; i < thinkingTime; i++)行,占用cpu时间,表示思考过程),最后,释放资源——resource.finishWriting()。读完文章,写者休息一段时间——Thread.sleep(500)。
注意,在以上的过程中,一定要严格遵守这样的规定,在resource.getBufferForWriting()和resource.finishWriting()之间,进行写操作。
6.测试类TestMain package thread;
public class TestMain{
public static void main(String[] args) {
// init 生成共享资源
RWResource resource = new RWResource();
// 生成读者,设置共享资源
RunnableReader reader = new RunnableReader();
reader.setRWResource(resource);
// 生成写者,设置共享资源。
RunnableWriter writer = new RunnableWriter();
writer.setRWResource(resource);
int writerNumber = 5; // 5个写者
int readerNumber = 10; // 10个读者
// start writers 生成5个写线程,并给每个线程起个名字,writer1, writer2…
for(int i = 0; i < writerNumber; i++){
Thread thread = new Thread(writer, "writer" + (i+1));
thread.start(); }
// give writers enough time to think and write articles
try{
Thread.sleep(1000);
}catch(InterruptedException e){
e.printStackTrace(); }
// start readers生成10个读线程,并给每个线程起个名字,reader1, reader2…
for(int i = 0; i < readerNumber; i++){
Thread thread = new Thread(reader, "reader" + (i+1));
thread.start(); } } }
以上的测试类TestMain代码,生成并运行多个写线程和读线程,产生的结果可能如下:
[writer1] article1
[writer1] article2
[writer1] article3 …
[reader2] article1
[reader3] article1
[reader4] article1
[reader5] article1
[reader6] article1…
[reader1] article1 …
[writer3] article67
[writer3] article68
[writer3] article69 …
我们可以看到,Writer写的文章的号码从不相同,而且,每个Writer每次写3篇文章,写的过程从来不会被打断。每个读者每次通读所有的文章,经常有几个读者同时读同一篇文章的情况。
6.两种改进思路 这个例子采用“盒”包装的方法,把“锁”(readerNumber)和资源(buffer)放在同一个“盒子”(RWResource)里,但是,“盒子”对资源的包装是不完全的,只是简单地把资源(buffer)返回给读者和写者,并没有对资源(buffer)的访问操作进行封装。
其实,可以对资源(buffer)的访问进行进一步封装,比如,为“盒子”提供String[] readerBuffer()和writeBuffer(String)两个方法,在这两个方法里面,根据锁(readerNumber)的状态,判断读写操作是否合法,这样,代码的整体性会更好。带来的结果是,RWResource类的调用规范和顺序更加严格,必须在resource.getBufferForReading()和resource.finishReading()调用readerBuffer()方法,必须在resource.getBufferForWriting()和resource.finishWriting()之间调用writeBuffer()方法。否则,这两个方法会报错。
这样做会增加RWResource类的复杂度。还有一些设计上的因素需要考虑——readerBuffer和writeBuffer方法是否应该被synchronized修饰?
从上例看到,在resource.getBufferForReading()和resource.finishReading()之间,进行读操作;在resource.getBufferForWriting()和resource.finishWriting()之间,进行写操作。读操作和写操作部分的代码,是不用同步的。所以,在getBufferForReading()和finishReading()这样的成对操作之间,用synchronized修饰readerBuffer和writeBuffer方法,是多此一举。
但是,从代码的完整性角度来看,因为readerBuffer和writeBuffer方法需要读 “锁”的状态,所以,readerBuffer和writeBuffer方法还是加上synchronized修饰符为好。
考虑到这些因素,本例采取了一种折衷的方法。从形式上看,“锁”和“资源”是聚合在一起的,实际上,两者的操作是分开的,并不相关。“盒子”根据“锁”的状态,调整资源的分配,读者和写者得到资源之后,享有对资源的完全访问权限。
另一个方向是把“资源”和“锁”完全分开。把getBufferForReading方法改成startReading,把getBufferForWriting方法改成startWriting,RWResource不再分配资源,只进行“锁”操作。把RWResource该作RWLock类。
RunnableReader和RunnableWriter类各自增加一个setBuffer()方法,共享buffer资源。这样,RunnableReader和RunnableWriter类就有了两个分开的方法:setBuffer()设置共享资源,setRWLock()设置读写锁。
对本例稍加修改,就可以实现上述的两种思路。限于篇幅,这里不能给出完整的修改代码。
7.补充 本例中对读写资源类RWResource强加了调用顺序。
在resource.getBufferForReading()和resource.finishReading()之间,进行读操作。
在resource.getBufferForWriting()和resource.finishWriting()之间,进行写操作。
要求在执行一些处理之前,一定要执行某项特殊操作,处理之后一定也要执行某项特殊操作。这种人为的顺序性,无疑增加了代码的耦合度,降低了代码的独立性。很有可能会成为线程死锁和资源操作冲突的根源。
这点一直让我不安,可是没有找到方法避免。毕竟,死锁或者资源操作冲突,是线程的固有问题。
很巧的是,正在我惴惴不安的时候,我的一个朋友提供了一个信息。Sun公司根据JCR,决定在jdk1.5中引入关于concurrency(并发)的部分。
以下这个网址是concurrency部分的util.concurrent一个实现。非常好的信息。对于处理多线程并发问题,很有帮助。
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
里面提供了一个ReadWriteLock类,标准用法如下。
Standard usage of ReadWriteLock:
class X {
ReadWriteLock rw;
// ...
public void read() throws InterruptedException {
rw.readLock().acquire();
try {
// ... do the read }
finally {
rw.readlock().release() } }
public void write() throws InterruptedException {
rw.writeLock().acquire();
try {
// ... do the write }
finally {
rw.writelock().release() } } }
我们可以看到,ReadWriteLock同样要求调用的顺序——aquire()和release()。我对自己的例子增强了一点信心。
我又查看了WriterPreferenceReadWriteLock类,看到里面成对的方法,startRead(),endRead();startWrite(),endWrite()。我的心情完全放松了下来。我的思路虽然粗糙,但大体的方向是正确的。
建议,本文的例子比较简单,可以作为同步原理的入门简介,之后,访问专业化的代码http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
Enjoy it. :-)
作者相关文章: Java Thread in JVM(原作) Com Introduction(原作) A Simple Sample for Expression Tree(原作)
对该文的评论 人气:328 Norwaywoods(2003-3-22 2:45:37)
I am coming back. I know the problem your programme have! The main problem and also a big problem is that: the writer have no sequence. What I mean is you will have many writers block on the RWResource Object, when there is no one writing or reading you call notifyAll() to wake up all the thread block on it, so it just can not make sure which writer will be waken up. That makes your programme meaningless. As we know,many writers must be work on the same resource, that is probably the sequence critically.The none-sequence will mess the world.
buaawhl(2003-3-20 14:55:45)
http://java.sun.com/docs/books/jls/second_edition/html/classes.doc.html#30531
A synchronized method acquires a lock (§17.1) before it executes. For a class (static) method, the lock associated with the Class object for the method's class is used. For an instance method, the lock associated with this (the object for which the method was invoked) is used. These are the same locks that can be used by the synchronized statement (§14.18); thus, the code:
class Test { int count; synchronized void bump() { count++; } static int classCount; static synchronized void classBump() { classCount++; } }
has exactly the same effect as:
class BumpTest { int count; void bump() { synchronized (this) { count++; } } static int classCount; static void classBump() { try { synchronized (Class.forName("BumpTest")) { classCount++; } } catch (ClassNotFoundException e) { ... } } }
buaawhl(2003-3-20 11:11:30)
呵呵,我想我明白Norwaywoods的疑惑了。:-) 这里更精确地解答一下: 当用synchronized修饰对象的方法的时候,实际就是同步这个对象本身,并不只是在同步这一个方法。 所有的被synchronized修饰的方法都会在对象本身上同步。 public synchronized void finishReading() public synchronized void finishWriting() public synchronized List getBufferForReading() public synchronized List getBufferForWriting() 这些方法 “之间” 都是同步的。
另外,建议把本文的代码编译运行,观察结果。还建议访问专业化的代码http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
buaawhl(2003-3-20 11:00:52)
http://www.csdn.net/Develop/read_article.asp?id=17493 Norwaywoods的出色的文章,相当精彩,清晰形象,尤其对于我来说——我对JVM的了解不够。 Norwaywoods一向是我的关于Java Thread的文章的最好读者。
解释一下我的文章中的问题: private int readerNumber 只能通过RWResource自身的方法来调用。 RWResource的关于操作readerNumber的方法,全部都被synchronized修饰,不用担心冲突问题。
还有一点,就是关于wait和notify的理解。 当线程调用Wait,放弃同步对象之后,想要重新运行,一定要重新获得放弃的同步对象,才能够重新运行。 RWResource的 同步方法的 同步对象 就是RWResource对象自己。 这点对于理解文中的代码非常重要。 :-)
Norwaywoods(2003-3-20 8:25:00)
经过一晚的思考,我发现是我的理解出了问题。给大家造成迷惑了!呵呵!
Norwaywoods(2003-3-19 23:42:14)
其实,还不止这些只要是操作readerNumber变量的函数都存在同样的问题!也就是下面四个方法:
public synchronized void finishReading()
public synchronized void finishWriting()
public synchronized List getBufferForReading()
public synchronized List getBufferForWriting()
千万不要假设你的一个线程不会被其它线程抢先!!!!!!!!!!!!! 请看我的最新文章: http://www.csdn.net/Develop/read_article.asp?id=17493
Norwaywoods(2003-3-19 23:34:39)
我觉得你还是不要想改进和补充了,你的程序犯了多线程编程的一个大忌。 问题出在你的: public synchronized List getBufferForWriting(){
// if some writer is writing or some reader is reading, wait until no one is writing or reading
(1) while(readerNumber != 0){
try{
this.wait();
}catch(InterruptedException e){
e.printStackTrace(); } }
// when readerNumber == 0
(2) readerNumber--; // now readderNumber == -1.
return buffer; }
你觉得在改变readerNumber的时候,仅仅同步getBufferForWriting()就够了吗?它仅仅能够阻止其它线程调用这个方法,但是不能阻止其它线程改变你的readerNumber的值。 假设有这样的一个情况: 当你的写者线程运行到标记(1)的时候,它发现你的readerNumber = 0,那就继续执行下去。在没有执行到标记(2)的时候(也就是readerNumber还是0),该写者线程被另一个读者线程抢先了。读者线程发现readerNumber =0为不小于零的数,于是它安安心心的把readerNumber置为1,在这个时候OS调度到刚才那个写者线程,它还是认为readerNumber为零,所以它进行了--操作,这是readerNumber变为0。至此,你的程序已经完全崩溃了!
经验教训:对于关键的共享变量的操作,仅仅同步方法是不够的,必须同步整个对象。保证在一个操作没完成之前,其它操作不允许改变它。
这是一个很典型的反面教程!希望大家都能吸取这个教训!这样才能写出安全的代码!
↑返回目录
前一篇: Java Thread应该注意的问题
后一篇: java swing的drag and drop源程序