这篇博客的目的是了解Java Concurrent包中Lock的实现原理.
java.util.concurrent.locks包下提供了一系列关于锁的抽象的类,主要实现有两种锁ReentrantLock和ReentrantReadWriteLock.
1.一个简单的锁
让我们从java中的一个同步块开始
1.1 synchronized实现同步块
1
2
3
4
5
6
7
8
9
public class Counter {
private int count = 0 ;
public int inc (){
synchronized ( this ){
return ++ count ;
}
}
}
可以看到在inc()方法中有一个synchronized(this)代码块。该代码块可以保证在同一时间只有一个线程可以执行return ++count操作。虽然在synchronized的同步块中的代码可以更加复杂,但是++count这种简单的操作已经足以表达出线程同步的意思。
1.2 Lock实现同步块
以下的Counter类用Lock代替synchronized可以达到了同样的目的:
1
2
3
4
5
6
7
8
9
10
11
public class Counter {
private Lock lock = new Lock (); //锁
private int count = 0 ;
public int inc (){
lock . lock ();
int newCount = ++ count ;
lock . unlock ();
return newCount ;
}
}
lock()方法会对Lock实例对象进行加锁,因此所有对该对象调用lock()方法的线程都会被阻塞,直到该Lock对象的unlock()方法被调用。
1.3 Lock类实现
这里有一个Lock类的简单实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Lock {
private boolean isLocked = false ;
public synchronized void lock ()
throws InterruptedException {
while ( isLocked ){
wait (); //阻塞
}
isLocked = true ;
}
public synchronized void unlock (){
isLocked = false ;
notify (); //唤醒
}
}
注意其中的while(isLocked)循环,它又被叫做“自旋锁”。自旋锁以及wait()和notify()方法在线程通信 这篇文章中有更加详细的介绍。
当isLocked为true时,调用lock()的线程在wait()调用上阻塞等待。为防止该线程没有收到notify()调用也从wait()中返回,这个线程会重新去检查isLocked条件以决定当前是否可以安全地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行了。如果isLocked为false,当前线程会退出while(isLocked)循环,并将isLocked设回true,让其它正在调用lock()方法的线程能够在Lock实例上加锁。
当线程完成了临界区(锁住的代码端,即位于lock()和unlock()之间)中的代码,就会调用unlock()。执行unlock()会重新将isLocked设置为false,并且通知(唤醒)其中一个(若有的话)在lock()方法中调用了wait()函数而处于等待状态的线程。
1.4 在finally语句中调用unlock()
如果用Lock来保护临界区,并且临界区有可能会抛出异常,那么在finally语句中调用unlock()就显得非常重要了,这样可以保证这个锁对象可以被解锁以便其它线程能继续对其加锁:
lock.lock();
try{
//do critical section code,
//which may throw exception
} finally {
lock.unlock();
}
这个简单的结构可以保证当临界区抛出异常时Lock对象可以被解锁。如果不是在finally语句中调用的unlock(),当临界区抛出异常时,Lock对象将永远停留在被锁住的状态,这会导致其它所有在该Lock对象上调用lock()的线程一直阻塞。
2.锁的可重入性
2.1 什么是可重入
如果一个线程持有某个管程对象(monitor object)上的锁,那么它就有权访问所有在该管程对象上同步块。这就叫可重入。
通俗讲即若线程已经持有锁,那么它就可以重复访问所有使用该锁的代码块。
2.2 synchronized同步块是可重入的
Java中的synchronized同步块是可重入的。这意味着如果一个java线程进入了代码中的synchronized同步块,并因此获得了该同步块使用的同步对象对应的管程上的锁,那么这个线程可以进入由同一个管程对象所同步的另一个java代码块。因此下面的代码没有问题:
1
2
3
4
5
6
7
8
9
public class Reentrant {
public synchronized outer (){
inner ();
}
public synchronized inner (){
//do something
}
}
注意outer()和inner()都声明为synchronized,这在Java中这相当于synchronized(this)块.如果某个线程调用了outer(),outer()中的inner()调用是没问题的,因为两个方法都是在同一个管程对象(即this)上同步的。
2.3 不可重入锁
1.3节的锁实现是不可重入的,原因是如果一个线程在两次调用lock()间没有调用unlock()方法,那么第二次调用lock()就会被阻塞,这就出现了重入锁死,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Reentrant2 {
Lock lock = new Lock ();
public outer (){
lock . lock ();
inner ();
lock . unlock ();
}
public synchronized inner (){
lock . lock ();
//do something
lock . unlock ();
}
}
调用outer()的线程首先会锁住Lock实例,然后继续调用inner()。inner()方法中该线程将再一次尝试锁住Lock实例,结果该动作会失败(也就是说该线程会被阻塞),因为这个Lock实例已经在outer()方法中被锁住了。
2.4 修改成可重入锁
为了让这个Lock类具有可重入性,我们需要对它做一点小的改动:
(1).可重入锁需要一个重入计数变量,初始值设为0,当成功请求锁时加1,释放锁时减1,当释放锁之后计数为0则真正释放锁;
(2).重入锁还必须持有对锁持有者的引用,用以判断是否可以重入;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Lock {
boolean isLocked = false ;
Thread lockedBy = null ; //阻塞的线程
int lockedCount = 0 ; //同一个线程加锁的次数
public synchronized void lock () throws InterruptedException {
Thread callingThread = Thread . currentThread ();
while ( isLocked && lockedBy != callingThread ){
wait ();
}
isLocked = true ;
lockedCount ++;
lockedBy = callingThread ;
}
public synchronized void unlock (){
if ( Thread . curentThread () == this . lockedBy ){
lockedCount --;
if ( lockedCount == 0 ){ //锁次数为0之后才解锁
isLocked = false ;
notify ();
}
}
}
}
注意到现在的while循环(自旋锁)也考虑到了已锁住该Lock实例的线程。如果当前的锁对象没有被加锁(isLocked = false),或者当前调用线程已经对该Lock实例加了锁,那么while循环就不会被执行,调用lock()的线程就不会被阻塞了。
除此之外,我们需要记录同一个线程重复对一个锁对象加锁的次数。否则,一次unblock()调用就会解除整个锁,即使当前锁已经被加锁过多次。在unlock()调用没有达到对应lock()调用的次数之前,我们不希望锁被解除。
3.锁的公平性
Java的synchronized块并不保证尝试进入它们的线程的顺序。如果一个线程因为CPU时间全部被其他线程抢走而得不到CPU运行时间,这种状态被称之为“饥饿”。而该线程被“饥饿致死”正是因为它得不到CPU运行时间的机会。解决饥饿的方案被称之为“公平性”,即所有线程均能公平地获得运行机会。
通俗讲,如果在绝对时间上,先对锁进行获取的请求一定被先满足,那么这个锁是公平的,反之,是不公平的,也就是说等待时间最长的线程最有机会获取锁,也可以说锁的获取是有序的。
3.1 Java中导致饥饿的原因
(1).高优先级线程抢占所有的低优先级线程的CPU时间
我们能为每个线程设置独自的线程优先级,优先级越高的线程获得的CPU时间越多,线程优先级值设置在1到10之间,这些优先级值所表示行为的准确解释则依赖于你的应用运行平台。
(2).线程被永久堵塞在一个等待进入同步块的状态
Java的同步代码区也是一个导致饥饿的因素。Java的同步代码区对哪个线程允许进入的次序没有任何保障。这就意味着理论上存在一个试图进入该同步区的线程处于被永久堵塞的风险,因为其他线程总是能持续地先于它获得访问,这即是“饥饿”问题,而一个线程被“饥饿致死”正是因为它得不到CPU运行时间的机会。
(3).线程在等待一个本身也处于永久等待完成的对象(比如调用这个对象的wait方法)
如果多个线程处在wait()方法执行上,而对其调用notify()不会保证哪一个线程会获得唤醒,任何线程都有可能处于继续等待的状态。因此存在这样一个风险:一个等待线程从来得不到唤醒,因为其他等待线程总是能被获得唤醒。
3.2 实现公平锁
下面来讲述将上面Lock类转变为公平锁FairLock。基本原理是每一个调用lock()的线程都会进入一个队列,当解锁后,只有队列里的第一个线程被允许锁住Farlock实例,所有其它的线程都将处于等待状态,直到他们处于队列头部。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class FairLock {
private boolean isLocked = false ;
private Thread lockingThread = null ;
//用于等待的线程队列
private List < QueueObject > waitingThreads = new ArrayList < QueueObject >();
public void lock () throws InterruptedException {
QueueObject queueObject = new QueueObject ();
boolean isLockedForThisThread = true ;
synchronized ( this ){ //加入等到队列
waitingThreads . add ( queueObject );
}
while ( isLockedForThisThread ){
synchronized ( this ){
isLockedForThisThread =
isLocked || waitingThreads . get ( 0 ) != queueObject ;
if (! isLockedForThisThread ){ //当前线程没有被阻塞
isLocked = true ;
waitingThreads . remove ( queueObject );
lockingThread = Thread . currentThread ();
return ;
}
}
//isLockedForThisThread == true
try {
queueObject . doWait (); //通知当前线程等待
} catch ( InterruptedException e ){
synchronized ( this ) {
waitingThreads . remove ( queueObject );
}
throw e ;
}
}
}
public synchronized void unlock (){
if ( this . lockingThread != Thread . currentThread ()){
throw new IllegalMonitorStateException (
"Calling thread has not locked this lock" );
}
isLocked = false ;
lockingThread = null ;
if ( waitingThreads . size () > 0 ){
waitingThreads . get ( 0 ). doNotify ();
}
}
}
QueueObject实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class QueueObject {
private boolean isNotified = false ; //被唤醒
public synchronized void doWait () throws InterruptedException {
while (! isNotified ){
this . wait ();
}
this . isNotified = false ;
}
public synchronized void doNotify () {
this . isNotified = true ;
this . notify (); //唤醒等待当前QueueObject对象的线程
}
public boolean equals ( Object o ) {
return this == o ;
}
}
首先注意到lock()方法不在声明为synchronized,取而代之的是对必需同步的代码,在synchronized中进行嵌套。
FairLock新创建了一个QueueObject的实例,并对每个调用lock()的线程进行入队列。调用unlock()的线程将从队列头部获取QueueObject,并对其调用doNotify(),以唤醒在该对象上等待的线程。通过这种方式,在同一时间仅有一个等待线程获得唤醒,而不是所有的等待线程。这也是实现FairLock公平性的核心所在。
请注意,在同一个同步块中,锁状态依然被检查和设置,以避免出现滑漏条件。
还需注意到,QueueObject实际是一个信号量(Semaphore)。doWait()和doNotify()方法在QueueObject中保存着信号。这样做以避免一个线程在调用queueObject.doWait()之前被另一个调用unlock()并随之调用queueObject.doNotify()的线程重入,从而导致信号丢失。queueObject.doWait()调用放置在synchronized(this)块之外,以避免被monitor嵌套锁死,所以另外的线程可以解锁,只要当没有线程在lock方法的synchronized(this)块中执行即可。
最后,注意到queueObject.doWait()在try – catch块中是怎样调用的。在InterruptedException抛出的情况下,线程得以离开lock(),并需让它从队列中移除。
4.ReentrantLock源码
ReentrantLock是java中可重入锁的一个实现,一次只能有一个线程持有锁,也即所谓独占锁的概念。它包含三个内部类:Sync、NonfairSync、FairSync,通过构造函数的参数来指定锁是否是公平的,下面是一些核心代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ReentrantLock implements Lock , java . io . Serializable {
private final Sync sync ;
//默认是不公平锁
public ReentrantLock () {
sync = new NonfairSync ();
}
//参数fair决定,true为公平锁实现,false为非公平锁实现
public ReentrantLock ( boolean fair ) {
sync = ( fair )? new FairSync () : new NonfairSync ();
}
public void lock () {
sync . lock ();
}
//不公平锁
public boolean tryLock () {
return sync . nonfairTryAcquire ( 1 );
}
//带超时时间的锁
public boolean tryLock ( long timeout , TimeUnit unit ) throws InterruptedException {
return sync . tryAcquireNanos ( 1 , unit . toNanos ( timeout ));
}
public void unlock () {
sync . release ( 1 );
}
//是否有线程等待当前锁
public final boolean hasQueuedThreads () {
return sync . hasQueuedThreads ();
}
...
我们可以发现,ReentrantLock都是把具体实现委托给内部类(Sync、NonfairSync、FairSync),ReentrantLock的重入计数是使用AbstractQueuedSynchronizer的state属性的,state大于0表示锁被占用、等于0表示空闲,小于0则是重入次数太多导致溢出了.
4.1 ReentrantLock.Sync
Sync类代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
static abstract class Sync extends AbstractQueuedSynchronizer {
abstract void lock ();
//非公平获取,公平锁和非公平锁都需要这个方法
final boolean nonfairTryAcquire ( int acquires ) {
final Thread current = Thread . currentThread ();
int c = getState ();
if ( c == 0 ) { //state == 0表示无锁
//CAS确保即使有多个线程竞争锁也是安全的
if ( compareAndSetState ( 0 , acquires )) { //加锁成功
setExclusiveOwnerThread ( current ); //设置当前持有锁的线程
return true ; //获取成功
}
}
else if ( current == getExclusiveOwnerThread ()) { //当前线程正是锁持有者
int nextc = c + acquires ;
if ( nextc < 0 ) // 被锁次数上溢(很少出现)
throw new Error ( "Maximum lock count exceeded" );
//锁被持有的情况下,只有持有者才能更新锁保护的资源
setState ( nextc );
return true ;
}
return false ;
}
//释放
protected final boolean tryRelease ( int releases ) {
int c = getState () - releases ;
//只有锁的持有者才能释放锁
if ( Thread . currentThread () != getExclusiveOwnerThread ())
throw new IllegalMonitorStateException ();
boolean free = false ;
if ( c == 0 ) { //锁被释放
free = true ;
setExclusiveOwnerThread ( null );
}
setState ( c );
return free ;
}
//当前线程是否持有锁
protected final boolean isHeldExclusively () {
return getExclusiveOwnerThread () == Thread . currentThread ();
}
final ConditionObject newCondition () {
return new ConditionObject ();
}
//锁的持有者
final Thread getOwner () {
return getState () == 0 ? null : getExclusiveOwnerThread ();
}
//加锁次数
final int getHoldCount () {
return isHeldExclusively () ? getState () : 0 ;
}
//是否上锁,根据state字段可以判断
final boolean isLocked () {
return getState () != 0 ;
}
}
4.2 ReentrantLock.NonfairSync
公平锁的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
final static class NonfairSync extends Sync {
// 执行lock,尝试立即闯入,失败就退回常规流程
final void lock () {
if ( compareAndSetState ( 0 , 1 )) //比较并设置state,成功则表示获取成功
setExclusiveOwnerThread ( Thread . currentThread ()); //锁持有者
else
acquire ( 1 ); //获取失败,进入常规流程:acquire会首先调用tryAcquire
}
protected final boolean tryAcquire ( int acquires ) {
return nonfairTryAcquire ( acquires );
}
}
acquire的实现(AbstractQueuedSynchronizer.java):
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
就是说常规流程下,首先会执行Sync类中非公平获取(nonfairTryAcquire(1))的过程.
4.3 ReentrantLock.FairSync
非公平锁的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final static class FairSync extends Sync {
//见AbstractQueuedSynchronizer.java, 4.2节有
final void lock () {
acquire ( 1 );
}
//公平版本的tryAcquire,除非是递归调用或没有等待者或者是第一个,否则不授予访问
protected final boolean tryAcquire ( int acquires ) {
final Thread current = Thread . currentThread ();
int c = getState ();
if ( c == 0 ) {
//是等待队列的第一个等待者
if ( isFirst ( current ) &&
compareAndSetState ( 0 , acquires )) {
setExclusiveOwnerThread ( current ); //加锁成功
return true ;
}
}
//当前线程正是线程的持有者
else if ( current == getExclusiveOwnerThread ()) {
int nextc = c + acquires ;
if ( nextc < 0 ) //溢出
throw new Error ( "Maximum lock count exceeded" );
setState ( nextc );
return true ;
}
return false ;
}
}
isFirst的实现,即等待队列为空或者当前线程为等待队列的第一个元素:
final boolean isFirst(Thread current) {
Node h, s;
return ((h = head) == null ||
((s = h.next) != null && s.thread == current) ||
fullIsFirst(current));
}
4.4 lock() VS lockInterruptibly()
先说线程请求锁的几个方法:
lock():拿不到lock就不罢休,不然线程就一直block;
tryLock():马上返回,拿到lock就返回true,不然返回false;
带时间限制的tryLock():拿不到lock,就等一段时间,超时返回false;
先说说线程的打扰机制,每个线程都有一个 打扰 标志。这里分两种情况:
1. 线程在sleep或wait,join, 此时如果别的进程调用此进程的 interrupt()方法,此线程会被唤醒并被要求处理InterruptedException;
2. 此线程在运行中,则不会收到提醒。但是 此线程的 “打扰标志”会被设置, 可以通过isInterrupted()查看并作出处理。
lockInterruptibly()和上面的第一种情况是一样的,线程在请求lock并被阻塞时,如果被interrupt,则“此线程会被唤醒并被要求处理InterruptedException”。
5.读写锁
假设你的程序中涉及到对一些共享资源的读和写操作,且写操作没有读操作那么频繁。在没有写操作的时候,两个线程同时读一个资源没有任何问题,所以应该允许多个线程能在同时读取共享资源;但是如果有一个线程想去写这些共享资源,就不应该再有其它线程对该资源进行读或写(也就是说:读-读能共存,读-写不能共存,写-写不能共存),这就需要一个读/写锁来解决这个问题。
5.1 读/写锁的Java实现
如果某个线程想要读取资源,只要没有线程正在对该资源进行写操作且没有线程请求对该资源的写操作即可。我们假设对写操作的请求比对读操作的请求更重要,即需要提升写请求的优先级;
此外,如果读操作发生的比较频繁,我们又没有提升写操作的优先级,那么就会产生“饥饿”现象,请求写操作的线程会一直阻塞,直到所有的读线程都从ReadWriteLock上解锁了(因为多个读可以共存),如果一直保证新线程的读操作权限,那么等待写操作的线程就会一直阻塞下去,结果就是发生“饥饿”;
因此,只有当没有线程正在锁住ReadWriteLock进行写操作,且没有线程请求该锁准备执行写操作时,才能保证读操作继续.
根据上面的描述,我们可以对读写访问资源的条件做个概述:
读:没有线程正在做写操作,且没有线程在请求写操作
写:没有线程正在做读写操作
按照上面的叙述,简单的实现出一个读/写锁,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ReadWriteLock {
private int readers = 0 ; //在读线程数
private int writers = 0 ; //在写线程数目
private int writeRequests = 0 ; //请求写线程数目
//尝试读加锁,有写或者有写请求时候,等待
public synchronized void lockRead () throws InterruptedException {
while ( writers > 0 || writeRequests > 0 ){
wait ();
}
readers ++;
}
//读解锁
public synchronized void unlockRead (){
readers --;
notifyAll ();
}
//尝试写加锁
public synchronized void lockWrite () throws InterruptedException {
writeRequests ++;
//有在读或者在在写的线程
while ( readers > 0 || writers > 0 ){
wait ();
}
writeRequests --;
writers ++;
}
//写解锁
public synchronized void unlockWrite () throws InterruptedException {
writers --;
notifyAll ();
}
}
在两个释放锁的方法(unlockRead,unlockWrite)中,都调用了notifyAll方法,而不是notify。要解释这个原因,我们可以想象下面一种情形:
如果有线程在等待获取读锁,同时又有线程在等待获取写锁。如果这时其中一个等待读锁的线程被notify方法唤醒,但因为此时仍有请求写锁的线程存在(writeRequests>0),所以被唤醒的线程会再次进入阻塞状态。然而,等待写锁的线程一个也没被唤醒,就像什么也没发生过一样(出现了所谓的信号丢失现象)。如果用的是notifyAll方法,所有的线程都会被唤醒,然后判断能否获得其请求的锁。
用notifyAll还有一个好处。如果有多个读线程在等待读锁且没有线程在等待写锁时,调用unlockWrite()后,所有等待读锁的线程都能立马成功获取读锁(共享读锁),而不是一次只允许一个。
5.2 读/写锁的重入
上面实现的读/写锁(ReadWriteLock) 是不可重入的,当一个已经持有写锁的线程再次请求写锁时,就会被阻塞。原因是已经有一个写线程了(就是它自己)。此外,考虑下面的例子:
Thread 1 获得了读锁
Thread 2 请求写锁,但因为Thread 1 持有了读锁,所以写锁请求被阻塞。
Thread 1 再想请求一次读锁,但因为Thread 2处于请求写锁的状态,所以想再次获取读锁也会被阻塞。
上面这种情形使用前面的ReadWriteLock就会被锁定,即一种类似于死锁的情形。导致不会再有线程能够成功获取读锁或写锁了。
为了让ReadWriteLock可重入,需要对它做一些改进,下面会分别处理读锁的重入和写锁的重入。
5.3 读锁重入
了让ReadWriteLock的读锁可重入,我们要先为读锁重入建立规则:
要保证某个线程中的读锁可重入,要么满足获取读锁的条件(没有写或写请求),要么已经持有读锁(不管是否有写请求).
要确定一个线程是否已经持有读锁,可以用一个map来存储已经持有读锁的线程以及对应线程获取读锁的次数,即Map<thread, count>,当需要判断某个线程能否获得读锁时,就利用map中存储的数据进行判断。下面是方法lockRead和unlockRead修改后的的代码(省略了写锁代码):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ReadWriteLock {
//读锁计数器,记录当前持有读锁的线程以及加锁次数
private Map < Thread , Integer > readingThreads =
new HashMap < Thread , Integer >();
private int writers = 0 ;
private int writeRequests = 0 ;
public synchronized void lockRead () throws InterruptedException {
Thread callingThread = Thread . currentThread ();
while (! canGrantReadAccess ( callingThread )){
wait ();
}
readingThreads . put ( callingThread , ( getAccessCount ( callingThread ) + 1 ));
}
public synchronized void unlockRead (){
Thread callingThread = Thread . currentThread ();
int accessCount = getAccessCount ( callingThread );
if ( accessCount == 1 ) { //加锁次数为0了
readingThreads . remove ( callingThread );
} else {
readingThreads . put ( callingThread , ( accessCount - 1 ));
}
notifyAll ();
}
//判断当前线程能否获得读锁
private boolean canGrantReadAccess ( Thread callingThread ){
if ( writers > 0 ) return false ;
if ( isReader ( callingThread ) return true ;
if ( writeRequests > 0 ) return false ;
return true ;
}
//获取这个读锁目前的加锁次数
private int getReadAccessCount ( Thread callingThread ){
Integer accessCount = readingThreads . get ( callingThread );
if ( accessCount == null ) return 0 ;
return accessCount . intValue ();
}
//当前线程是否为已加锁的读锁
private boolean isReader ( Thread callingThread ){
return readingThreads . get ( callingThread ) != null ;
}
}
代码中我们可以看到(canGrantReadAccess函数),只有在没有线程拥有写锁的情况下才允许读锁的重入,此外,重入的读锁比写锁请求优先级高。
5.4 写锁重入
因为写锁是排他锁,写锁的可重入规则比较简单:
仅当一个线程已经持有写锁,才允许写锁重入(再次获得写锁)。
下面是方法lockWrite和unlockWrite修改后的的代码(省略了写锁的代码):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class ReadWriteLock {
private Map < Thread , Integer > readingThreads =
new HashMap < Thread , Integer >();
private int writeAccesses = 0 ;
private int writeRequests = 0 ;
private Thread writingThread = null ; //当前的写锁线程
public synchronized void lockWrite () throws InterruptedException {
writeRequests ++;
Thread callingThread = Thread . currentThread ();
while (! canGrantWriteAccess ( callingThread )){
wait ();
}
writeRequests --;
writeAccesses ++;
writingThread = callingThread ;
}
public synchronized void unlockWrite () throws InterruptedException {
writeAccesses --;
if ( writeAccesses == 0 ){
writingThread = null ;
}
notifyAll ();
}
//判断能否加写锁
private boolean canGrantWriteAccess ( Thread callingThread ){
if ( hasReaders ()) return false ;
if ( writingThread == null ) return true ;
if (! isWriter ( callingThread )) return false ;
return true ;
}
//有线程正持有读锁
private boolean hasReaders (){
return readingThreads . size () > 0 ;
}
//请求线程是否为当前已加写锁的线程
private boolean isWriter ( Thread callingThread ){
return writingThread == callingThread ;
}
}
5.5 读锁升级到写锁
有时,我们希望一个拥有读锁的线程,也能获得写锁。想要允许这样的操作,要求这个线程是唯一一个拥有读锁的线程(因为写锁是排他锁)。writeLock()需要做点改动来达到这个目的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class ReadWriteLock {
private Map < Thread , Integer > readingThreads =
new HashMap < Thread , Integer >();
private int writeAccesses = 0 ;
private int writeRequests = 0 ;
private Thread writingThread = null ;
public synchronized void lockWrite () throws InterruptedException {
writeRequests ++;
Thread callingThread = Thread . currentThread ();
while (! canGrantWriteAccess ( callingThread )){
wait ();
}
writeRequests --;
writeAccesses ++;
writingThread = callingThread ;
}
public synchronized void unlockWrite () throws InterruptedException {
writeAccesses --;
if ( writeAccesses == 0 ){
writingThread = null ;
}
notifyAll ();
}
//能否获得写锁,如果是当前唯一的一个读锁,则可以升级获得写锁
private boolean canGrantWriteAccess ( Thread callingThread ){
if ( isOnlyReader ( callingThread )) return true ;
if ( hasReaders ()) return false ;
if ( writingThread == null ) return true ;
if (! isWriter ( callingThread )) return false ;
return true ;
}
private boolean hasReaders (){
return readingThreads . size () > 0 ;
}
private boolean isWriter ( Thread callingThread ){
return writingThread == callingThread ;
}
//当前持有锁的线程是否为唯一的读锁线程
private boolean isOnlyReader ( Thread callingThread ){
return readers == 1 && readingThreads . get ( callingThread ) != null ;
}
}
5.6 写锁降级到读锁
有时拥有写锁的线程也希望得到读锁。如果一个线程拥有了写锁,那么自然其它线程是不可能拥有读锁或写锁了。所以对于一个拥有写锁的线程,再获得读锁,是不会有什么危险的。
我们仅仅需要对上面canGrantReadAccess方法进行简单地修改:
1
2
3
4
5
6
7
8
9
10
public class ReadWriteLock {
//能否获得读锁,当当前持有的线程为一个写锁线程,则可以
private boolean canGrantReadAccess ( Thread callingThread ){
if ( isWriter ( callingThread )) return true ;
if ( writingThread != null ) return false ;
if ( isReader ( callingThread ) return true ;
if ( writeRequests > 0 ) return false ;
return true ;
}
}
5.7 可重入的ReadWriteLock的完整实现
下面是完整的ReadWriteLock实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
public class ReadWriteLock {
private Map < Thread , Integer > readingThreads =
new HashMap < Thread , Integer >();
private int writeAccesses = 0 ;
private int writeRequests = 0 ;
private Thread writingThread = null ;
public synchronized void lockRead () throws InterruptedException {
Thread callingThread = Thread . currentThread ();
while (! canGrantReadAccess ( callingThread )){
wait ();
}
readingThreads . put ( callingThread ,
( getReadAccessCount ( callingThread ) + 1 ));
}
private boolean canGrantReadAccess ( Thread callingThread ){
if ( isWriter ( callingThread )) return true ;
if ( hasWriter ()) return false ;
if ( isReader ( callingThread )) return true ;
if ( hasWriteRequests ()) return false ;
return true ;
}
public synchronized void unlockRead (){
Thread callingThread = Thread . currentThread ();
if (! isReader ( callingThread )){
throw new IllegalMonitorStateException (
"Calling Thread does not" +
" hold a read lock on this ReadWriteLock" );
}
int accessCount = getReadAccessCount ( callingThread );
if ( accessCount == 1 ){
readingThreads . remove ( callingThread );
} else {
readingThreads . put ( callingThread , ( accessCount - 1 ));
}
notifyAll ();
}
public synchronized void lockWrite () throws InterruptedException {
writeRequests ++;
Thread callingThread = Thread . currentThread ();
while (! canGrantWriteAccess ( callingThread )){
wait ();
}
writeRequests --;
writeAccesses ++;
writingThread = callingThread ;
}
public synchronized void unlockWrite ()
throws InterruptedException {
if (! isWriter ( Thread . currentThread ()){
throw new IllegalMonitorStateException (
"Calling Thread does not" +
" hold the write lock on this ReadWriteLock" );
}
writeAccesses --;
if ( writeAccesses == 0 ){
writingThread = null ;
}
notifyAll ();
}
private boolean canGrantWriteAccess ( Thread callingThread ){
if ( isOnlyReader ( callingThread )) return true ;
if ( hasReaders ()) return false ;
if ( writingThread == null ) return true ;
if (! isWriter ( callingThread )) return false ;
return true ;
}
private int getReadAccessCount ( Thread callingThread ){
Integer accessCount = readingThreads . get ( callingThread );
if ( accessCount == null ) return 0 ;
return accessCount . intValue ();
}
private boolean hasReaders (){
return readingThreads . size () > 0 ;
}
private boolean isReader ( Thread callingThread ){
return readingThreads . get ( callingThread ) != null ;
}
private boolean isOnlyReader ( Thread callingThread ){
return readingThreads . size () == 1 &&
readingThreads . get ( callingThread ) != null ;
}
private boolean hasWriter (){
return writingThread != null ;
}
private boolean isWriter ( Thread callingThread ){
return writingThread == callingThread ;
}
private boolean hasWriteRequests (){
return this . writeRequests > 0 ;
}
}
在利用ReadWriteLock来保护临界区时,如果临界区可能抛出异常,在finally块中调用readUnlock()和writeUnlock()就显得很重要了。这样做是为了保证ReadWriteLock能被成功解锁,然后其它线程可以请求到该锁:
lock.lockWrite();
try{
//do critical section code, which may throw exception
} finally {
lock.unlockWrite();
}
6.ReentrantReadWriteLock源码
ReentrantLock 实现了标准的互斥操作,也就是一次只能有一个线程持有锁,也即所谓独占锁的概念。前面的章节中一直在强调这个特点。显然这个特点在一定程度上面减低了吞吐量,实际上独占锁是一种保守的锁策略,在这种情况下任何“读/读”,“写/读”,“写/写”操作都不能同时发生。
读写锁使用的场合是一个共享资源被大量读取操作,而只有少量的写操作(修改数据).
ReentrantReadWriteLock是java中可重入读写锁的一个实现.包含五个内部类:Sync,NonfairSync,FairSync,ReadLock,WriteLock.其中Sync,NonfairSync,FairSync用上公平锁和非公平锁的实现,ReadLock和WriteLock分别用于实现读锁和写锁.
ReentrantReadWriteLock的一些特性:
(1).写线程获取写入锁后可以获取读取锁,然后释放写入锁,这样就从写入锁变成了读取锁,从而实现锁降级的特性;读取锁是不能直接升级为写入锁的;
(2).读取锁和写入锁的数量最大分别只能是65535;
6.1 ReentrantReadWriteLock一些关键的方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ReentrantReadWriteLock implements ReadWriteLock , java . io . Serializable {
private final ReentrantReadWriteLock . ReadLock readerLock ;
private final ReentrantReadWriteLock . WriteLock writerLock ;
private final Sync sync ;
//默认实现是非公平的
public ReentrantReadWriteLock () {
this ( false );
}
public ReentrantReadWriteLock ( boolean fair ) {
sync = ( fair )? new FairSync () : new NonfairSync ();
readerLock = new ReadLock ( this );
writerLock = new WriteLock ( this );
}
public int getReadLockCount () {
return sync . getReadLockCount ();
}
public boolean isWriteLocked () {
return sync . isWriteLocked ();
}
//是否由当前线程持有写锁
public boolean isWriteLockedByCurrentThread () {
return sync . isHeldExclusively ();
}
//读锁等待队列
protected Collection < Thread > getQueuedWriterThreads () {
return sync . getExclusiveQueuedThreads ();
}
//写锁等待队列
protected Collection < Thread > getQueuedReaderThreads () {
return sync . getSharedQueuedThreads ();
}
}
6.2 读锁实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static class ReadLock implements Lock , java . io . Serializable {
private final Sync sync ;
//加锁(共享锁)
public void lock () {
sync . acquireShared ( 1 );
}
public boolean tryLock () {
return sync . tryReadLock ();
}
public boolean tryLock ( long timeout , TimeUnit unit ) throws InterruptedException {
return sync . tryAcquireSharedNanos ( 1 , unit . toNanos ( timeout ));
}
//解锁
public void unlock () {
sync . releaseShared ( 1 );
}
}
6.3 写锁实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static class WriteLock implements Lock , java . io . Serializable {
private final Sync sync ;
//排他锁
public void lock () {
sync . acquire ( 1 );
}
public boolean tryLock () {
return sync . tryWriteLock ();
}
public boolean tryLock ( long timeout , TimeUnit unit ) throws InterruptedException {
return sync . tryAcquireNanos ( 1 , unit . toNanos ( timeout ));
}
public void unlock () {
sync . release ( 1 );
}
//是否有当前线程持有
public boolean isHeldByCurrentThread () {
return sync . isHeldExclusively ();
}
//加锁次数
public int getHoldCount () {
return sync . getWriteHoldCount ();
}
}
6.4 state
ReentrantLock和ReentrantReadWriteLock的实现很多细节都是由借由AbstractQueuedSynchronizer实现.
AQS中有一个state字段(int类型,32位)用来描述有多少线程获持有锁。在独占锁的时代这个值通常是0或者1(如果是重入的就是重入的次数),在共享锁的时代就是持有锁的数量。
ReentrantLock的锁是排他锁,因此只用来表示重入数.
ReentrantReadWriteLock的读、写锁是相关但是又不一致的,所以需要两个数来描述读锁(共享锁)和写锁(独占锁)的数量。显然现在一个state就不够用了。于是在ReentrantReadWrilteLock里面将这个字段一分为二,高位16位表示共享锁的数量,低位16位表示独占锁的数量(或者重入数量),这就是上节中提到的为什么共享锁和独占锁的数量(包括重入次数)最大只能是65535的原因了.
我们可以借此分析写入锁获取片段:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected final boolean tryAcquire ( int acquires ) {
Thread current = Thread . currentThread ();
int c = getState ();
int w = exclusiveCount ( c );
if ( c != 0 ) {
//有读锁或者写锁持有者不是当前线程
if ( w == 0 || current != getExclusiveOwnerThread ())
return false ;
if ( w + exclusiveCount ( acquires ) > MAX_COUNT )
throw new Error ( "Maximum lock count exceeded" );
}
if (( w == 0 && writerShouldBlock ( current )) ||
! compareAndSetState ( c , c + acquires ))
return false ;
setExclusiveOwnerThread ( current );
return true ;
}
exclusiveCount©的实现很简单,就是用掩码去掉高16位(c & EXCLUSIVE_MASK).
writerShouldBlock的实现分为公平锁和非公平锁两个实现,不公平锁的实现
1
2
3
final boolean writerShouldBlock ( Thread current ) {
return false ;
}
即在非公平锁中写锁优先级高.
writerShouldBlock公平锁的实现:
1
2
3
4
final boolean writerShouldBlock ( Thread current ) {
// only proceed if queue is empty or current thread at head
return ! isFirst ( current );
}
即请求写锁的线程不是等待队列的第一个就会被阻塞,即读锁和写锁是公平排队的.
读锁的获取:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected final int tryAcquireShared ( int unused ) {
Thread current = Thread . currentThread ();
int c = getState ();
//有写锁并且写锁持有者不是当前线程(考虑写锁可以降级到读锁)
if ( exclusiveCount ( c ) != 0 && getExclusiveOwnerThread () != current )
return - 1 ;
if ( sharedCount ( c ) == MAX_COUNT )
throw new Error ( "Maximum lock count exceeded" );
if (! readerShouldBlock ( current ) && compareAndSetState ( c , c + SHARED_UNIT )){
HoldCounter rh = cachedHoldCounter ; //首先访问缓存
if ( rh == null || rh . tid != current . getId ())
cachedHoldCounter = rh = readHolds . get ();
rh . count ++;
return 1 ;
}
return fullTryAcquireShared ( current );
}
这里注意到在读锁的获取和释放都用到了一个HoldCounter类,下面会介绍.
6.5 HoldCounter
在ReentrantReadWriteLock.Sync中有包含几个内部类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 每个线程特定的 read 持有计数。存放在ThreadLocal
*/
static final class HoldCounter {
int count ;
// 用id而不是引用是为了避免保留垃圾
final long tid = Thread . currentThread (). getId ();
/** Decrement if positive; return previous value */
int tryDecrement () {
int c = count ;
if ( c > 0 )
count = c - 1 ;
return c ;
}
}
/**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/
static final class ThreadLocalHoldCounter extends ThreadLocal < HoldCounter > {
public HoldCounter initialValue () {
return new HoldCounter ();
}
}
static abstract class Sync extends AbstractQueuedSynchronizer {
//当前线程持有读锁的数目
transient ThreadLocalHoldCounter readHolds ;
/**
* 最近一个成功获取读锁的线程的计数。
* 通常情况,下一个要释放的线程是最后一个获取线程
*/
transient HoldCounter cachedHoldCounter ;
}
可以看到这里使用ThreadLocal将HoldCounter绑定到当前线程上,同时HoldCounter也持有线程Id,这样在释放锁的时候才能知道ReadWriteLock里面缓存的上一个读取线程(cachedHoldCounter)是否是当前线程。这样做的好处是可以减少ThreadLocal.get()的次数,因为这也是一个耗时操作。
需要说明的是这样HoldCounter绑定线程id而不绑定线程对象的原因是避免HoldCounter和ThreadLocal互相绑定而GC难以释放它们,所以其实这样做只是为了帮助GC快速回收对象而已。
7.参考
http://tutorials.jenkov.com/java-concurrency/locks.html
http://tutorials.jenkov.com/java-concurrency/starvation-and-fairness.html
http://ifeve.com/java-concurrency-thread-directory/
http://coderbee.net/index.php/concurrent/20131209/618