xiaobaoqiu Blog

Think More, Code Less

ConcurrentHashMap

ConcurrentHashMap基本策略是将hash表切分成多个段(Segments),每个Segment包含多个桶,每次加锁只在Segment级别加锁。

1.结构模型

ConcurrentHashMap 类中包含两个静态内部类 HashEntry 和 Segment。HashEntry 用来封装映射表的键 / 值对;Segment 用来充当锁的角色,每个 Segment 对象守护整个散列映射表的若干个桶。每个桶是由若干个 HashEntry 对象链接起来的链表。一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组。

1.1 HashEntry 类

HashEntry 用来封装散列映射表中的键值对,可以理解为一个桶。在 HashEntry 类中,key,hash 和 next 域都被声明为 final 型,value 域被声明为 volatile 型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class HashEntry<K,V> {
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;

    HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
        this.key = key;
        this.hash = hash;
        this.next = next;
        this.value = value;
    }

    @SuppressWarnings("unchecked")
    static final <K,V> HashEntry<K,V>[] newArray(int i) {
        return new HashEntry[i];
    }
}

在 ConcurrentHashMap 中,在散列时如果产生“碰撞”,将采用“链地址法”来处理“碰撞”:把“碰撞”的 HashEntry 对象链接成一个链表。由于 HashEntry 的 next 域为 final 型,所以新节点只能在链表的表头处插入。下图是在一个空桶中依次插入 A,B,C 三个 HashEntry 对象后的结构图:

1.1 Segment 类

Segment 类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色。每个 Segment 对象用来守护其(成员对象 table 中)包含的若干个桶。

table 是一个由 HashEntry 对象组成的数组。table 数组的每一个数组成员就是散列映射表的一个桶。 count 变量是一个计数器,它表示每个 Segment 对象管理的 table 数组(若干个 HashEntry 组成的链表)包含的 HashEntry 对象的个数。每一个 Segment 对象都有一个 count 对象来表示本 Segment 中包含的 HashEntry 对象的总数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
static final class Segment<K,V> extends ReentrantLock implements Serializable {

    /**
     * 管理的HashEntry对象的个数
     */
    transient volatile int count;

    /**
     * table 被更新的次数
     */
    transient int modCount;

    /**
     * 当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列
     * threshold总是等于capacity * loadFactor
     */
    transient int threshold;

    /**
     * Segment管理的HashEntry
     */
    transient volatile HashEntry<K,V>[] table;

    final float loadFactor;

    Segment(int initialCapacity, float lf) {
        loadFactor = lf;
        setTable(HashEntry.<K,V>newArray(initialCapacity));
    }

    /**
     * 设置 table 引用到这个新生成的 HashEntry 数组
     * 只能在持有锁或构造函数中调用本方法
     */
    void setTable(HashEntry<K,V>[] newTable) {
        threshold = (int)(newTable.length * loadFactor);
        table = newTable;
    }

    /**
     * 根据 key 的散列值,找到 table 中对应的那个桶(table 数组的某个数组成员)
     */
    HashEntry<K,V> getFirst(int hash) {
        HashEntry<K,V>[] tab = table;
        return tab[hash & (tab.length - 1)];
    }
}

下图是依次插入 ABC 三个 HashEntry 节点后,Segment 的结构示意图:

2.ConcurrentHashMap源码分析

ConcurrentHashMap 在默认并发级别会创建包含 16 个 Segment 对象的数组。每个 Segment 的成员对象 table 包含若干个散列表的桶。每个桶是由 HashEntry 链接起来的一个链表。如果键能均匀散列,每个 Segment 大约守护整个散列表中桶总数的 1/16。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {

    /**
     * 默认的hash表的容量
     */
    static final int DEFAULT_INITIAL_CAPACITY = 16;

    /**
     * 默认加载因子
     */
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    /**
     * 默认的并发级别,即Segment的数目
     */
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    /**
     * 最大容量
     */
    static final int MAXIMUM_CAPACITY = 1 << 30;

    /**
     * 最大segments数量
     */
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

    /**
     * 
     Number of unsynchronized retries in size and containsValue
     * methods before resorting to locking. This is used to avoid
     * unbounded retries if tables undergo continuous modification
     * which would make it impossible to obtain an accurate result.
     */
    static final int RETRIES_BEFORE_LOCK = 2;

    /**
     * segment掩码,用于找到处于哪个segments
     */
    final int segmentMask;

    /**
     * Shift value for indexing within segments.
     */
    final int segmentShift;

    /**
     * segments, 每个segment是一个hash表
     */
    final Segment<K,V>[] segments;

    transient Set<K> keySet;
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();

        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;

        // 找到最接近concurrencyLevel的2的幂,用来做最终的concurrencyLevel,比如传入的
        //concurrencyLevel为10,则最终concurrencyLevel为16
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        segmentShift = 32 - sshift;
        segmentMask = ssize - 1;
        this.segments = Segment.newArray(ssize);

        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 1;
        while (cap < c)
            cap <<= 1;

        for (int i = 0; i < this.segments.length; ++i)
            this.segments[i] = new Segment<K,V>(cap, loadFactor);
    }
}

