xiaobaoqiu Blog

Think More, Code Less

ConcurrentHashMap

ConcurrentHashMap基本策略是将hash表切分成多个段(Segments),每个Segment包含多个桶,每次加锁只在Segment级别加锁。

1.结构模型

ConcurrentHashMap 类中包含两个静态内部类 HashEntry 和 Segment。HashEntry 用来封装映射表的键 / 值对;Segment 用来充当锁的角色,每个 Segment 对象守护整个散列映射表的若干个桶。每个桶是由若干个 HashEntry 对象链接起来的链表。一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组。

1.1 HashEntry 类

HashEntry 用来封装散列映射表中的键值对,可以理解为一个桶。在 HashEntry 类中,key,hash 和 next 域都被声明为 final 型,value 域被声明为 volatile 型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class HashEntry<K,V> {
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;

    HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
        this.key = key;
        this.hash = hash;
        this.next = next;
        this.value = value;
    }

    @SuppressWarnings("unchecked")
    static final <K,V> HashEntry<K,V>[] newArray(int i) {
        return new HashEntry[i];
    }
}

在 ConcurrentHashMap 中,在散列时如果产生“碰撞”,将采用“链地址法”来处理“碰撞”:把“碰撞”的 HashEntry 对象链接成一个链表。由于 HashEntry 的 next 域为 final 型,所以新节点只能在链表的表头处插入。下图是在一个空桶中依次插入 A,B,C 三个 HashEntry 对象后的结构图:

1.1 Segment 类

Segment 类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色。每个 Segment 对象用来守护其(成员对象 table 中)包含的若干个桶。

table 是一个由 HashEntry 对象组成的数组。table 数组的每一个数组成员就是散列映射表的一个桶。 count 变量是一个计数器,它表示每个 Segment 对象管理的 table 数组(若干个 HashEntry 组成的链表)包含的 HashEntry 对象的个数。每一个 Segment 对象都有一个 count 对象来表示本 Segment 中包含的 HashEntry 对象的总数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
static final class Segment<K,V> extends ReentrantLock implements Serializable {

    /**
     * 管理的HashEntry对象的个数
     */
    transient volatile int count;

    /**
     * table 被更新的次数
     */
    transient int modCount;

    /**
     * 当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列
     * threshold总是等于capacity * loadFactor
     */
    transient int threshold;

    /**
     * Segment管理的HashEntry
     */
    transient volatile HashEntry<K,V>[] table;

    final float loadFactor;

    Segment(int initialCapacity, float lf) {
        loadFactor = lf;
        setTable(HashEntry.<K,V>newArray(initialCapacity));
    }

    /**
     * 设置 table 引用到这个新生成的 HashEntry 数组
     * 只能在持有锁或构造函数中调用本方法
     */
    void setTable(HashEntry<K,V>[] newTable) {
        threshold = (int)(newTable.length * loadFactor);
        table = newTable;
    }

    /**
     * 根据 key 的散列值,找到 table 中对应的那个桶(table 数组的某个数组成员)
     */
    HashEntry<K,V> getFirst(int hash) {
        HashEntry<K,V>[] tab = table;
        return tab[hash & (tab.length - 1)];
    }
}

下图是依次插入 ABC 三个 HashEntry 节点后,Segment 的结构示意图:

2.ConcurrentHashMap源码分析

ConcurrentHashMap 在默认并发级别会创建包含 16 个 Segment 对象的数组。每个 Segment 的成员对象 table 包含若干个散列表的桶。每个桶是由 HashEntry 链接起来的一个链表。如果键能均匀散列,每个 Segment 大约守护整个散列表中桶总数的 1/16。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {

    /**
     * 默认的hash表的容量
     */
    static final int DEFAULT_INITIAL_CAPACITY = 16;

    /**
     * 默认加载因子
     */
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    /**
     * 默认的并发级别,即Segment的数目
     */
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    /**
     * 最大容量
     */
    static final int MAXIMUM_CAPACITY = 1 << 30;

    /**
     * 最大segments数量
     */
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

    /**
     * 
     Number of unsynchronized retries in size and containsValue
     * methods before resorting to locking. This is used to avoid
     * unbounded retries if tables undergo continuous modification
     * which would make it impossible to obtain an accurate result.
     */
    static final int RETRIES_BEFORE_LOCK = 2;

    /**
     * segment掩码,用于找到处于哪个segments
     */
    final int segmentMask;

    /**
     * Shift value for indexing within segments.
     */
    final int segmentShift;

    /**
     * segments, 每个segment是一个hash表
     */
    final Segment<K,V>[] segments;

    transient Set<K> keySet;
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();

        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;

        // 找到最接近concurrencyLevel的2的幂,用来做最终的concurrencyLevel,比如传入的
        //concurrencyLevel为10,则最终concurrencyLevel为16
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        segmentShift = 32 - sshift;
        segmentMask = ssize - 1;
        this.segments = Segment.newArray(ssize);

        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 1;
        while (cap < c)
            cap <<= 1;

        for (int i = 0; i < this.segments.length; ++i)
            this.segments[i] = new Segment<K,V>(cap, loadFactor);
    }
}

下面是 ConcurrentHashMap 的结构示意图:

2.1 高并发的put

以 put 操作为例说明对 ConcurrentHashMap 做结构性修改的过程。

1
2
3
4
5
6
public V put(K key, V value) {
    if (value == null) throw new NullPointerException();
    int hash = hash(key.hashCode());    // 计算键对应的散列码
    // 根据散列码找到对应的 Segment
    return segmentFor(hash).put(key, hash, value, false);
}

segmentFor代码:首先左移segmentShift位(比如segment总数为16, 则segmentShift为4),高维填充0,再和segmentMask做掩码(比如segment总数为16,则掩码为216-1)

1
2
3
final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}

segment内的put逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock(); //加锁
    try {
        int c = count;
        if (c++ > threshold) // ensure capacity
            rehash();
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);    //找到桶
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        //遍历桶的链表,现有hash值比较,再用key比较
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;

        V oldValue;
        if (e != null) {    //key在链表中找到,则执行更新
            oldValue = e.value;
            if (!onlyIfAbsent)
                e.value = value;
        }
        else {  //key在链表中找不到,新插入到表头
            oldValue = null;
            ++modCount;
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            count = c; // write-volatile
        }
        return oldValue;
    } finally {
        unlock();
    }
}

从put的代码逻辑我们可以看出来,其他写线程对另外 15 个(假设全部为16个)Segment 的加锁并不会因为当前线程对这个 Segment 的加锁而阻塞。每次put锁住的只是对应的segment而不是整个表,即同时允许多个线程操作不同的segment,对整个HashMap而言,就是同时运行多个线程写这个HashMap。同时,所有读线程几乎不会因本线程的加锁而阻塞(除非读线程刚好读到这个 Segment 中某个 HashEntry 的 value 域的值为 null,此时需要加锁后重新读取该值)。

