xiaobaoqiu Blog

Think More, Code Less

Java并发容器之BlockingQueue

这篇文章主要分析Concurrent包中的BlockingQueue系列的并发容器实现原理和源代码.

1.实现原理

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列.

下图展示了如何通过阻塞队列来合作:

线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素.

1.1 阻塞队列简单实现

下面是阻塞队列的一个简单实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class BlockingQueue {

    private List queue = new LinkedList();
    private int  limit = 10;

    public BlockingQueue(int limit){
        this.limit = limit;
    }

    //入队列
    public synchronized void enqueue(Object item) throws InterruptedException {

        while(this.queue.size() == this.limit) {
            wait();
        }

        if(this.queue.size() == 0) {
            notifyAll();
        }

        this.queue.add(item);
    }

    //出队列
    public synchronized Object dequeue() throws InterruptedException{

        while(this.queue.size() == 0){
            wait();
        }

        if(this.queue.size() == this.limit){
            notifyAll();
        }

        return this.queue.remove(0);
    }
}

必须注意到,在enqueue和dequeue方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用notifyAll方法。如果队列的大小既不等于上限,也不等于下限,任何线程调用enqueue或者dequeue方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。

2.Java BlockingQueue整体介绍

BlockingQueue系列并发容器包括一下几个:

BlockingQueue
BlockingDeque
ArrayBlockingQueue
LinkedBlockingDeque
LinkedBlockingQueue
PriorityBlockingQueue
DelayQueue

2.1 其继承关系如下图:

2.2 主要的方法如下:

放入数据

(1).offer(anObject)
表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程);
(2).offer(E o, long timeout, TimeUnit unit)可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败;
(3).put(anObject)
把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

获取数据

(1).poll(time)
取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
(2).poll(long timeout, TimeUnit unit)
从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据,否则知道时间超时还没有数据可取,返回失败;
(3).take()
取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入; 
(4).drainTo()
一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

3. ArrayBlockingQueue

数组实现的BlockingQueue,实现上用一个数组保存数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private static final long serialVersionUID = -817911632652898426L;

    /** 队列数据  */
    private final E[] items;
    /** 下一个take, poll 或者 remove的下标 */
    private int takeIndex;
    /** 下一个 put, offer 或者 add的下标 */
    private int putIndex;
    /** 队列内元素的数目 */
    private int count;

    /** 锁 */
    private final ReentrantLock lock;
    /** 等待take的Condition */
    private final Condition notEmpty;
    /** 等待puts的Condition */
    private final Condition notFull;
}

三个构造函数,可以输入容量,锁的公平性等,默认是锁非公平的,注意这里的锁是独占的ReentrantLock,因为这里从队列(数组)中读取数据和旺队列中写数据都应该和别的线程是互斥的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//默认锁是不公平的
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.items = (E[]) new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
//c为初始数据
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);
    if (capacity < c.size()) throw new IllegalArgumentException();

    for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
        add(it.next());
}

下面分析offer,put,poll,take这几个主要的函数实现:

3.1 offer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();    //不支持null元素
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)  //队列满,直接返回false
            return false;
        else {
            insert(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

insert的实现:

1
2
3
4
5
6
private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
}

即offer不能插入立即返回,插入成功会触发notEmpty这个Condition,唤醒那些等待读取数据的被阻塞的线程

3.2 put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final E[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();   //可被interrupt的lock
    try {
        try {
            while (count == items.length)
                notFull.await();
        } catch (InterruptedException ie) {
            notFull.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        insert(e);
    } finally {
        lock.unlock();
    }
}

put不能插入时候,会一直被阻塞,但是这个阻塞的线程是可以被interrupt的.

3.3 poll

1
2
3
4
5
6
7
8
9
10
11
12
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == 0) //没有数据可取,直接返回
            return null;
        E x = extract();
        return x;
    } finally {
        lock.unlock();
    }
}

其中extract()的实现如下:

1
2
3
4
5
6
7
8
9
private E extract() {
    final E[] items = this.items;
    E x = items[takeIndex];
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal();
    return x;
}

poll成功会触发notFull这个Condition,即唤醒所有等待插入数据的线程.

3.4 take

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            while (count == 0)
                notEmpty.await();
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        E x = extract();
        return x;
    } finally {
        lock.unlock();
    }
}

take当无数据可取的时候,线程会一致阻塞,但是线程的阻塞状态是可以被interrupt的.

4. LinkedBlockingQueue

LinkedBlockingQueue即一个单链表实现的阻塞队列,单链表节点定义如下:

1
2
3
4
5
6
static class Node<E> {
    E item;

    Node<E> next;
    Node(E x) { item = x; }
}

内部实现,链表实现,注意有两个锁,即一个读取的锁(takeLock)和一个写入的锁(putLock).原因是put和take是不冲突的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** 容量 */
    private final int capacity;

    /** 当前元素个数 */
    private final AtomicInteger count = new AtomicInteger(0);

    /** 链表首节点 */
    private transient Node<E> head;

    /** 链表尾节点 */
    private transient Node<E> last;

    /** take, poll等操作的锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** takes操作的等待Condition */
    private final Condition notEmpty = takeLock.newCondition();

    /** put, offer等的锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** puts操作的等待Condition */
    private final Condition notFull = putLock.newCondition();
}

默认构造指定最大容量为Integer.MAX_VALUE,通过构造函数可以指定容量.

4. PriorityBlockingQueue

PriorityBlockingQueue是基于PriorityQueue实现的一个优先级阻塞队列。而PriorityQueue是基于二叉堆实现的优先级队列,它每次从队列中取出的是具有最高优先权的元素。

1
2
3
4
5
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private final PriorityQueue<E> q;
    private final ReentrantLock lock = new ReentrantLock(true);
    private final Condition notEmpty = lock.newCondition();

构造函数可以指定比较器(Comparator),如果不指定就按照对象的自然顺序(对象必须是Comparable的)。

5.DelayQueue

DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。

Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。

DelayQueue是基于PriorityQueue实现。简单的说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。

1
2
3
4
5
6
7
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private transient final ReentrantLock lock = new ReentrantLock();
    private transient final Condition available = lock.newCondition();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
}

看起take函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek(); //第一个元素
            if (first == null) {
                available.await();
            } else {
                long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                if (delay > 0) {    //没有过期,继续等待
                    long tl = available.awaitNanos(delay);
                } else {    //过期
                    E x = q.poll();
                    assert x != null;
                    if (q.size() != 0)
                        available.signalAll(); //唤醒其他等待take的线程
                    return x;
                }
            }
        }
    } finally {
        lock.unlock();
    }
}