下面是 ConcurrentHashMap 的结构示意图:

2.1 高并发的put

以 put 操作为例说明对 ConcurrentHashMap 做结构性修改的过程。

1
2
3
4
5
6
public V put(K key, V value) {
    if (value == null) throw new NullPointerException();
    int hash = hash(key.hashCode());    // 计算键对应的散列码
    // 根据散列码找到对应的 Segment
    return segmentFor(hash).put(key, hash, value, false);
}

segmentFor代码:首先左移segmentShift位(比如segment总数为16, 则segmentShift为4),高维填充0,再和segmentMask做掩码(比如segment总数为16,则掩码为216-1)

1
2
3
final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}

segment内的put逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock(); //加锁
    try {
        int c = count;
        if (c++ > threshold) // ensure capacity
            rehash();
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);    //找到桶
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        //遍历桶的链表,现有hash值比较,再用key比较
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;

        V oldValue;
        if (e != null) {    //key在链表中找到,则执行更新
            oldValue = e.value;
            if (!onlyIfAbsent)
                e.value = value;
        }
        else {  //key在链表中找不到,新插入到表头
            oldValue = null;
            ++modCount;
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            count = c; // write-volatile
        }
        return oldValue;
    } finally {
        unlock();
    }
}

从put的代码逻辑我们可以看出来,其他写线程对另外 15 个(假设全部为16个)Segment 的加锁并不会因为当前线程对这个 Segment 的加锁而阻塞。每次put锁住的只是对应的segment而不是整个表,即同时允许多个线程操作不同的segment,对整个HashMap而言,就是同时运行多个线程写这个HashMap。同时,所有读线程几乎不会因本线程的加锁而阻塞(除非读线程刚好读到这个 Segment 中某个 HashEntry 的 value 域的值为 null,此时需要加锁后重新读取该值)。

相比较于 HashTable 和由同步包装器包装的 HashMap每次只能有一个线程执行读或写操作,ConcurrentHashMap 在并发访问性能上有了质的提高。在理想状态下,ConcurrentHashMap 可以支持 16 个线程执行并发写操作(如果并发级别设置为 16),及任意数量线程的读操作。

2.2 读操作对加锁的需求

在ConcurrentHashMap中有两个策略来降低读操作对加锁的需求:

(1).HashEntry 中的 key,hash,next 都声明为 final 型:
在代码清单HashEntry 类的定义中我们可以看到,HashEntry 中的 key,hash,next 都声明为 final 型。这意味着,不能把节点添加到链接的中间和尾部,也不能在链接的中间和尾部删除节点。这个特性可以保证:在访问某个节点时,这个节点之后的链接不会被改变。这个特性可以大大降低处理链表时的复杂性;
(2).HashEntry 类的 value 域被声明为 Volatile 型:
同时,HashEntry 类的 value 域被声明为 Volatile 型,Java 的内存模型可以保证:某个写线程对 value 域的写入马上可以被后续的某个读线程“看”到。在 ConcurrentHashMap 中,不允许用 null 作为键和值,当读线程读到某个 HashEntry 的 value 域的值为 null 时,便知道产生了冲突(发生了重排序现象),需要加锁后重新读入这个 value 值。这些特性互相配合,使得读线程即使在不加锁状态下,也能正确访问 ConcurrentHashMap;

由于对 Volatile 变量的写入操作将与随后对这个变量的读操作进行同步。当一个写线程修改了某个 HashEntry 的 value 域后,另一个读线程读这个值域,Java 内存模型能够保证读线程读取的一定是更新后的值。所以,写线程对链表的非结构性修改能够被后续不加锁的读线程看到;

2.3 结构性变更

上面的2.2中的第(2)点能保证我们写线程对value的变更,能够被读线程看到。但是,如果对 ConcurrentHashMap 做结构性修改,怎么保证读线程尽可能少被阻塞?

对 ConcurrentHashMap 做结构性修改,实质上是对某个桶指向的链表做结构性修改。如果能够确保:在读线程遍历一个链表期间,写线程对这个链表所做的结构性修改不影响读线程继续正常遍历这个链表。那么读 / 写线程之间就可以安全并发访问这个 ConcurrentHashMap。