相比较于 HashTable 和由同步包装器包装的 HashMap每次只能有一个线程执行读或写操作,ConcurrentHashMap 在并发访问性能上有了质的提高。在理想状态下,ConcurrentHashMap 可以支持 16 个线程执行并发写操作(如果并发级别设置为 16),及任意数量线程的读操作。

2.2 读操作对加锁的需求

在ConcurrentHashMap中有两个策略来降低读操作对加锁的需求:

(1).HashEntry 中的 key,hash,next 都声明为 final 型:
在代码清单HashEntry 类的定义中我们可以看到,HashEntry 中的 key,hash,next 都声明为 final 型。这意味着,不能把节点添加到链接的中间和尾部,也不能在链接的中间和尾部删除节点。这个特性可以保证:在访问某个节点时,这个节点之后的链接不会被改变。这个特性可以大大降低处理链表时的复杂性;
(2).HashEntry 类的 value 域被声明为 Volatile 型:
同时,HashEntry 类的 value 域被声明为 Volatile 型,Java 的内存模型可以保证:某个写线程对 value 域的写入马上可以被后续的某个读线程“看”到。在 ConcurrentHashMap 中,不允许用 null 作为键和值,当读线程读到某个 HashEntry 的 value 域的值为 null 时,便知道产生了冲突(发生了重排序现象),需要加锁后重新读入这个 value 值。这些特性互相配合,使得读线程即使在不加锁状态下,也能正确访问 ConcurrentHashMap;

由于对 Volatile 变量的写入操作将与随后对这个变量的读操作进行同步。当一个写线程修改了某个 HashEntry 的 value 域后,另一个读线程读这个值域,Java 内存模型能够保证读线程读取的一定是更新后的值。所以,写线程对链表的非结构性修改能够被后续不加锁的读线程看到;

2.3 结构性变更

上面的2.2中的第(2)点能保证我们写线程对value的变更,能够被读线程看到。但是,如果对 ConcurrentHashMap 做结构性修改,怎么保证读线程尽可能少被阻塞?

对 ConcurrentHashMap 做结构性修改,实质上是对某个桶指向的链表做结构性修改。如果能够确保:在读线程遍历一个链表期间,写线程对这个链表所做的结构性修改不影响读线程继续正常遍历这个链表。那么读 / 写线程之间就可以安全并发访问这个 ConcurrentHashMap。

结构性修改操作包括 put,remove,clear。下面我们分别分析这三个操作。

clear操作

clear 操作只是把 ConcurrentHashMap 中所有的桶“置空”,每个桶之前引用的链表依然存在,只是桶不再引用到这些链表(所有链表的结构并没有被修改)。正在遍历某个链表的读线程依然可以正常执行对该链表的遍历。

put操作

从上面的代码清单“在 Segment 中执行具体的 put 操作”中,我们可以看出:put 操作如果需要插入一个新节点到链表中时 , 会在链表头部插入这个新节点。此时,链表中的原有节点的链接并没有被修改。也就是说:插入新健 / 值对到链表中的操作不会影响读线程正常遍历这个链表。

remove操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public V remove(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).remove(key, hash, null);
}

V remove(Object key, int hash, Object value) {
    lock();
    try {
        int c = count - 1;
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;

        V oldValue = null;
        if (e != null) {    //找到了节点
            V v = e.value;
            if (value == null || value.equals(v)) {
                oldValue = v;
                ++modCount;
                HashEntry<K,V> newFirst = e.next;   //待删除节点的next
                for (HashEntry<K,V> p = first; p != e; p = p.next)
                    newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                  newFirst, p.value);
                tab[index] = newFirst;
                count = c; // write-volatile
            }
        }
        return oldValue;
    } finally {
        unlock();
    }
}

和 get 操作一样,首先根据散列码找到具体的链表;然后遍历这个链表找到要删除的节点;最后把待删除节点之后的所有节点原样保留在新链表中,把待删除节点之前的每个节点克隆到新链表中。下面通过图例来说明 remove 操作。假设写线程执行 remove 操作,要删除链表的 C 节点,另一个读线程同时正在遍历这个链表:

从上图可以看出,删除节点 C 之后的所有节点原样保留到新链表中;删除节点 C 之前的每个节点被克隆到新链表中,注意:它们在新链表中的链接顺序被反转了。

在执行 remove 操作时,原始链表并没有被修改,也就是说:读线程不会受同时执行 remove 操作的并发写线程的干扰。

综合上面的分析我们可以看出,写线程对某个链表的结构性修改不会影响其他的并发读线程对这个链表的遍历访问。

2.4 读操作

首先找到对应的segment,再在segment内找对应的数据,当读到的数据为null的时候,加锁再重新读(因为 ConcurrentHashMap不允许value为null,因此正常情况下不会出现null):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}

V get(Object key, int hash) {
    if (count != 0) { // 读volatile变量值
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                //v == null,说明发生了重排序,需要加锁后重新读取
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}

//加锁读
V readValueUnderLock(HashEntry<K,V> e) {
    lock();
    try {
        return e.value;
    } finally {
        unlock();
    }
}

在 ConcurrentHashMap 中,所有执行写操作的方法(put, remove, clear),在对链表做结构性修改之后,在退出写方法前都会去写这个 count 变量。所有未加锁的读操作(get, contains, containsKey)在读方法中,都会首先去读取这个 count 变量。

根据 Java 内存模型,对 同一个 volatile 变量的写 / 读操作可以确保:写线程写入的值,能够被之后未加锁的读线程“看到”。

这个特性和前面介绍的 HashEntry 对象的不变性相结合,使得在 ConcurrentHashMap 中,读线程在读取散列表时,基本不需要加锁就能成功获得需要的值。这两个特性相配合,不仅减少了请求同一个锁的频率(读操作一般不需要加锁就能够成功获得值),也减少了持有同一个锁的时间(只有读到 value 域的值为 null 时 , 读线程才需要加锁后重读)。

3.总结

ConcurrentHashMap 是一个并发散列映射表的实现,它允许完全并发的读取,并且支持给定数量的并发更新。相比于 HashTable 和用同步包装器包装的 HashMap(Collections.synchronizedMap(new HashMap())),ConcurrentHashMap 拥有更高的并发性。在 HashTable 和由同步包装器包装的 HashMap 中,使用一个全局的锁来同步不同线程间的并发访问。同一时间点,只能有一个线程持有锁,也就是说在同一时间点,只能有一个线程能访问容器。这虽然保证多线程间的安全并发访问,但同时也导致对容器的访问变成串行化的了。

ConcurrentHashMap 的高并发性主要来自于三个方面:

(1).用分离锁(Segment锁)实现多个线程间的更深层次的共享访问;
(2).用 HashEntery 对象的不变性来降低执行读操作的线程在遍历链表期间对加锁的需求;
(3).通过对同一个 Volatile 变量的写 / 读访问,协调不同线程间读 / 写操作的内存可见性;

参考

http://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/#icomments

ConcurrentLinkedQueue

如果我们要实现一个线程安全的队列有两种实现方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁,比如ArryBlockingQueue)或两个锁(入队和出队用不同的锁,比如LinkedBlockingDeque)等方式来实现,而非阻塞的实现方式则可以使用循环CAS的方式来实现。

