xiaobaoqiu Blog

Think More, Code Less

ConcurrentLinkedQueue

如果我们要实现一个线程安全的队列有两种实现方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁,比如ArryBlockingQueue)或两个锁(入队和出队用不同的锁,比如LinkedBlockingDeque)等方式来实现,而非阻塞的实现方式则可以使用循环CAS的方式来实现。

1非阻塞算法

1.1 Java的多线程同步机制

在现代的多处理器系统中,提高程序的并行执行能力是有效利用 CPU 资源的关键。为了有效协调多线程间的并发访问,必须采用适当的同步机制来协调竞争。当前常用的多线程同步机制可以分为下面三种类型:

(1).volatile 变量:轻量级多线程同步机制,不会引起上下文切换和线程调度。仅提供内存可见性保证,不提供原子性。
(2).CAS 原子指令:轻量级多线程同步机制,不会引起上下文切换和线程调度。它同时提供内存可见性和原子化更新保证。
(3).内部锁和显式锁:重量级多线程同步机制,可能会引起上下文切换和线程调度,它同时提供内存可见性和原子性。

从 Amdahl 定律我们可以知道,要想提高并发性,就应该尽量使串行部分达到最大程度的并行;也就是说:最小化串行代码的粒度是提高并发性能的关键。

与锁相比,非阻塞算法在更细粒度(机器级别的原子指令)的层面协调多线程间的竞争。它使得多个线程在竞争相同资源时不会发生阻塞,它的并发性与锁相比有了质的提高;同时也大大减少了线程调度的开销。同时,由于几乎所有的同步原语都只能对单个变量进行操作,这个限制导致非阻塞算法的设计和实现非常复杂。

1.2 ConcurrentLinkedQueue非阻塞算法实现

ConcurrentLinkedQueue的非阻塞算法实现可概括为下面5点:

(1).使用CAS原子指令来处理对数据的并发访问,这是非阻塞算法得以实现的基础;
(2).head/tail并非总是指向队列的头/尾节点,也就是说允许队列处于不一致状态;这个特性把入队/出队时,原本需要一起原子化执行的两个步骤分离开来,从而缩小了入队/出队时需要原子化更新值的范围到唯一变量,这是非阻塞算法得以实现的关键;
(3).由于队列有时会处于不一致状态。为此,ConcurrentLinkedQueue使用三个不变式来维护非阻塞算法的正确性;
(4).以批处理方式来更新head/tail,从整体上减少入队/出队操作的开销;
(5).为了有利于垃圾收集,队列使用特有的head更新机制;为了确保从已删除节点向后遍历,可到达所有的非删除节点,队列使用了特有的向后推进策略;

在后面的源代码分析中,我们将会看到队列有时会处于不一致状态。为此,ConcurrentLinkedQueue 使用三个不变式(基本不变式,head 的不变式和tail的不变式),来约束队列中方法的执行。通过这三个不变式来维护非阻塞算法的正确性:

(1).基本不变式
在执行方法之前和之后,队列必须要保持的不变式:
    当入队插入新节点之后,队列中有一个 next域为null的(最后)节点;
    从head开始遍历队列,可以访问所有item域不为null的节点;
(2).head的不变式和可变式
在执行方法之前和之后,head必须保持的不变式:
    所有“活着”的节点(指未删除节点),都能从head通过调用succ()方法遍历可达;
    head不能为null;
    head节点的next域不能引用到自身;
在执行方法之前和之后,head的可变式:
    head节点的item域可能为null,也可能不为null;
    允许tail滞后于head,也就是说从head开始遍历队列,不一定能到达tail;
(3).tail的不变式和可变式
在执行方法之前和之后,tail必须保持的不变式:
    通过tail调用succ()方法,最后节点总是可达的;
    tail不能为null;
在执行方法之前和之后,tail的可变式:
    tail节点的item域可能为null,也可能不为null;
    允许tail滞后于head,也就是说从head开始遍历队列,不一定能到达tail;
    tail节点的next域可以引用到自身;

2 ConcurrentLinkedQueue源码

ConcurrentLinkedQueue就是一个使用非阻塞算法实现的一个基于链接节点的无界线程安全队列,按照 FIFO(先进先出)原则对元素进行排序。队列元素中不可以放置null元素。