结构性修改操作包括 put,remove,clear。下面我们分别分析这三个操作。

clear操作

clear 操作只是把 ConcurrentHashMap 中所有的桶“置空”,每个桶之前引用的链表依然存在,只是桶不再引用到这些链表(所有链表的结构并没有被修改)。正在遍历某个链表的读线程依然可以正常执行对该链表的遍历。

put操作

从上面的代码清单“在 Segment 中执行具体的 put 操作”中,我们可以看出:put 操作如果需要插入一个新节点到链表中时 , 会在链表头部插入这个新节点。此时,链表中的原有节点的链接并没有被修改。也就是说:插入新健 / 值对到链表中的操作不会影响读线程正常遍历这个链表。

remove操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public V remove(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).remove(key, hash, null);
}

V remove(Object key, int hash, Object value) {
    lock();
    try {
        int c = count - 1;
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;

        V oldValue = null;
        if (e != null) {    //找到了节点
            V v = e.value;
            if (value == null || value.equals(v)) {
                oldValue = v;
                ++modCount;
                HashEntry<K,V> newFirst = e.next;   //待删除节点的next
                for (HashEntry<K,V> p = first; p != e; p = p.next)
                    newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                  newFirst, p.value);
                tab[index] = newFirst;
                count = c; // write-volatile
            }
        }
        return oldValue;
    } finally {
        unlock();
    }
}

和 get 操作一样,首先根据散列码找到具体的链表;然后遍历这个链表找到要删除的节点;最后把待删除节点之后的所有节点原样保留在新链表中,把待删除节点之前的每个节点克隆到新链表中。下面通过图例来说明 remove 操作。假设写线程执行 remove 操作,要删除链表的 C 节点,另一个读线程同时正在遍历这个链表:

从上图可以看出,删除节点 C 之后的所有节点原样保留到新链表中;删除节点 C 之前的每个节点被克隆到新链表中,注意:它们在新链表中的链接顺序被反转了。

在执行 remove 操作时,原始链表并没有被修改,也就是说:读线程不会受同时执行 remove 操作的并发写线程的干扰。

综合上面的分析我们可以看出,写线程对某个链表的结构性修改不会影响其他的并发读线程对这个链表的遍历访问。

2.4 读操作

首先找到对应的segment,再在segment内找对应的数据,当读到的数据为null的时候,加锁再重新读(因为 ConcurrentHashMap不允许value为null,因此正常情况下不会出现null):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}

V get(Object key, int hash) {
    if (count != 0) { // 读volatile变量值
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                //v == null,说明发生了重排序,需要加锁后重新读取
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}

//加锁读
V readValueUnderLock(HashEntry<K,V> e) {
    lock();
    try {
        return e.value;
    } finally {
        unlock();
    }
}

在 ConcurrentHashMap 中,所有执行写操作的方法(put, remove, clear),在对链表做结构性修改之后,在退出写方法前都会去写这个 count 变量。所有未加锁的读操作(get, contains, containsKey)在读方法中,都会首先去读取这个 count 变量。

根据 Java 内存模型,对 同一个 volatile 变量的写 / 读操作可以确保:写线程写入的值,能够被之后未加锁的读线程“看到”。

这个特性和前面介绍的 HashEntry 对象的不变性相结合,使得在 ConcurrentHashMap 中,读线程在读取散列表时,基本不需要加锁就能成功获得需要的值。这两个特性相配合,不仅减少了请求同一个锁的频率(读操作一般不需要加锁就能够成功获得值),也减少了持有同一个锁的时间(只有读到 value 域的值为 null 时 , 读线程才需要加锁后重读)。

3.总结

ConcurrentHashMap 是一个并发散列映射表的实现,它允许完全并发的读取,并且支持给定数量的并发更新。相比于 HashTable 和用同步包装器包装的 HashMap(Collections.synchronizedMap(new HashMap())),ConcurrentHashMap 拥有更高的并发性。在 HashTable 和由同步包装器包装的 HashMap 中,使用一个全局的锁来同步不同线程间的并发访问。同一时间点,只能有一个线程持有锁,也就是说在同一时间点,只能有一个线程能访问容器。这虽然保证多线程间的安全并发访问,但同时也导致对容器的访问变成串行化的了。

ConcurrentHashMap 的高并发性主要来自于三个方面:

(1).用分离锁(Segment锁)实现多个线程间的更深层次的共享访问;
(2).用 HashEntery 对象的不变性来降低执行读操作的线程在遍历链表期间对加锁的需求;
(3).通过对同一个 Volatile 变量的写 / 读访问,协调不同线程间读 / 写操作的内存可见性;

参考

http://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/#icomments