1非阻塞算法

1.1 Java的多线程同步机制

在现代的多处理器系统中,提高程序的并行执行能力是有效利用 CPU 资源的关键。为了有效协调多线程间的并发访问,必须采用适当的同步机制来协调竞争。当前常用的多线程同步机制可以分为下面三种类型:

(1).volatile 变量:轻量级多线程同步机制,不会引起上下文切换和线程调度。仅提供内存可见性保证,不提供原子性。
(2).CAS 原子指令:轻量级多线程同步机制,不会引起上下文切换和线程调度。它同时提供内存可见性和原子化更新保证。
(3).内部锁和显式锁:重量级多线程同步机制,可能会引起上下文切换和线程调度,它同时提供内存可见性和原子性。

从 Amdahl 定律我们可以知道,要想提高并发性,就应该尽量使串行部分达到最大程度的并行;也就是说:最小化串行代码的粒度是提高并发性能的关键。

与锁相比,非阻塞算法在更细粒度(机器级别的原子指令)的层面协调多线程间的竞争。它使得多个线程在竞争相同资源时不会发生阻塞,它的并发性与锁相比有了质的提高;同时也大大减少了线程调度的开销。同时,由于几乎所有的同步原语都只能对单个变量进行操作,这个限制导致非阻塞算法的设计和实现非常复杂。

1.2 ConcurrentLinkedQueue非阻塞算法实现

ConcurrentLinkedQueue的非阻塞算法实现可概括为下面5点:

(1).使用CAS原子指令来处理对数据的并发访问,这是非阻塞算法得以实现的基础;
(2).head/tail并非总是指向队列的头/尾节点,也就是说允许队列处于不一致状态;这个特性把入队/出队时,原本需要一起原子化执行的两个步骤分离开来,从而缩小了入队/出队时需要原子化更新值的范围到唯一变量,这是非阻塞算法得以实现的关键;
(3).由于队列有时会处于不一致状态。为此,ConcurrentLinkedQueue使用三个不变式来维护非阻塞算法的正确性;
(4).以批处理方式来更新head/tail,从整体上减少入队/出队操作的开销;
(5).为了有利于垃圾收集,队列使用特有的head更新机制;为了确保从已删除节点向后遍历,可到达所有的非删除节点,队列使用了特有的向后推进策略;

在后面的源代码分析中,我们将会看到队列有时会处于不一致状态。为此,ConcurrentLinkedQueue 使用三个不变式(基本不变式,head 的不变式和tail的不变式),来约束队列中方法的执行。通过这三个不变式来维护非阻塞算法的正确性:

(1).基本不变式
在执行方法之前和之后,队列必须要保持的不变式:
    当入队插入新节点之后,队列中有一个 next域为null的(最后)节点;
    从head开始遍历队列,可以访问所有item域不为null的节点;
(2).head的不变式和可变式
在执行方法之前和之后,head必须保持的不变式:
    所有“活着”的节点(指未删除节点),都能从head通过调用succ()方法遍历可达;
    head不能为null;
    head节点的next域不能引用到自身;
在执行方法之前和之后,head的可变式:
    head节点的item域可能为null,也可能不为null;
    允许tail滞后于head,也就是说从head开始遍历队列,不一定能到达tail;
(3).tail的不变式和可变式
在执行方法之前和之后,tail必须保持的不变式:
    通过tail调用succ()方法,最后节点总是可达的;
    tail不能为null;
在执行方法之前和之后,tail的可变式:
    tail节点的item域可能为null,也可能不为null;
    允许tail滞后于head,也就是说从head开始遍历队列,不一定能到达tail;
    tail节点的next域可以引用到自身;

2 ConcurrentLinkedQueue源码

ConcurrentLinkedQueue就是一个使用非阻塞算法实现的一个基于链接节点的无界线程安全队列,按照 FIFO(先进先出)原则对元素进行排序。队列元素中不可以放置null元素。

ConcurrentLinkedQueue的链表Node中的next的类型是volatile,而且链表数据item的类型也是volatile。关于volatile,我们知道它的语义包含:“即对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入”。ConcurrentLinkedQueue就是通过volatile来实现多线程对竞争资源的互斥访问的。

ConcurrentLinkedQueue由head节点和tair节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。

2.1 声明

从类名上我们就可以看得出来,Concurrent保证了并发中的线程安全,Linked提示是链表实现,Queue则说明是一个队列。声明如下:

1
2
3
4
5
6
7
8
9
10
11
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    //链表头尾指针
    private transient volatile Node<E> head = new Node<E>(null);
    private transient volatile Node<E> tail = head;
    //用于Unsafe实现对head和tail的更新
    private static final long headOffset =
        objectFieldOffset(UNSAFE, "head", ConcurrentLinkedQueue.class);
    private static final long tailOffset =
        objectFieldOffset(UNSAFE, "tail", ConcurrentLinkedQueue.class);
}

2.2 Node