ConcurrentLinkedQueue的链表Node中的next的类型是volatile,而且链表数据item的类型也是volatile。关于volatile,我们知道它的语义包含:“即对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入”。ConcurrentLinkedQueue就是通过volatile来实现多线程对竞争资源的互斥访问的。

ConcurrentLinkedQueue由head节点和tair节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。

2.1 声明

从类名上我们就可以看得出来,Concurrent保证了并发中的线程安全,Linked提示是链表实现,Queue则说明是一个队列。声明如下:

1
2
3
4
5
6
7
8
9
10
11
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    //链表头尾指针
    private transient volatile Node<E> head = new Node<E>(null);
    private transient volatile Node<E> tail = head;
    //用于Unsafe实现对head和tail的更新
    private static final long headOffset =
        objectFieldOffset(UNSAFE, "head", ConcurrentLinkedQueue.class);
    private static final long tailOffset =
        objectFieldOffset(UNSAFE, "tail", ConcurrentLinkedQueue.class);
}

2.2 Node

其中的Node为单链表节点,但是提供了Unsafe提供的CAS操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private static class Node<E> {
    private volatile E item;
    private volatile Node<E> next;

    Node(E item) {
        // Piggyback on imminent casNext()
        lazySetItem(item);
     }

    E getItem() {
        return item;
    }
    //如果当前节点item等于cmp,则将其设置为val
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void setItem(E val) {
         item = val;
    }

    void lazySetItem(E val) {
        UNSAFE.putOrderedObject(this, itemOffset, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    Node<E> getNext() {
        return next;
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long nextOffset = objectFieldOffset(UNSAFE, "next", Node.class);   //next域的偏移量
    private static final long itemOffset = objectFieldOffset(UNSAFE, "item", Node.class);   //item域的偏移量
}

其中UNSAFE.putOrderedObject的语义是:它能够实现非堵塞的写入,这些写入不会被Java的JIT重新排序指令(instruction reordering),这种性能提升是有代价的,虽然便宜,也就是写后结果并不会被其他线程看到(即写操作不提供可见性),甚至是自己的线程,通常是几纳秒后被其他线程看到,这个时间比较短,所以代价可以忍受。这个方法在对低延迟代码是很有用的。

类似Unsafe.putOrderedObject还有unsafe.putOrderedLong等方法,unsafe.putOrderedLong比使用 volatile long要快3倍左右。

如果需要具备可见性,则需要指定字段为volatile,而我们这里item正是声明为volatile。

2.3 入队列

入队列就是将入队节点添加到队列的尾部。入队主要做两件事情:

(1)将入队节点设置成当前队列尾节点的下一个节点;
(2)更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点,理解这一点对于我们研究源码会非常有帮助;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean add(E e) {
    return offer(e);
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();    //不支持元素为null
    Node<E> n = new Node<E>(e);
    retry:
    for (;;) {
        Node<E> t = tail;
        Node<E> p = t;
        for (int hops = 0; ; hops++) {
            Node<E> next = succ(p); //p的后继节点
            //next!=null,即p节点的后继不为null,指p没有指向最后一个节点
            if (next != null) {
                if (hops > HOPS && t != tail)   //tail发生了变化
                    continue retry;
                p = next;
            } else if (p.casNext(null, n)) {    //p是尾节点,则将其next设置为新结点n
                if (hops >= HOPS)
                    casTail(t, n); //更新tail,允许失败
                return true;
            } else {
                p = succ(p);
            }
        }
    }
}

succ函数表示后继节点,由于 tail 可以指向任意节点,所以从 tail 向后遍历寻找尾节点的过程中,可能会遇到哨兵节点。此时 succ() 方法会直接跳转到 head 指向的节点继续遍历。下面是 succ() 方法的源代码:

1
2
3
4
5
6
final Node<E> succ(Node<E> p) {
    Node<E> next = p.getNext();
    //如果 p 节点的 next 域链接到自身(p 节点是哨兵节点),就跳转到 head,从 
    //head开始继续遍历,否则向后推进到下一个节点
    return (p == next) ? head : next;
}

从上面的源代码我们可以看出,如果向后推进过程中遇到哨兵节点,就跳转到 head,从 head 开始继续遍历;否则,就推进到下一个节点。

上图的队列当前处于 tail 滞后于 head 状态。假设现在执行入队操作,需要从 tail 开始向后遍历找到队列的尾节点。tail 开始时指向 A 节点,执行 succ() 方法向后推进到 B 节点。在 B 节点执行 succ() 方法时,由于 B 节点链接到自身,所以跳转到 head 指向的 E 节点继续遍历。

第二步设置入队节点为尾节点。p.casNext(null, n)方法用于将入队节点设置为当前队列尾节点的next节点,p如果是null表示p是当前队列的尾节点,如果不为null表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。

队列的入队方法包含两个步骤:添加新节点和更新 tail 指向这个新节点。从代码中我们可以看到,这两个步骤都是用 CAS 原子指令来完成的。由于 ConcurrentLinkedQueue 允许队列处于不一致状态,所以这两个步骤不必一起原子的执行。添加新节点后,只有当 tail 与新添加节点之间的距离达到了 HOPS 指定的阀值,才会执行更新 tail。

2.3.1 tail 在队列中的位置分析

在执行入队操作前,tail 在队列中的位置共有三种可能:

(1).tail 指向尾节点
(2).tail 节点指向非尾节点
(3).tail 滞后于 head

tail 指向尾节点

开始时,tail 指向 D 节点,首先寻找 D 节点的后继节点。由于 D 的后继节点为 null,所以插入新节点到 D 节点的后面。如果插入成功就退出方法;如果插入失败(说明其他线程刚刚插入了一个新节点),就向后推进到新插入的节点,然后重新开始迭代。下图是插入成功后的示意图:

在上图中,由于 tail 滞后于尾节点的节点数还没有达到 HOPS 指定的阈值,所以 tail 没有被更新。

tail 节点指向非尾节点

开始时,tail 指向 C 节点。首先找到 C 的后继节点 D,然后向后推进到节点 D,后面代码执行路径与上面的“tail 指向尾节点 ”的代码执行路径相同。下图是插入成功后的结构示意图:

上图中的 tail 更新了位置。因为在添加 E 节点后,tail 滞后的节点数达到了 HOPS 指定的阈值。这触发执行更新 tail 的 CAS 操作。

tail 滞后于 head

开始时,tail 指向 A 节点。首先找到 A 的后继节点 B,然后向后推进到节点 B。由于 B 是哨兵节点,产生跳转动作,跳过 C 节点,从 head 指向的 D 节点开始继续向后遍历。后面的代码执行路径与“tail 指向非尾节点”相同。下面是成功插入一个新节点后的结构示意图:

上图的 tail 更新了位置,因为 tail 滞后的节点数达到了 HOPS 指定的阈值,这触发执行更新 tail 的 CAS 操作。

2.3.2 hops的设计意图

为了尽量减少执行 CAS 原子指令的次数,执行入队 / 出队操作时 , ConcurrentLinkedQueue 并不总是更新 head/tail。只有从 head/tail 到头 / 尾节点之间的“距离”达到变量 HOPS 指定的阀值,入队 / 出队操作才会更新它们。

// 更新 head/tail 的阀值
private static final int HOPS = 1;

不使用hops,下面的代码逻辑可能更清晰:

1
2
3
4
5
6
7
8
9
10
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> n = new Node<E>(e);
    for (;;) {
        Node<E> t = tail;
        if (t.casNext(null, n) && casTail(t, n)) {
            return true;
        }
    }
}

让tail节点永远作为队列的尾节点,这样实现代码量非常少,而且逻辑非常清楚和易懂。

但是这么做有个缺点就是每次都需要使用循环CAS更新tail节点。如果能减少CAS更新tail节点的次数,就能提高入队的效率,所以JDK实现中使用hops变量来控制并减少tail节点的更新频率,并不是每次节点入队后都将tail节点更新成尾节点,而是当tail节点和尾节点的距离大于等于常量HOPS的值(默认等于1)时才更新tail节点,tail和尾节点的距离越长使用CAS更新tail节点的次数就会越少,但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点,但是这样仍然能提高入队的效率,因为从本质上来看它通过增加对volatile变量的读操作来减少了对volatile变量的写操作,而对volatile变量的CAS操作开销要远远大于读操作,所以入队效率会有所提升。因为 CAS 原子指令的执行包含了内存屏障(Memory barriers),防止乱序执行以及对各种编译器优化的抑制。

还有一点需要注意的是入队方法永远返回true,所以不要通过返回值判断入队是否成功。

2.4 出队列

出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。

并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。这种做法也是通过hops变量来减少使用CAS更新head节点的消耗,从而提高出队效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E poll() {
    Node<E> h = head;
    Node<E> p = h;
    for (int hops = 0; ; hops++) {
        E item = p.getItem();   //p执行head节点
        if (item != null && p.casItem(item, null)) {    //将head内容置为null
            if (hops >= HOPS) {
                Node<E> q = p.getNext();
                updateHead(h, (q != null) ? q : p);//将p节点下一个节点设置成head节点
            }
            return item;
        }
        //如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了,那么获取p节点的下一个节点
        Node<E> next = succ(p);
        //如果p的下一个节点也为空,说明这个队列已经空了
        if (next == null) {
            updateHead(h, p);
            break;
        }
        //如果下一个元素不为空,则将头节点的下一个节点设置成头节点
        p = next;
    }
    return null;
}

队列的出队方法包含两个步骤:删除头节点和更新 head 指向新头节点。这里对头节点的删除使用了一个小技巧:设置头节点的 item 域为 null,即删除了它(虽然这个节点还在队列中,但它以是无效节点)。在代码中我们可以看到,这两个步骤都使用 CAS 原子指令来完成。由于 ConcurrentLinkedQueue 允许队列处于不一致状态,所以这两个步骤不必一起原子的执行。在删除头节点后,只有当 head 与新头节点之间的距离达到了 HOPS 指定的阀值,才会执行更新 head。

2.4.1 head在队列中的位置分析

在执行出队操作前,head 在队列中的位置共有两种可能:

(1).head 指向有效节点。
(2).head 指向无效节点。

head 指向有效节点

出队时,首先取得 head 指向的 A 节点的 item 域的值,然后通过 CAS 设置 A 节点 item 域的值为 null。如果成功,由于此时越过的节点数为 0,所以直接返回 A 节点 item 域原有的值。如果不成功,说明其他线程已经抢先删除了该节点,此时向后推进到 B 节点。重复这个过程,直到成功删除一个节点;如果遍历完队列也没有删除成功,则返回 null。下面是成功删除后的结构示意图:

在上图中,虽然 A 节点被设置成无效节点,但 head 依然指向它,因为删除操作越过的节点数还没有达到 HOPS 指定的阀值。

head 指向无效节点

首先获得 head 指向节点的 item 域的值,由于为 null,所以向后推进到 B 节点。获得 B 节点 item 域的值后,通过 CAS 设置该值为 null。如果成功,由于已经达到 HOPS 指定的阀值,触发执行 head 更新。如果不成功(说明其他线程已经抢先删除了 B 节点),继续向后推进到 C 节点。重复这个过程,直到删除一个有效节点。如果遍历完队列也没有删除成功,则返回 null。下图是成功删除后的结构示意图:

从上图我们可以看到,在执行删除操作过程中,head 越过的节点数达到阀值,触发执行 head 的更新,使它指向 C 节点。

2.4.2 更新 head

为了有利于垃圾收集,ConcurrentLinkedQueue 在更新 head 指向新头结点后,会把旧头节点设置为哨兵节点(链接到自身的节点,同时也是以删除节点)。下面是更新 head 的源代码:

1
2
3
4
5
6
final void updateHead(Node<E> h, Node<E> p) {
    // 如果两个节点不相同,尝试用 CAS 指令原子更新 head 指向新头节点
    if (h != p && casHead(h, p))
    // 惰性设置旧头结点为哨兵节点
    h.lazySetNext (h);
}

下面通过一个示意图来理解已删除节点在队列中的状态:

在上图中,假设开始时 head 指向 A 节点,然后连续执行了 4 次出队操作,删除 A,B,C,D 4 个节点。在出队 B 节点时,head 与头结点之间的距离达到变量 HOPS 指定的阀值。这触发执行 updateHead()方法:首先设置 head 指向 C 节点,然后设置 B 节点的 next 域指向自身。同样,在出队 D 节点时,重复同样的过程。由于 B 和 D 节点断开了以删除节点与队列的链接,这将有利于虚拟机回收这些以删除节点占用的内存空间。

2.4 注意点

ConcurrentLinkedQueue的size()会遍历整个队列,因此时间复杂度为O(n).

3.参考

http://blog.csdn.net/tomato__/article/details/24179019

https://www.ibm.com/developerworks/cn/java/j-lo-concurrent/

http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf