这篇文章主要分析Concurrent包中的BlockingQueue系列的并发容器实现原理和源代码.
1.实现原理
阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列.
下图展示了如何通过阻塞队列来合作:
线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素.
1.1 阻塞队列简单实现
下面是阻塞队列的一个简单实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class BlockingQueue {
private List queue = new LinkedList ();
private int limit = 10 ;
public BlockingQueue ( int limit ){
this . limit = limit ;
}
//入队列
public synchronized void enqueue ( Object item ) throws InterruptedException {
while ( this . queue . size () == this . limit ) {
wait ();
}
if ( this . queue . size () == 0 ) {
notifyAll ();
}
this . queue . add ( item );
}
//出队列
public synchronized Object dequeue () throws InterruptedException {
while ( this . queue . size () == 0 ){
wait ();
}
if ( this . queue . size () == this . limit ){
notifyAll ();
}
return this . queue . remove ( 0 );
}
}
必须注意到,在enqueue和dequeue方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用notifyAll方法。如果队列的大小既不等于上限,也不等于下限,任何线程调用enqueue或者dequeue方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。
2.Java BlockingQueue整体介绍
BlockingQueue系列并发容器包括一下几个:
BlockingQueue
BlockingDeque
ArrayBlockingQueue
LinkedBlockingDeque
LinkedBlockingQueue
PriorityBlockingQueue
DelayQueue
2.1 其继承关系如下图:
2.2 主要的方法如下:
放入数据
(1).offer(anObject)
表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程);
(2).offer(E o, long timeout, TimeUnit unit)可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败;
(3).put(anObject)
把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
获取数据
(1).poll(time)
取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
(2).poll(long timeout, TimeUnit unit)
从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据,否则知道时间超时还没有数据可取,返回失败;
(3).take()
取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
(4).drainTo()
一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
3. ArrayBlockingQueue
数组实现的BlockingQueue,实现上用一个数组保存数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ArrayBlockingQueue < E > extends AbstractQueue < E >
implements BlockingQueue < E >, java . io . Serializable {
private static final long serialVersionUID = - 817911632652898426L ;
/** 队列数据 */
private final E [] items ;
/** 下一个take, poll 或者 remove的下标 */
private int takeIndex ;
/** 下一个 put, offer 或者 add的下标 */
private int putIndex ;
/** 队列内元素的数目 */
private int count ;
/** 锁 */
private final ReentrantLock lock ;
/** 等待take的Condition */
private final Condition notEmpty ;
/** 等待puts的Condition */
private final Condition notFull ;
}
三个构造函数,可以输入容量,锁的公平性等,默认是锁非公平的,注意这里的锁是独占的ReentrantLock,因为这里从队列(数组)中读取数据和旺队列中写数据都应该和别的线程是互斥的.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//默认锁是不公平的
public ArrayBlockingQueue ( int capacity ) {
this ( capacity , false );
}
public ArrayBlockingQueue ( int capacity , boolean fair ) {
if ( capacity <= 0 ) throw new IllegalArgumentException ();
this . items = ( E []) new Object [ capacity ];
lock = new ReentrantLock ( fair );
notEmpty = lock . newCondition ();
notFull = lock . newCondition ();
}
//c为初始数据
public ArrayBlockingQueue ( int capacity , boolean fair ,
Collection <? extends E > c ) {
this ( capacity , fair );
if ( capacity < c . size ()) throw new IllegalArgumentException ();
for ( Iterator <? extends E > it = c . iterator (); it . hasNext ();)
add ( it . next ());
}
下面分析offer,put,poll,take这几个主要的函数实现:
3.1 offer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean offer ( E e ) {
if ( e == null ) throw new NullPointerException (); //不支持null元素
final ReentrantLock lock = this . lock ;
lock . lock ();
try {
if ( count == items . length ) //队列满,直接返回false
return false ;
else {
insert ( e );
return true ;
}
} finally {
lock . unlock ();
}
}
insert的实现:
1
2
3
4
5
6
private void insert ( E x ) {
items [ putIndex ] = x ;
putIndex = inc ( putIndex );
++ count ;
notEmpty . signal ();
}
即offer不能插入立即返回,插入成功会触发notEmpty这个Condition,唤醒那些等待读取数据的被阻塞的线程
3.2 put
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void put ( E e ) throws InterruptedException {
if ( e == null ) throw new NullPointerException ();
final E [] items = this . items ;
final ReentrantLock lock = this . lock ;
lock . lockInterruptibly (); //可被interrupt的lock
try {
try {
while ( count == items . length )
notFull . await ();
} catch ( InterruptedException ie ) {
notFull . signal (); // propagate to non-interrupted thread
throw ie ;
}
insert ( e );
} finally {
lock . unlock ();
}
}
put不能插入时候,会一直被阻塞,但是这个阻塞的线程是可以被interrupt的.
3.3 poll
1
2
3
4
5
6
7
8
9
10
11
12
public E poll () {
final ReentrantLock lock = this . lock ;
lock . lock ();
try {
if ( count == 0 ) //没有数据可取,直接返回
return null ;
E x = extract ();
return x ;
} finally {
lock . unlock ();
}
}
其中extract()的实现如下:
1
2
3
4
5
6
7
8
9
private E extract () {
final E [] items = this . items ;
E x = items [ takeIndex ];
items [ takeIndex ] = null ;
takeIndex = inc ( takeIndex );
-- count ;
notFull . signal ();
return x ;
}
poll成功会触发notFull这个Condition,即唤醒所有等待插入数据的线程.
3.4 take
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public E take () throws InterruptedException {
final ReentrantLock lock = this . lock ;
lock . lockInterruptibly ();
try {
try {
while ( count == 0 )
notEmpty . await ();
} catch ( InterruptedException ie ) {
notEmpty . signal (); // propagate to non-interrupted thread
throw ie ;
}
E x = extract ();
return x ;
} finally {
lock . unlock ();
}
}
take当无数据可取的时候,线程会一致阻塞,但是线程的阻塞状态是可以被interrupt的.
4. LinkedBlockingQueue
LinkedBlockingQueue即一个单链表实现的阻塞队列,单链表节点定义如下:
1
2
3
4
5
6
static class Node < E > {
E item ;
Node < E > next ;
Node ( E x ) { item = x ; }
}
内部实现,链表实现,注意有两个锁,即一个读取的锁(takeLock)和一个写入的锁(putLock).原因是put和take是不冲突的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class LinkedBlockingQueue < E > extends AbstractQueue < E >
implements BlockingQueue < E >, java . io . Serializable {
/** 容量 */
private final int capacity ;
/** 当前元素个数 */
private final AtomicInteger count = new AtomicInteger ( 0 );
/** 链表首节点 */
private transient Node < E > head ;
/** 链表尾节点 */
private transient Node < E > last ;
/** take, poll等操作的锁 */
private final ReentrantLock takeLock = new ReentrantLock ();
/** takes操作的等待Condition */
private final Condition notEmpty = takeLock . newCondition ();
/** put, offer等的锁 */
private final ReentrantLock putLock = new ReentrantLock ();
/** puts操作的等待Condition */
private final Condition notFull = putLock . newCondition ();
}
默认构造指定最大容量为Integer.MAX_VALUE,通过构造函数可以指定容量.
4. PriorityBlockingQueue
PriorityBlockingQueue是基于PriorityQueue实现的一个优先级阻塞队列。而PriorityQueue是基于二叉堆实现的优先级队列,它每次从队列中取出的是具有最高优先权的元素。
1
2
3
4
5
public class PriorityBlockingQueue < E > extends AbstractQueue < E >
implements BlockingQueue < E >, java . io . Serializable {
private final PriorityQueue < E > q ;
private final ReentrantLock lock = new ReentrantLock ( true );
private final Condition notEmpty = lock . newCondition ();
构造函数可以指定比较器(Comparator),如果不指定就按照对象的自然顺序(对象必须是Comparable的)。
5.DelayQueue
DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。
Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。
DelayQueue是基于PriorityQueue实现。简单的说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。
1
2
3
4
5
6
7
public class DelayQueue < E extends Delayed > extends AbstractQueue < E >
implements BlockingQueue < E > {
private transient final ReentrantLock lock = new ReentrantLock ();
private transient final Condition available = lock . newCondition ();
private final PriorityQueue < E > q = new PriorityQueue < E >();
}
看起take函数的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E take () throws InterruptedException {
final ReentrantLock lock = this . lock ;
lock . lockInterruptibly ();
try {
for (;;) {
E first = q . peek (); //第一个元素
if ( first == null ) {
available . await ();
} else {
long delay = first . getDelay ( TimeUnit . NANOSECONDS );
if ( delay > 0 ) { //没有过期,继续等待
long tl = available . awaitNanos ( delay );
} else { //过期
E x = q . poll ();
assert x != null ;
if ( q . size () != 0 )
available . signalAll (); //唤醒其他等待take的线程
return x ;
}
}
}
} finally {
lock . unlock ();
}
}