其中的Node为单链表节点,但是提供了Unsafe提供的CAS操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private static class Node<E> {
    private volatile E item;
    private volatile Node<E> next;

    Node(E item) {
        // Piggyback on imminent casNext()
        lazySetItem(item);
     }

    E getItem() {
        return item;
    }
    //如果当前节点item等于cmp,则将其设置为val
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void setItem(E val) {
         item = val;
    }

    void lazySetItem(E val) {
        UNSAFE.putOrderedObject(this, itemOffset, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    Node<E> getNext() {
        return next;
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
    private static final long nextOffset = objectFieldOffset(UNSAFE, "next", Node.class);   //next域的偏移量
    private static final long itemOffset = objectFieldOffset(UNSAFE, "item", Node.class);   //item域的偏移量
}

其中UNSAFE.putOrderedObject的语义是:它能够实现非堵塞的写入,这些写入不会被Java的JIT重新排序指令(instruction reordering),这种性能提升是有代价的,虽然便宜,也就是写后结果并不会被其他线程看到(即写操作不提供可见性),甚至是自己的线程,通常是几纳秒后被其他线程看到,这个时间比较短,所以代价可以忍受。这个方法在对低延迟代码是很有用的。

类似Unsafe.putOrderedObject还有unsafe.putOrderedLong等方法,unsafe.putOrderedLong比使用 volatile long要快3倍左右。

如果需要具备可见性,则需要指定字段为volatile,而我们这里item正是声明为volatile。

2.3 入队列

入队列就是将入队节点添加到队列的尾部。入队主要做两件事情:

(1)将入队节点设置成当前队列尾节点的下一个节点;
(2)更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点,理解这一点对于我们研究源码会非常有帮助;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean add(E e) {
    return offer(e);
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();    //不支持元素为null
    Node<E> n = new Node<E>(e);
    retry:
    for (;;) {
        Node<E> t = tail;
        Node<E> p = t;
        for (int hops = 0; ; hops++) {
            Node<E> next = succ(p); //p的后继节点
            //next!=null,即p节点的后继不为null,指p没有指向最后一个节点
            if (next != null) {
                if (hops > HOPS && t != tail)   //tail发生了变化
                    continue retry;
                p = next;
            } else if (p.casNext(null, n)) {    //p是尾节点,则将其next设置为新结点n
                if (hops >= HOPS)
                    casTail(t, n); //更新tail,允许失败
                return true;
            } else {
                p = succ(p);
            }
        }
    }
}

succ函数表示后继节点,由于 tail 可以指向任意节点,所以从 tail 向后遍历寻找尾节点的过程中,可能会遇到哨兵节点。此时 succ() 方法会直接跳转到 head 指向的节点继续遍历。下面是 succ() 方法的源代码:

1
2
3
4
5
6
final Node<E> succ(Node<E> p) {
    Node<E> next = p.getNext();
    //如果 p 节点的 next 域链接到自身(p 节点是哨兵节点),就跳转到 head,从 
    //head开始继续遍历,否则向后推进到下一个节点
    return (p == next) ? head : next;
}

从上面的源代码我们可以看出,如果向后推进过程中遇到哨兵节点,就跳转到 head,从 head 开始继续遍历;否则,就推进到下一个节点。

上图的队列当前处于 tail 滞后于 head 状态。假设现在执行入队操作,需要从 tail 开始向后遍历找到队列的尾节点。tail 开始时指向 A 节点,执行 succ() 方法向后推进到 B 节点。在 B 节点执行 succ() 方法时,由于 B 节点链接到自身,所以跳转到 head 指向的 E 节点继续遍历。

第二步设置入队节点为尾节点。p.casNext(null, n)方法用于将入队节点设置为当前队列尾节点的next节点,p如果是null表示p是当前队列的尾节点,如果不为null表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。

队列的入队方法包含两个步骤:添加新节点和更新 tail 指向这个新节点。从代码中我们可以看到,这两个步骤都是用 CAS 原子指令来完成的。由于 ConcurrentLinkedQueue 允许队列处于不一致状态,所以这两个步骤不必一起原子的执行。添加新节点后,只有当 tail 与新添加节点之间的距离达到了 HOPS 指定的阀值,才会执行更新 tail。

2.3.1 tail 在队列中的位置分析

在执行入队操作前,tail 在队列中的位置共有三种可能:

(1).tail 指向尾节点
(2).tail 节点指向非尾节点
(3).tail 滞后于 head

tail 指向尾节点

开始时,tail 指向 D 节点,首先寻找 D 节点的后继节点。由于 D 的后继节点为 null,所以插入新节点到 D 节点的后面。如果插入成功就退出方法;如果插入失败(说明其他线程刚刚插入了一个新节点),就向后推进到新插入的节点,然后重新开始迭代。下图是插入成功后的示意图:

在上图中,由于 tail 滞后于尾节点的节点数还没有达到 HOPS 指定的阈值,所以 tail 没有被更新。

tail 节点指向非尾节点

开始时,tail 指向 C 节点。首先找到 C 的后继节点 D,然后向后推进到节点 D,后面代码执行路径与上面的“tail 指向尾节点 ”的代码执行路径相同。下图是插入成功后的结构示意图:

上图中的 tail 更新了位置。因为在添加 E 节点后,tail 滞后的节点数达到了 HOPS 指定的阈值。这触发执行更新 tail 的 CAS 操作。

tail 滞后于 head

开始时,tail 指向 A 节点。首先找到 A 的后继节点 B,然后向后推进到节点 B。由于 B 是哨兵节点,产生跳转动作,跳过 C 节点,从 head 指向的 D 节点开始继续向后遍历。后面的代码执行路径与“tail 指向非尾节点”相同。下面是成功插入一个新节点后的结构示意图:

上图的 tail 更新了位置,因为 tail 滞后的节点数达到了 HOPS 指定的阈值,这触发执行更新 tail 的 CAS 操作。

2.3.2 hops的设计意图

为了尽量减少执行 CAS 原子指令的次数,执行入队 / 出队操作时 , ConcurrentLinkedQueue 并不总是更新 head/tail。只有从 head/tail 到头 / 尾节点之间的“距离”达到变量 HOPS 指定的阀值,入队 / 出队操作才会更新它们。

// 更新 head/tail 的阀值
private static final int HOPS = 1;

不使用hops,下面的代码逻辑可能更清晰:

1
2
3
4
5
6
7
8
9
10
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> n = new Node<E>(e);
    for (;;) {
        Node<E> t = tail;
        if (t.casNext(null, n) && casTail(t, n)) {
            return true;
        }
    }
}

让tail节点永远作为队列的尾节点,这样实现代码量非常少,而且逻辑非常清楚和易懂。

但是这么做有个缺点就是每次都需要使用循环CAS更新tail节点。如果能减少CAS更新tail节点的次数,就能提高入队的效率,所以JDK实现中使用hops变量来控制并减少tail节点的更新频率,并不是每次节点入队后都将tail节点更新成尾节点,而是当tail节点和尾节点的距离大于等于常量HOPS的值(默认等于1)时才更新tail节点,tail和尾节点的距离越长使用CAS更新tail节点的次数就会越少,但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点,但是这样仍然能提高入队的效率,因为从本质上来看它通过增加对volatile变量的读操作来减少了对volatile变量的写操作,而对volatile变量的CAS操作开销要远远大于读操作,所以入队效率会有所提升。因为 CAS 原子指令的执行包含了内存屏障(Memory barriers),防止乱序执行以及对各种编译器优化的抑制。

还有一点需要注意的是入队方法永远返回true,所以不要通过返回值判断入队是否成功。

2.4 出队列

出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。

并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。这种做法也是通过hops变量来减少使用CAS更新head节点的消耗,从而提高出队效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E poll() {
    Node<E> h = head;
    Node<E> p = h;
    for (int hops = 0; ; hops++) {
        E item = p.getItem();   //p执行head节点
        if (item != null && p.casItem(item, null)) {    //将head内容置为null
            if (hops >= HOPS) {
                Node<E> q = p.getNext();
                updateHead(h, (q != null) ? q : p);//将p节点下一个节点设置成head节点
            }
            return item;
        }
        //如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了,那么获取p节点的下一个节点
        Node<E> next = succ(p);
        //如果p的下一个节点也为空,说明这个队列已经空了
        if (next == null) {
            updateHead(h, p);
            break;
        }
        //如果下一个元素不为空,则将头节点的下一个节点设置成头节点
        p = next;
    }
    return null;
}

队列的出队方法包含两个步骤:删除头节点和更新 head 指向新头节点。这里对头节点的删除使用了一个小技巧:设置头节点的 item 域为 null,即删除了它(虽然这个节点还在队列中,但它以是无效节点)。在代码中我们可以看到,这两个步骤都使用 CAS 原子指令来完成。由于 ConcurrentLinkedQueue 允许队列处于不一致状态,所以这两个步骤不必一起原子的执行。在删除头节点后,只有当 head 与新头节点之间的距离达到了 HOPS 指定的阀值,才会执行更新 head。

2.4.1 head在队列中的位置分析

在执行出队操作前,head 在队列中的位置共有两种可能:

(1).head 指向有效节点。
(2).head 指向无效节点。

head 指向有效节点

出队时,首先取得 head 指向的 A 节点的 item 域的值,然后通过 CAS 设置 A 节点 item 域的值为 null。如果成功,由于此时越过的节点数为 0,所以直接返回 A 节点 item 域原有的值。如果不成功,说明其他线程已经抢先删除了该节点,此时向后推进到 B 节点。重复这个过程,直到成功删除一个节点;如果遍历完队列也没有删除成功,则返回 null。下面是成功删除后的结构示意图:

在上图中,虽然 A 节点被设置成无效节点,但 head 依然指向它,因为删除操作越过的节点数还没有达到 HOPS 指定的阀值。

head 指向无效节点

首先获得 head 指向节点的 item 域的值,由于为 null,所以向后推进到 B 节点。获得 B 节点 item 域的值后,通过 CAS 设置该值为 null。如果成功,由于已经达到 HOPS 指定的阀值,触发执行 head 更新。如果不成功(说明其他线程已经抢先删除了 B 节点),继续向后推进到 C 节点。重复这个过程,直到删除一个有效节点。如果遍历完队列也没有删除成功,则返回 null。下图是成功删除后的结构示意图:

从上图我们可以看到,在执行删除操作过程中,head 越过的节点数达到阀值,触发执行 head 的更新,使它指向 C 节点。

2.4.2 更新 head

为了有利于垃圾收集,ConcurrentLinkedQueue 在更新 head 指向新头结点后,会把旧头节点设置为哨兵节点(链接到自身的节点,同时也是以删除节点)。下面是更新 head 的源代码:

1
2
3
4
5
6
final void updateHead(Node<E> h, Node<E> p) {
    // 如果两个节点不相同,尝试用 CAS 指令原子更新 head 指向新头节点
    if (h != p && casHead(h, p))
    // 惰性设置旧头结点为哨兵节点
    h.lazySetNext (h);
}

下面通过一个示意图来理解已删除节点在队列中的状态:

在上图中,假设开始时 head 指向 A 节点,然后连续执行了 4 次出队操作,删除 A,B,C,D 4 个节点。在出队 B 节点时,head 与头结点之间的距离达到变量 HOPS 指定的阀值。这触发执行 updateHead()方法:首先设置 head 指向 C 节点,然后设置 B 节点的 next 域指向自身。同样,在出队 D 节点时,重复同样的过程。由于 B 和 D 节点断开了以删除节点与队列的链接,这将有利于虚拟机回收这些以删除节点占用的内存空间。

2.4 注意点

ConcurrentLinkedQueue的size()会遍历整个队列,因此时间复杂度为O(n).

3.参考

http://blog.csdn.net/tomato__/article/details/24179019

https://www.ibm.com/developerworks/cn/java/j-lo-concurrent/

http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf

Java并发容器之Concurrent

包括四种容器:

ConcurrentHashMap
ConcurrentLinkedQueue
ConcurrentMap
ConcurrentNavigableMap

其中ConcurrentLinkedQueue是并发环境下非阻塞算法的典型实现。

1.ConcurrentMap

ConcurrentMap是继承Map的一种interface,定义了一下几个api接口:

1
2
3
4
5
6
public interface ConcurrentMap<K, V> extends Map<K, V> {
    V putIfAbsent(K key, V value);
    boolean remove(Object key, Object value);
    boolean replace(K key, V oldValue, V newValue);
    V replace(K key, V value);
}

2.ConcurrentNavigableMap

ConcurrentNavigableMap是继承ConcurrentMap的一种interface。

我们从SortedMap接口开始锁说,因为ConcurrentNavigableMap继承自NavigableMap,而NavigableMap继承自SortedMap。

2.1 SortedMap

实现SortedMap接口的类有排序功能,SortedMap相比普通Map提供了几个特殊的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface SortedMap<K,V> extends Map<K,V> {
    /**
     * 返回key在 [fromKey, toKey)之间的子map的视图
     */
    SortedMap<K,V> subMap(K fromKey, K toKey);

    /**
     * 返回key严格小于toKey的子map视图
     */
    SortedMap<K,V> headMap(K toKey);

    /**
     * 返回key大于等于fromKey的子map视图
     */
    SortedMap<K,V> tailMap(K fromKey);

    /**
     * 返回map中第一个key
     */
    K firstKey();

    /**
     * 返回map中最后一个key
     */
    K lastKey();
}

2.2 NavigableMap

NavigableMap扩展的 SortedMap,具有了针对给定搜索目标返回最接近匹配项的导航方法。方法 lowerEntry、floorEntry、ceilingEntry 和 higherEntry 分别返回与小于、小于等于、大于等于、大于给定键的键关联的 Map.Entry 对象,如果不存在这样的键,则返回 null。类似地,方法 lowerKey、floorKey、ceilingKey 和 higherKey 只返回关联的键。

可以按照键的升序或降序访问和遍历 NavigableMap。descendingMap 方法返回映射的一个视图,该视图表示的所有关系方法和方向方法都是逆向的。升序操作和视图的性能很可能比降序操作和视图的性能要好。

此接口还定义了 firstEntry、pollFirstEntry、lastEntry 和 pollLastEntry 方法,它们返回和/或移除最小和最大的映射关系(如果存在),否则返回 null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
public interface NavigableMap<K,V> extends SortedMap<K,V> {
    /**
     * 返回Map中严格小于给定key的最大的Entry,不存在则返回null
     */
    Map.Entry<K,V> lowerEntry(K key);

    /**
     * 返回Map中严格小于给定key的最大的key,不存在则返回null
     */
    K lowerKey(K key);

    /**
     * 返回Map中小于等于给定key的最大的Entry,不存在则返回null
     */
    Map.Entry<K,V> floorEntry(K key);

    /**
     * 返回Map中小于等于给定key的最大key,不存在则返回null
     */
    K floorKey(K key);

    /**
     * 返回大于等于给定key的最小的Entry,不存在返回null
     */
    Map.Entry<K,V> ceilingEntry(K key);

    /**
     * 返回大于等于给定key的最小的key,不存在返回null
     */
    K ceilingKey(K key);

    /**
     * 返回严格大于给定key的最小的Entry,不存在返回null
     */
    Map.Entry<K,V> higherEntry(K key);

    /**
     * 返回严格大于给定key的最小的key,不存在返回null
     */
    K higherKey(K key);

    /**
     * 返回Map中key最小的Entry,不存在则返回null
     */
    Map.Entry<K,V> firstEntry();

    /**
     * 返回Map中key最大的Entry,不存在则返回null
     */
    Map.Entry<K,V> lastEntry();

    /**
     * 删除并返回Map中key最小的Entry
     */
    Map.Entry<K,V> pollFirstEntry();

    /**
     * 删除并返回Map中key最大的Entry
     */
    Map.Entry<K,V> pollLastEntry();

    /**
     * 返回当前Map的降序的视图
     * 注意,降序Map依靠当前Map实现,因此改变当前Map或者这个降序Map都会反映在彼此上
     */
    NavigableMap<K,V> descendingMap();

    /**
     * 返回当前Map的Key集合的试图
     * 改变当前Map或者这个Set都会影响彼此
     */
    NavigableSet<K> navigableKeySet();

    /**
     * 返回当前Map的降序顺序的Key集合的试图
     * 和navigableKeySet一样,改变当前Map或者这个降序Set都会影响彼此
     */
    NavigableSet<K> descendingKeySet();

    /**
     * 返回key在[fromKey, toKey]之间的试图,如果fromInclusive为true,结果中包含fromKey
     * 对应Entity,如果toInclusive为跳入额, 则结果中包含了toKey对应Entity
     */
    NavigableMap<K,V> subMap(K fromKey, boolean fromInclusive,
                             K toKey,   boolean toInclusive);

    /**
     * 返回key小于(如果inclusive为true,则包括等于)toKey的视图
     */
    NavigableMap<K,V> headMap(K toKey, boolean inclusive);

    /**
     * 返回key大于(如果inclusive为true,则包括等于)fromKey的视图
     */
    NavigableMap<K,V> tailMap(K fromKey, boolean inclusive);
}

同时NavigableMap也包含了三个返回SortedMap的接口

1
2
3
4
5
SortedMap<K,V> subMap(K fromKey, K toKey);

SortedMap<K,V> headMap(K toKey);

SortedMap<K,V> tailMap(K fromKey);

2.3 ConcurrentNavigableMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ConcurrentNavigableMap继承了ConcurrentMap和NavigableMap两个接口
public interface ConcurrentNavigableMap<K,V>
    extends ConcurrentMap<K,V>, NavigableMap<K,V>
{
    ConcurrentNavigableMap<K,V> subMap(K fromKey, boolean fromInclusive,
                                       K toKey,   boolean toInclusive);

    ConcurrentNavigableMap<K,V> headMap(K toKey, boolean inclusive);

    ConcurrentNavigableMap<K,V> tailMap(K fromKey, boolean inclusive);

    ConcurrentNavigableMap<K,V> subMap(K fromKey, K toKey);

    ConcurrentNavigableMap<K,V> headMap(K toKey);

    ConcurrentNavigableMap<K,V> tailMap(K fromKey);

    ConcurrentNavigableMap<K,V> descendingMap();

    public NavigableSet<K> navigableKeySet();

    NavigableSet<K> keySet();

    public NavigableSet<K> descendingKeySet();
}

3.ConcurrentLinkedQueue

ConcurrentLinkedQueue是非阻塞算法的典型实现,因此单独写了一篇。

http://xiaobaoqiu.github.io/blog/2014/12/24/concurrentlinkedqueue/

4.ConcurrentHashMap

ConcurrentHashMap值得研究,也单独写了一篇。

http://xiaobaoqiu.github.io/blog/2014/12/24/concurrenthashmap/

5.参考

https://www.ibm.com/developerworks/cn/java/j-lo-concurrent/

Java并发容器之CopyOnWrite

这里包括两种容器:

CopyOnWriteArrayList
CopyOnWriteArraySet

1.CopyOnWriteArrayList

CopyOnWriteArrayList类是一个线程安全的List接口的实现,在该类的内部进行元素的写操作时,底层的数组将被完整的复制,这对于读操作远远多于写操作的应用非常适合。在CopyOnWriteArrayList上进行操作时,读操作不需要加锁,而写操作类实现中对其进行了加锁。

底层也是用数组实现:

1
2
3
4
5
6
7
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    private static final long serialVersionUID = 8673264195747942595L;

    transient final ReentrantLock lock = new ReentrantLock();

    private volatile transient Object[] array;
}

读操作是不加锁的:

1
2
3
4
5
6
7
8
public E get(int index) {
    return (E)(getArray()[index]);
}

public int indexOf(E e, int index) {
    Object[] elements = getArray();
    return indexOf(e, elements, index, elements.length);
}

写操作加锁,写包括add,remove等操作。

1.1 add操作

增加元素的时候,会首先加锁,将底层数组复制一份,将待新增的元素增加到新的数组中,再将新的数组设置为新的底层数组:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();    //加锁,排他锁,所有的其他读写操作都被阻塞
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);    //复制
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

另外一个指定下标的位置add操作,需要将index位置空出来,因此需要分两次arraycopy:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void add(int index, E element) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        if (index > len || index < 0)
        throw new IndexOutOfBoundsException("Index: "+index+
                            ", Size: "+len);
        Object[] newElements;
        int numMoved = len - index; //待移动部分,即index之后的元素
        if (numMoved == 0)
            newElements = Arrays.copyOf(elements, len + 1);
        else {
            newElements = new Object[len + 1];
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index, newElements, index + 1,
                 numMoved);
        }
        newElements[index] = element;
        setArray(newElements);
    } finally {
        lock.unlock();
    }
}

CopyOnWriteArrayList还提供了两个addIfAbsent函数,即不存在这个元素才会增加,这两个函数是CopyOnWriteArraySet实现的基础:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public boolean addIfAbsent(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = new Object[len + 1];
        for (int i = 0; i < len; ++i) {
        if (eq(e, elements[i])) //存在,则不添加
            return false;
        else
            newElements[i] = elements[i];
        }
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

public int addAllAbsent(Collection<? extends E> c) {
    Object[] cs = c.toArray();
    if (cs.length == 0)
        return 0;
    Object[] uniq = new Object[cs.length];
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        int added = 0;
        for (int i = 0; i < cs.length; ++i) { // scan for duplicates
            Object e = cs[i];
            if (indexOf(e, elements, 0, len) < 0 && indexOf(e, uniq, 0, added) < 0)
                uniq[added++] = e;
        }
        if (added > 0) {
            Object[] newElements = Arrays.copyOf(elements, len + added);
            System.arraycopy(uniq, 0, newElements, len, added);
            setArray(newElements);
        }
        return added;
    } finally {
        lock.unlock();
    }
}

1.2 remove操作

删除,需要复制待删除元素前后两段数组元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// 通过下标remove
public E remove(int index) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object oldValue = elements[index];
        int numMoved = len - index - 1;
        if (numMoved == 0)
            setArray(Arrays.copyOf(elements, len - 1));
        else {
            Object[] newElements = new Object[len - 1];
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index + 1, newElements, index,
                 numMoved);
            setArray(newElements);
        }
        return (E)oldValue;
    } finally {
        lock.unlock();
    }
}

//通过值remove,先拷贝在查找并
public boolean remove(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        if (len != 0) {
            // Copy while searching for element to remove
            // This wins in the normal case of element being present
            int newlen = len - 1;
            Object[] newElements = new Object[newlen];

            for (int i = 0; i < newlen; ++i) {
                if (eq(o, elements[i])) {
                    //找到了,将余下的拷贝并返回
                    for (int k = i + 1; k < len; ++k)
                        newElements[k-1] = elements[k];
                    setArray(newElements);
                    return true;
                } else
                    newElements[i] = elements[i];   //逐个拷贝
            }

            //最后一个元素
            if (eq(o, elements[newlen])) {
                setArray(newElements);
                return true;
            }
        }
        return false;
    } finally {
        lock.unlock();
    }
}
//删除下标范围在[fromIndexs, toIndex)之前的元素
private void removeRange(int fromIndex, int toIndex) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;

        if (fromIndex < 0 || fromIndex >= len || toIndex > len || toIndex < fromIndex)
            throw new IndexOutOfBoundsException();
        int newlen = len - (toIndex - fromIndex);
        int numMoved = len - toIndex;
        if (numMoved == 0)
            setArray(Arrays.copyOf(elements, newlen));
        else {
            Object[] newElements = new Object[newlen];
            System.arraycopy(elements, 0, newElements, 0, fromIndex);
            System.arraycopy(elements, toIndex, newElements,
                 fromIndex, numMoved);
            setArray(newElements);
        }
    } finally {
        lock.unlock();
    }
}

public boolean removeAll(Collection<?> c) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        if (len != 0) {
            // temp array holds those elements we know we want to keep
            int newlen = 0;
            Object[] temp = new Object[len];
            for (int i = 0; i < len; ++i) {
                Object element = elements[i];
                if (!c.contains(element))
                    temp[newlen++] = element;
            }
            if (newlen != len) {
                setArray(Arrays.copyOf(temp, newlen));
                return true;
            }
        }
        return false;
    } finally {
        lock.unlock();
    }
}

//清空
public void clear() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        setArray(new Object[0]);
    } finally {
        lock.unlock();
    }
}

2.CopyOnWriteArraySet

CopyOnWriteArraySet是在CopyOnWriteArrayList的基础上实现的,只是保证元素不重复,因此会在add的时候判断,调用CopyOnWriteArrayList的addIfAbsent函数实现:

1
2
3
4
5
6
7
8
9
10
11
12
public class CopyOnWriteArraySet<E> extends AbstractSet<E>
        implements java.io.Serializable {
    private final CopyOnWriteArrayList<E> al;

    public boolean add(E e) {
        return al.addIfAbsent(e);
    }

    public boolean addAll(Collection<? extends E> c) {
        return al.addAllAbsent(c) > 0;
    }
}

Java并发容器之SkipList

这里包括两种容器

ConcurrentSkipListMap
ConcurrentSkipListSet

其中ConcurrentSkipListSet是通过ConcurrentSkipListMap实现的,它包含一个ConcurrentNavigableMap对象m,而m对象实际上是ConcurrentNavigableMap的实现类ConcurrentSkipListMap的实例。

ConcurrentSkipListMap中的元素是key-value键值对;而ConcurrentSkipListSet是集合,它只用到了ConcurrentSkipListMap中的key,其value是一个空的Object。

1
2
3
4
5
6
7
8
9
public class ConcurrentSkipListSet<E>
    extends AbstractSet<E>
    implements NavigableSet<E>, Cloneable, java.io.Serializable {
    private final ConcurrentNavigableMap<E,Object> m;

    public ConcurrentSkipListSet() {
        m = new ConcurrentSkipListMap<E,Object>();
    }
}

因此这里只分析ConcurrentSkipListMap的源代码。

1.ConcurrentSkipListMap

ConcurrentSkipListMap和TreeMap类似,它们虽然都是有序的哈希表。但是,第一,它们的线程安全机制不同,TreeMap是非线程安全的,而ConcurrentSkipListMap是线程安全的。第二,ConcurrentSkipListMap是通过跳表实现的,而TreeMap是通过红黑树实现的。

ConcurrentSkipListMap的数据结构,如下图所示:

跳表分为许多层(level),每一层都可以看作是数据的索引,这些索引的意义就是加快跳表查找数据速度。每一层的数据都是有序的,上一层数据是下一层数据的子集,并且第一层(level 1)包含了全部的数据;层次越高,跳跃性越大,包含的数据越少。跳表包含一个表头,它查找数据时,是从上往下,从左往右进行查找。

先以数据“7,14,21,32,37,71,85”序列为例,来对跳表进行简单说明。在跳表中查找“32”节点 路径如下图所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentNavigableMap<K,V>,
               Cloneable,
               java.io.Serializable {
    //head是跳表的表头
    private transient volatile HeadIndex<K,V> head;
}

static class Index<K,V> {
    final Node<K,V> node;       //哈希表节点node
    final Index<K,V> down;      //下索引的指针
    volatile Index<K,V> right;  //右索引的指针
}

static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;    //节点所属层次
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}

static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile Node<K,V> next;
}

下面从ConcurrentSkipListMap的添加(put),删除(remove),获取(get)这3个方面对它进行分析:

1.1 添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public V put(K key, V value) {
    if (value == null) throw new NullPointerException();
    return doPut(key, value, false);
}

private V doPut(K kkey, V value, boolean onlyIfAbsent) {
    Comparable<? super K> key = comparable(kkey);
    for (;;) {
        Node<K,V> b = findPredecessor(key); //找到key的前继节点
        Node<K,V> n = b.next;   //key的前继节点的后继节点,即 插入节点 的“后继节点”
        for (;;) {
            if (n != null) {
                Node<K,V> f = n.next;
                //如果两次获得的b.next不是相同的Node,就跳转到外层循环,重新遍历。
                if (n != b.next)               // inconsistent read
                    break;;
                Object v = n.value;
                //当n的值为null(意味着其它线程删除了n);此时删除b的下一个节点,然后跳转到”外层for循环“,重新遍历。
                if (v == null) {               // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                // 如果其它线程删除了b;则跳转到”外层for循环“,重新获得b和n后再遍历。
                if (v == n || b.value == null) // b is deleted
                    break;
                int c = key.compareTo(n.key);
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (c == 0) {
                    if (onlyIfAbsent || n.casValue(v, value))
                        return (V)v;
                    else
                        break; // restart if lost race to replace value
                }
                // else c < 0; fall through
            }
            // 新建节点(对应是“要插入的键值对”)
            Node<K,V> z = new Node<K,V>(kkey, value, n);
            if (!b.casNext(n, z))   // 设置“b的后继节点”为z
                break;         // restart if lost race to append to b,多线程情况下,break才可能发生(其它线程对b进行了操作)

            // 随机获取一个level,然后在“第1层”到“第level层”的链表中都插入新建节点
            int level = randomLevel();
            if (level > 0)
                insertIndex(z, level);
            return null;
        }
    }
}

doPut()的作用就是将键值对添加到“跳表”中,其主干部分(即单纯的只考虑“单线程的情况下,将key-value添加到跳表中”,即忽略“多线程相关的内容”),它的流程如下:

第1步:找到“插入位置”。找到key的前去节点(b)和key的后继节点(n);key是要插入节点的键;
第2步:新建并插入节点。新建节点z(key对应的节点),并将新节点z插入到“跳表”中(设置“b的后继节点为z”,“z的后继节点为n”);
第3步:更新跳表。即随机获取一个level,然后在“跳表”的第1层~第level层之间,每一层都插入节点z;在第level层之上就不再插入节点了。若level数值大于“跳表的层次”,则新建一层;

主干代码大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private V doPut(K kkey, V value, boolean onlyIfAbsent) {
    Comparable<? super K> key = comparable(kkey);
    for (;;) {
        // 找到key的前继节点
        Node<K,V> b = findPredecessor(key);
        // 设置n为key的后继节点
        Node<K,V> n = b.next;
        for (;;) {
            // 新建节点(对应是“要被插入的键值对”)
            Node<K,V> z = new Node<K,V>(kkey, value, n);
            // 设置“b的后继节点”为z
            b.casNext(n, z);

            // 随机获取一个level,然后在“第1层”到“第level层”的链表中都插入新建节点
            int level = randomLevel();
            if (level > 0)
                insertIndex(z, level);
            return null;
        }
    }
}

1.2 删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public V remove(Object key) {
    return doRemove(key, null);
}

final V doRemove(Object okey, Object value) {
        Comparable<? super K> key = comparable(okey);
        for (;;) {
            Node<K,V> b = findPredecessor(key); //前置节点
            Node<K,V> n = b.next;
            for (;;) {
                if (n == null)
                    return null;
                Node<K,V> f = n.next;
                if (n != b.next)                    // inconsistent read
                    break;
                Object v = n.value;
                if (v == null) {                    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (v == n || b.value == null)      // b is deleted
                    break;
                int c = key.compareTo(n.key);
                if (c < 0)
                    return null;
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (value != null && !value.equals(v))
                    return null;
                if (!n.casValue(v, null))// 设置b的后继节点为null
                    break;
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // Retry via findNode
                else {
                    findPredecessor(key);           // Clean index
                    if (head.right == null)
                        tryReduceLevel();
                }
                return (V)v;
            }
        }
    }

下面是删除跳表中键值对的主干步骤:

第1步:找到被删除节点的位置;找到key的前继节点(b),key所对应的节点(n),n的后继节点f;key是要删除节点的键;
第2步:删除节点;将key所对应的节点n从跳表中移除,将b的后继节点设为f;
第3步:更新跳表;遍历跳表,删除每一层的key节点(如果存在的话)。删除导致某一层为空之后,需要删除这一层;

主干部分的doRemove()的代码大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final V doRemove(Object okey, Object value) {
    Comparable<? super K> key = comparable(okey);
    for (;;) {
        // 找到key的前继节点b
        Node<K,V> b = findPredecessor(key);
        // b-->n,即n为b的后继节点,也即n节点为key的节点
        Node<K,V> n = b.next;
        for (;;) {
            // f是n的后继节点
            Node<K,V> f = n.next;
            // 设置n节点为null
            n.casValue(v, null);
            // 设置b的后继节点为f
            b.casNext(n, f);
            // 清除跳表中每一层的key节点
            findPredecessor(key);
            // 如果“表头的右索引为空”,则将“跳表的层次”-1
            if (head.right == null)
                tryReduceLevel();
            return (V)v;
        }
    }
}

1.3 获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public V get(Object key) {
    return doGet(key);
}

private V doGet(Object okey) {
    Comparable<? super K> key = comparable(okey);
    Node<K,V> bound = null;
    Index<K,V> q = head;    //待比较节点
    Index<K,V> r = q.right; //待比较的右边节点
    Node<K,V> n;
    K k;
    int c;
    for (;;) {
        Index<K,V> d;
        // 往右遍历
        if (r != null && (n = r.node) != bound && (k = n.key) != null) {
            if ((c = key.compareTo(k)) > 0) {
                q = r;
                r = r.right;
                continue;
            } else if (c == 0) {
                Object v = n.value;
                return (v != null)? (V)v : getUsingFindNode(key);
            } else
                bound = n;
        }

        // 往下遍历
        if ((d = q.down) != null) {
            q = d;
            r = d.right;
        } else
            break;
    }

    // 同一层逐个往后便利
    for (n = q.node.next;  n != null; n = n.next) {
        if ((k = n.key) != null) {
            if ((c = key.compareTo(k)) == 0) {
                Object v = n.value;
                return (v != null)? (V)v : getUsingFindNode(key);
            } else if (c < 0)
                break;
        }
    }
    return null;
}

2.ShipList的简单实现

很早之前写过一个简单的SkipList,贴在百度空间: http://hi.baidu.com/nicker2010/item/be641424bb0963c8ef10f1dc