xiaobaoqiu Blog

Think More, Code Less

Selenium

1.什么是Selenium

Selenium 是一种 Web 应用的自动测试工具,通过模拟用户对 Web 页面的各种操作,可以精确重现软件测试人员编写的 Test Cases 步骤。它所采用的原理是通过录制应用程序产生的线性脚本进行回放从而达到自动化测试的目的。其优点是简单,通过录制就可以得到所需脚本。类似于录制/回放测试工具有很多,我们之所以选择它的原因是它是开源的,而且它测试直接在浏览器中运行,就像真实用户所做的一样。Selenium测试可以在 Windows、Linux上的 Internet Explorer、Mozilla和 Firefox 中运行。其他测试工具都不能覆盖如此多的平台,更重要的是Selenium支持多种语言、JAVA、Ruby、Python等。

Selenium的核心,也称browser bot,是用JavaScript编写的。这使得测试脚本可以在受支持的浏览器中运行。browser bot负责执行从测试脚本接收到的命令,测试脚本要么是用 HTML的表布局编写的,要么是使用一种受支持的编程语言编写的。

1.1 Selenium名字的来源

Selenium的中文名为“硒”,是一种化学元素的名字,它对汞(Mercury)有天然的解毒作用,实验表明汞暴露水平越高,硒对汞毒性的拮抗作用越明显,所以说硒是汞的克星。大家应该知道Mercury测试工具系 列吧(QTP,QC,LR,WR…),他们功能强大,但却价格不菲,大家对此又爱又恨!故thoughtworks特意把他们的Web开源测试工具命 名为Selenium,以此帮助大家脱离汞毒。

Selenium 是thoughtworks所做的,用于Web应用程序测试的工具。Selenium测试直接运行在浏览器中,就像真正的用户在操作一样。支持的浏览器包括IE、Mozilla Firefox、Mozilla Suite等。

在selenium2.0中,包含2个部分 selenium ide 和 selenium webdriver.

1.2 优势

使用 Selenium 和在浏览器中运行测试还有很多其他好处。下面是主要的两大好处:

(1).通过编写模仿用户操作的 Selenium 测试脚本,可以从终端用户的角度来测试应用程序;
(2).通过在不同浏览器中运行测试,更容易发现浏览器的不兼容性;

1.3 Selenium 模式

可以按两种模式来使用test runner 和 driven,这两种模式在复杂性和编写方式方面有所不同。driven 测试脚本编写起来往往要更复杂一些,因为它们是用编程语言编写的。

两种模式之间最大的不同点在于:如果使用 driven 脚本,测试有一部分在浏览器之外运行,而如果使用 test runner 脚本的话,测试是完全在浏览器中运行的。

不管是 test runner 还是 driven 测试用例,都可以与持续集成工具集成。

本文这里只简单介绍了 driven 模式。

2.driven 模式

driven Selenium 脚本是用多种受支持的编程语言中的一种编写的,目前可用的有 Java、Ruby 和 Python 等驱动程序。这些脚本在浏览器之外的一个单独的进程中运行。驱动程序的任务是执行测试脚本,并通过与运行在浏览器中的 browser bot 进行通信来驱动浏览器。驱动程序与 browser bot 之间的通信使用一种简单的特定于 Selenium 的连接语言 Selenese。

driven 脚本比 test runner 脚本更强大、更灵活,可以将它们与 xUnit (比如JUnit)框架集成。driven 脚本的缺点(与 test runner 脚本相比)是,这种脚本编写和部署起来更复杂。这是因为驱动程序必须执行以下任务:

(1).启动服务器;
(2).部署所测试的应用程序;
(3).部署测试脚本;
(4).启动浏览器;
(5).发送命令到 browser bot;
(6).验证 browser bot 执行的命令的结果;

3.简单case

这里用Java实现一个简单的使用Selenium的case

3.1 Maven包

使用现有的maven包:

1
2
3
4
<dependency>
    <groupId>org.seleniumhq.selenium</groupId>
    <artifactId>selenium-java</artifactId>
</dependency>

3.2 简单case

下面结合JUnit以打开google为例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import com.google.common.collect.Lists;
import com.qunar.scm.common.BrowserDriver;
import org.apache.commons.collections.CollectionUtils;
import org.junit.Assert;
import org.junit.Test;
import org.openqa.selenium.By;
import org.openqa.selenium.Keys;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.WebElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * 测试Google
 *
 * @author baoqiu.xiao@qunar.com  Date: 14-12-31 Time: 下午1:42
 * @version \$Id$
 */
public class GoogleTest extends AbstractBaseTest {

    private static final Logger logger = LoggerFactory.getLogger(GoogleTest.class);

    private static final String GOOGLE_URL = "https://www.google.com.hk";

    @Test
    public void testGoogle() {
        //当前测试case名称
        caseName = Thread.currentThread().getStackTrace()[1].getMethodName();

        boolean flag = testCommonProcess(currentDriverList(), new BrowserVisiter() {
            @Override
            public boolean visit(BrowserDriver browserDriver) {
                return doTestGoogle(browserDriver);
            }
        });

        Assert.assertTrue(flag);
    }

    /**
     * 实际的测试逻辑
     *
     * @param browserDriver
     */
    private boolean doTestGoogle(final BrowserDriver browserDriver) {
        if(browserDriver == null) {
            logger.error("browserDriver is null");
            return false;
        }

        WebDriver driver = browserDriver.getWebDriver();

        //1.打开页面
        driver.get(GOOGLE_URL);
        driver.manage().window().maximize();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //2.在搜索框内输入关键字,输入回车
        WebElement keyWord = driver.findElement(By.name("q"));
        keyWord.sendKeys("xiaobaoqiu");
        keyWord.sendKeys(Keys.RETURN);

        return true;
    }



    /**
     * 当前需要测试的浏览器列表,这里只使用了firefox
     *
     * @return
     */
    @Override
    protected List<BrowserDriver> currentDriverList() {
        return Lists.newArrayList(BrowserDriver.FIREFOX_DRIVER);
    }

    /**
     * 生成测试报告
     *
     * @param driverList
     * @param result
     */
    @Override
    protected void doReportResult(List<BrowserDriver> driverList, List<Boolean> result) {
        if (CollectionUtils.isEmpty(driverList) || CollectionUtils.isEmpty(result)) {
            logger.error("isEmpty(driverList) or isEmpty(result)");
            return;
        }

        if (driverList.size() != result.size()) {
            logger.error("driverList.size() != result.size()");
            return;
        }

        logger.info("BEG====================================={}=====================================BEG", caseName);
        for (int i = 0; i < driverList.size(); i++) {
            logger.info("{} = {}", driverList.get(i).getBrowser().getName(), resultString(result.get(i)));
        }
        logger.info("END====================================={}=====================================END", caseName);
    }
}

结果:

3.3 注意点

一些Selenuim的注意点,持续补充中:

(1).Selenium默认状态下只支持Firefox,支持Chrome和IE需要安装插件;

4.参考

http://code.google.com/p/selenium/

https://www.ibm.com/developerworks/cn/java/wa-selenium-ajax/

Java并发容器之BlockingQueue

这篇文章主要分析Concurrent包中的BlockingQueue系列的并发容器实现原理和源代码.

1.实现原理

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列.

下图展示了如何通过阻塞队列来合作:

线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素.

1.1 阻塞队列简单实现

下面是阻塞队列的一个简单实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class BlockingQueue {

    private List queue = new LinkedList();
    private int  limit = 10;

    public BlockingQueue(int limit){
        this.limit = limit;
    }

    //入队列
    public synchronized void enqueue(Object item) throws InterruptedException {

        while(this.queue.size() == this.limit) {
            wait();
        }

        if(this.queue.size() == 0) {
            notifyAll();
        }

        this.queue.add(item);
    }

    //出队列
    public synchronized Object dequeue() throws InterruptedException{

        while(this.queue.size() == 0){
            wait();
        }

        if(this.queue.size() == this.limit){
            notifyAll();
        }

        return this.queue.remove(0);
    }
}

必须注意到,在enqueue和dequeue方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用notifyAll方法。如果队列的大小既不等于上限,也不等于下限,任何线程调用enqueue或者dequeue方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。

2.Java BlockingQueue整体介绍

BlockingQueue系列并发容器包括一下几个:

BlockingQueue
BlockingDeque
ArrayBlockingQueue
LinkedBlockingDeque
LinkedBlockingQueue
PriorityBlockingQueue
DelayQueue

2.1 其继承关系如下图:

2.2 主要的方法如下:

放入数据

(1).offer(anObject)
表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程);
(2).offer(E o, long timeout, TimeUnit unit)可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败;
(3).put(anObject)
把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

获取数据

(1).poll(time)
取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
(2).poll(long timeout, TimeUnit unit)
从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据,否则知道时间超时还没有数据可取,返回失败;
(3).take()
取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入; 
(4).drainTo()
一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

3. ArrayBlockingQueue

数组实现的BlockingQueue,实现上用一个数组保存数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private static final long serialVersionUID = -817911632652898426L;

    /** 队列数据  */
    private final E[] items;
    /** 下一个take, poll 或者 remove的下标 */
    private int takeIndex;
    /** 下一个 put, offer 或者 add的下标 */
    private int putIndex;
    /** 队列内元素的数目 */
    private int count;

    /** 锁 */
    private final ReentrantLock lock;
    /** 等待take的Condition */
    private final Condition notEmpty;
    /** 等待puts的Condition */
    private final Condition notFull;
}

三个构造函数,可以输入容量,锁的公平性等,默认是锁非公平的,注意这里的锁是独占的ReentrantLock,因为这里从队列(数组)中读取数据和旺队列中写数据都应该和别的线程是互斥的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//默认锁是不公平的
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.items = (E[]) new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
//c为初始数据
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);
    if (capacity < c.size()) throw new IllegalArgumentException();

    for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
        add(it.next());
}

下面分析offer,put,poll,take这几个主要的函数实现:

3.1 offer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();    //不支持null元素
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)  //队列满,直接返回false
            return false;
        else {
            insert(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

insert的实现:

1
2
3
4
5
6
private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
}

即offer不能插入立即返回,插入成功会触发notEmpty这个Condition,唤醒那些等待读取数据的被阻塞的线程

3.2 put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final E[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();   //可被interrupt的lock
    try {
        try {
            while (count == items.length)
                notFull.await();
        } catch (InterruptedException ie) {
            notFull.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        insert(e);
    } finally {
        lock.unlock();
    }
}

put不能插入时候,会一直被阻塞,但是这个阻塞的线程是可以被interrupt的.

3.3 poll

1
2
3
4
5
6
7
8
9
10
11
12
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == 0) //没有数据可取,直接返回
            return null;
        E x = extract();
        return x;
    } finally {
        lock.unlock();
    }
}

其中extract()的实现如下:

1
2
3
4
5
6
7
8
9
private E extract() {
    final E[] items = this.items;
    E x = items[takeIndex];
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal();
    return x;
}

poll成功会触发notFull这个Condition,即唤醒所有等待插入数据的线程.

3.4 take

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            while (count == 0)
                notEmpty.await();
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        E x = extract();
        return x;
    } finally {
        lock.unlock();
    }
}

take当无数据可取的时候,线程会一致阻塞,但是线程的阻塞状态是可以被interrupt的.

4. LinkedBlockingQueue

LinkedBlockingQueue即一个单链表实现的阻塞队列,单链表节点定义如下:

1
2
3
4
5
6
static class Node<E> {
    E item;

    Node<E> next;
    Node(E x) { item = x; }
}

内部实现,链表实现,注意有两个锁,即一个读取的锁(takeLock)和一个写入的锁(putLock).原因是put和take是不冲突的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** 容量 */
    private final int capacity;

    /** 当前元素个数 */
    private final AtomicInteger count = new AtomicInteger(0);

    /** 链表首节点 */
    private transient Node<E> head;

    /** 链表尾节点 */
    private transient Node<E> last;

    /** take, poll等操作的锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** takes操作的等待Condition */
    private final Condition notEmpty = takeLock.newCondition();

    /** put, offer等的锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** puts操作的等待Condition */
    private final Condition notFull = putLock.newCondition();
}

默认构造指定最大容量为Integer.MAX_VALUE,通过构造函数可以指定容量.

4. PriorityBlockingQueue

PriorityBlockingQueue是基于PriorityQueue实现的一个优先级阻塞队列。而PriorityQueue是基于二叉堆实现的优先级队列,它每次从队列中取出的是具有最高优先权的元素。

1
2
3
4
5
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private final PriorityQueue<E> q;
    private final ReentrantLock lock = new ReentrantLock(true);
    private final Condition notEmpty = lock.newCondition();

构造函数可以指定比较器(Comparator),如果不指定就按照对象的自然顺序(对象必须是Comparable的)。

5.DelayQueue

DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。

Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。

DelayQueue是基于PriorityQueue实现。简单的说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。

1
2
3
4
5
6
7
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private transient final ReentrantLock lock = new ReentrantLock();
    private transient final Condition available = lock.newCondition();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
}

看起take函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek(); //第一个元素
            if (first == null) {
                available.await();
            } else {
                long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                if (delay > 0) {    //没有过期,继续等待
                    long tl = available.awaitNanos(delay);
                } else {    //过期
                    E x = q.poll();
                    assert x != null;
                    if (q.size() != 0)
                        available.signalAll(); //唤醒其他等待take的线程
                    return x;
                }
            }
        }
    } finally {
        lock.unlock();
    }
}

Java Tips

总结日常代码出现的问题.

1.获取周一日期

期望根据当前日期获取周一日期.

Date today = CommonDateTimeUtils.parseDate("2014-11-16");   //周日
System.out.println("today = " + today);

Calendar calendar = Calendar.getInstance();
calendar.setTime(today);
calendar.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY);
Date monday = DateUtils.truncate(calendar.getTime(), Calendar.DAY_OF_MONTH);
System.out.println("monday = " + monday);

如上,today是2014-11-16(周日),获取周一的时间,我们期望获取得到2014-11-10(周一),实际周一的时间为2014-11-17(周一).

原因是时间系统默认周日是一周的第一天.我们可以设置周一是一周的第一天.

Date today = CommonDateTimeUtils.parseDate("2014-11-16");   //周日
System.out.println("today = " + today);

Calendar calendar = Calendar.getInstance();
calendar.setFirstDayOfWeek(Calendar.MONDAY);
calendar.setTime(today);
calendar.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY);
Date monday = DateUtils.truncate(calendar.getTime(), Calendar.DAY_OF_MONTH);
System.out.println("monday = " + monday);

2.Velocity String to number

经常在Velocity中将字符串转成数字,比如double或者int,写法如下:

1
2
#set($Integer = 0)
$Integer.parseInt($intString)

大概的理解就是:volocity会把基本数据类型的静态方法转换到velocity里。所以,定义一个int,就能调用Integer中的方法。

注意其中第一句#set($Integer = 0)是必须的。

Java Lock

这篇博客的目的是了解Java Concurrent包中Lock的实现原理.

java.util.concurrent.locks包下提供了一系列关于锁的抽象的类,主要实现有两种锁ReentrantLock和ReentrantReadWriteLock.

1.一个简单的锁

让我们从java中的一个同步块开始

1.1 synchronized实现同步块

1
2
3
4
5
6
7
8
9
public class Counter{
    private int count = 0;

    public int inc(){
        synchronized(this){
            return ++count;
        }
    }
}

可以看到在inc()方法中有一个synchronized(this)代码块。该代码块可以保证在同一时间只有一个线程可以执行return ++count操作。虽然在synchronized的同步块中的代码可以更加复杂,但是++count这种简单的操作已经足以表达出线程同步的意思。

1.2 Lock实现同步块

以下的Counter类用Lock代替synchronized可以达到了同样的目的:

1
2
3
4
5
6
7
8
9
10
11
public class Counter{
    private Lock lock = new Lock(); //锁
    private int count = 0;

    public int inc(){
        lock.lock();
        int newCount = ++count;
        lock.unlock();
        return newCount;
    }
}

lock()方法会对Lock实例对象进行加锁,因此所有对该对象调用lock()方法的线程都会被阻塞,直到该Lock对象的unlock()方法被调用。

1.3 Lock类实现

这里有一个Lock类的简单实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Lock{
    private boolean isLocked = false;

    public synchronized void lock()
        throws InterruptedException{
        while(isLocked){
            wait();         //阻塞
        }
        isLocked = true;
    }

    public synchronized void unlock(){
        isLocked = false;
        notify();           //唤醒
    }
}

注意其中的while(isLocked)循环,它又被叫做“自旋锁”。自旋锁以及wait()和notify()方法在线程通信这篇文章中有更加详细的介绍。

当isLocked为true时,调用lock()的线程在wait()调用上阻塞等待。为防止该线程没有收到notify()调用也从wait()中返回,这个线程会重新去检查isLocked条件以决定当前是否可以安全地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行了。如果isLocked为false,当前线程会退出while(isLocked)循环,并将isLocked设回true,让其它正在调用lock()方法的线程能够在Lock实例上加锁。

当线程完成了临界区(锁住的代码端,即位于lock()和unlock()之间)中的代码,就会调用unlock()。执行unlock()会重新将isLocked设置为false,并且通知(唤醒)其中一个(若有的话)在lock()方法中调用了wait()函数而处于等待状态的线程。

1.4 在finally语句中调用unlock()

如果用Lock来保护临界区,并且临界区有可能会抛出异常,那么在finally语句中调用unlock()就显得非常重要了,这样可以保证这个锁对象可以被解锁以便其它线程能继续对其加锁:

lock.lock();
try{
    //do critical section code,
    //which may throw exception
} finally {
    lock.unlock();
}

这个简单的结构可以保证当临界区抛出异常时Lock对象可以被解锁。如果不是在finally语句中调用的unlock(),当临界区抛出异常时,Lock对象将永远停留在被锁住的状态,这会导致其它所有在该Lock对象上调用lock()的线程一直阻塞。

2.锁的可重入性

2.1 什么是可重入

如果一个线程持有某个管程对象(monitor object)上的锁,那么它就有权访问所有在该管程对象上同步块。这就叫可重入。 通俗讲即若线程已经持有锁,那么它就可以重复访问所有使用该锁的代码块。

2.2 synchronized同步块是可重入的

Java中的synchronized同步块是可重入的。这意味着如果一个java线程进入了代码中的synchronized同步块,并因此获得了该同步块使用的同步对象对应的管程上的锁,那么这个线程可以进入由同一个管程对象所同步的另一个java代码块。因此下面的代码没有问题:

1
2
3
4
5
6
7
8
9
public class Reentrant{
    public synchronized outer(){
        inner();
    }

    public synchronized inner(){
        //do something
    }
}

注意outer()和inner()都声明为synchronized,这在Java中这相当于synchronized(this)块.如果某个线程调用了outer(),outer()中的inner()调用是没问题的,因为两个方法都是在同一个管程对象(即this)上同步的。

2.3 不可重入锁

1.3节的锁实现是不可重入的,原因是如果一个线程在两次调用lock()间没有调用unlock()方法,那么第二次调用lock()就会被阻塞,这就出现了重入锁死,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Reentrant2{
    Lock lock = new Lock();

    public outer(){
        lock.lock();
        inner();
        lock.unlock();
    }

    public synchronized inner(){
        lock.lock();
        //do something
        lock.unlock();
    }
}

调用outer()的线程首先会锁住Lock实例,然后继续调用inner()。inner()方法中该线程将再一次尝试锁住Lock实例,结果该动作会失败(也就是说该线程会被阻塞),因为这个Lock实例已经在outer()方法中被锁住了。

2.4 修改成可重入锁

为了让这个Lock类具有可重入性,我们需要对它做一点小的改动:

(1).可重入锁需要一个重入计数变量,初始值设为0,当成功请求锁时加1,释放锁时减1,当释放锁之后计数为0则真正释放锁;
(2).重入锁还必须持有对锁持有者的引用,用以判断是否可以重入;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Lock{
    boolean isLocked = false;
    Thread  lockedBy = null;    //阻塞的线程
    int lockedCount = 0;        //同一个线程加锁的次数

    public synchronized void lock() throws InterruptedException{
        Thread callingThread = Thread.currentThread();
        while(isLocked && lockedBy != callingThread){
            wait();
        }
        isLocked = true;
        lockedCount++;
        lockedBy = callingThread;
    }

    public synchronized void unlock(){
        if(Thread.curentThread() == this.lockedBy){
            lockedCount--;

            if(lockedCount == 0){   //锁次数为0之后才解锁
                isLocked = false;
                notify();
            }
        }
    }
}

注意到现在的while循环(自旋锁)也考虑到了已锁住该Lock实例的线程。如果当前的锁对象没有被加锁(isLocked = false),或者当前调用线程已经对该Lock实例加了锁,那么while循环就不会被执行,调用lock()的线程就不会被阻塞了。

除此之外,我们需要记录同一个线程重复对一个锁对象加锁的次数。否则,一次unblock()调用就会解除整个锁,即使当前锁已经被加锁过多次。在unlock()调用没有达到对应lock()调用的次数之前,我们不希望锁被解除。

3.锁的公平性

Java的synchronized块并不保证尝试进入它们的线程的顺序。如果一个线程因为CPU时间全部被其他线程抢走而得不到CPU运行时间,这种状态被称之为“饥饿”。而该线程被“饥饿致死”正是因为它得不到CPU运行时间的机会。解决饥饿的方案被称之为“公平性”,即所有线程均能公平地获得运行机会。

通俗讲,如果在绝对时间上,先对锁进行获取的请求一定被先满足,那么这个锁是公平的,反之,是不公平的,也就是说等待时间最长的线程最有机会获取锁,也可以说锁的获取是有序的。

3.1 Java中导致饥饿的原因

(1).高优先级线程抢占所有的低优先级线程的CPU时间

我们能为每个线程设置独自的线程优先级,优先级越高的线程获得的CPU时间越多,线程优先级值设置在1到10之间,这些优先级值所表示行为的准确解释则依赖于你的应用运行平台。

(2).线程被永久堵塞在一个等待进入同步块的状态

Java的同步代码区也是一个导致饥饿的因素。Java的同步代码区对哪个线程允许进入的次序没有任何保障。这就意味着理论上存在一个试图进入该同步区的线程处于被永久堵塞的风险,因为其他线程总是能持续地先于它获得访问,这即是“饥饿”问题,而一个线程被“饥饿致死”正是因为它得不到CPU运行时间的机会。

(3).线程在等待一个本身也处于永久等待完成的对象(比如调用这个对象的wait方法)

如果多个线程处在wait()方法执行上,而对其调用notify()不会保证哪一个线程会获得唤醒,任何线程都有可能处于继续等待的状态。因此存在这样一个风险:一个等待线程从来得不到唤醒,因为其他等待线程总是能被获得唤醒。

3.2 实现公平锁

下面来讲述将上面Lock类转变为公平锁FairLock。基本原理是每一个调用lock()的线程都会进入一个队列,当解锁后,只有队列里的第一个线程被允许锁住Farlock实例,所有其它的线程都将处于等待状态,直到他们处于队列头部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class FairLock {
    private boolean           isLocked       = false;
    private Thread            lockingThread  = null;
    //用于等待的线程队列
    private List<QueueObject> waitingThreads = new ArrayList<QueueObject>();

    public void lock() throws InterruptedException{
        QueueObject queueObject           = new QueueObject();
        boolean     isLockedForThisThread = true;
        synchronized(this){ //加入等到队列
            waitingThreads.add(queueObject);
        }
        while(isLockedForThisThread){
            synchronized(this){
                isLockedForThisThread =
                isLocked || waitingThreads.get(0) != queueObject;
                if(!isLockedForThisThread){ //当前线程没有被阻塞
                    isLocked = true;
                    waitingThreads.remove(queueObject);
                    lockingThread = Thread.currentThread();
                    return;
                }
            }
            //isLockedForThisThread == true
            try{
                queueObject.doWait();   //通知当前线程等待
            }catch(InterruptedException e){
                synchronized(this) {
                    waitingThreads.remove(queueObject);
                }
                throw e;
            }
        }
    }

    public synchronized void unlock(){
        if(this.lockingThread != Thread.currentThread()){
          throw new IllegalMonitorStateException(
            "Calling thread has not locked this lock");
        }
        isLocked      = false;
        lockingThread = null;
        if(waitingThreads.size() > 0){
            waitingThreads.get(0).doNotify();
        }
    }
}

QueueObject实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class QueueObject {
    private boolean isNotified = false; //被唤醒

    public synchronized void doWait() throws InterruptedException {
        while(!isNotified){
            this.wait();
        }
        this.isNotified = false;
    }

    public synchronized void doNotify() {
        this.isNotified = true;
        this.notify();  //唤醒等待当前QueueObject对象的线程
    }

    public boolean equals(Object o) {
        return this == o;
    }
}

首先注意到lock()方法不在声明为synchronized,取而代之的是对必需同步的代码,在synchronized中进行嵌套。

FairLock新创建了一个QueueObject的实例,并对每个调用lock()的线程进行入队列。调用unlock()的线程将从队列头部获取QueueObject,并对其调用doNotify(),以唤醒在该对象上等待的线程。通过这种方式,在同一时间仅有一个等待线程获得唤醒,而不是所有的等待线程。这也是实现FairLock公平性的核心所在。

请注意,在同一个同步块中,锁状态依然被检查和设置,以避免出现滑漏条件。

还需注意到,QueueObject实际是一个信号量(Semaphore)。doWait()和doNotify()方法在QueueObject中保存着信号。这样做以避免一个线程在调用queueObject.doWait()之前被另一个调用unlock()并随之调用queueObject.doNotify()的线程重入,从而导致信号丢失。queueObject.doWait()调用放置在synchronized(this)块之外,以避免被monitor嵌套锁死,所以另外的线程可以解锁,只要当没有线程在lock方法的synchronized(this)块中执行即可。

最后,注意到queueObject.doWait()在try – catch块中是怎样调用的。在InterruptedException抛出的情况下,线程得以离开lock(),并需让它从队列中移除。

4.ReentrantLock源码

ReentrantLock是java中可重入锁的一个实现,一次只能有一个线程持有锁,也即所谓独占锁的概念。它包含三个内部类:Sync、NonfairSync、FairSync,通过构造函数的参数来指定锁是否是公平的,下面是一些核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;
    //默认是不公平锁
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    //参数fair决定,true为公平锁实现,false为非公平锁实现
    public ReentrantLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
    }

    public void lock() {
        sync.lock();
    }
    //不公平锁
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    //带超时时间的锁
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    public void unlock() {
        sync.release(1);
    }
    //是否有线程等待当前锁
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
    ...

我们可以发现,ReentrantLock都是把具体实现委托给内部类(Sync、NonfairSync、FairSync),ReentrantLock的重入计数是使用AbstractQueuedSynchronizer的state属性的,state大于0表示锁被占用、等于0表示空闲,小于0则是重入次数太多导致溢出了.

4.1 ReentrantLock.Sync

Sync类代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
static abstract class Sync extends AbstractQueuedSynchronizer {

        abstract void lock();

        //非公平获取,公平锁和非公平锁都需要这个方法
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {   //state == 0表示无锁
                //CAS确保即使有多个线程竞争锁也是安全的
                if (compareAndSetState(0, acquires)) {  //加锁成功  
                    setExclusiveOwnerThread(current);   //设置当前持有锁的线程
                    return true;                        //获取成功
                }
            }
            else if (current == getExclusiveOwnerThread()) {//当前线程正是锁持有者
                int nextc = c + acquires;
                if (nextc < 0) // 被锁次数上溢(很少出现)
                    throw new Error("Maximum lock count exceeded");
                //锁被持有的情况下,只有持有者才能更新锁保护的资源
                setState(nextc);
                return true;
            }
            return false;
        }
        //释放
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            //只有锁的持有者才能释放锁
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {   //锁被释放
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
        //当前线程是否持有锁
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        //锁的持有者
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
        //加锁次数
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
        //是否上锁,根据state字段可以判断
        final boolean isLocked() {
            return getState() != 0;
        }
    }

4.2 ReentrantLock.NonfairSync

公平锁的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final static class NonfairSync extends Sync {

     // 执行lock,尝试立即闯入,失败就退回常规流程
    final void lock() {
        if (compareAndSetState(0, 1))   //比较并设置state,成功则表示获取成功
            setExclusiveOwnerThread(Thread.currentThread());//锁持有者
        else
            acquire(1);//获取失败,进入常规流程:acquire会首先调用tryAcquire
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

acquire的实现(AbstractQueuedSynchronizer.java):

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

就是说常规流程下,首先会执行Sync类中非公平获取(nonfairTryAcquire(1))的过程.

4.3 ReentrantLock.FairSync

非公平锁的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final static class FairSync extends Sync {
    //见AbstractQueuedSynchronizer.java, 4.2节有
    final void lock() {
        acquire(1);
    }

    //公平版本的tryAcquire,除非是递归调用或没有等待者或者是第一个,否则不授予访问
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //是等待队列的第一个等待者
            if (isFirst(current) &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);   //加锁成功
                return true;
            }
        }
        //当前线程正是线程的持有者
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)  //溢出
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

isFirst的实现,即等待队列为空或者当前线程为等待队列的第一个元素:

final boolean isFirst(Thread current) {
    Node h, s;
    return ((h = head) == null ||
            ((s = h.next) != null && s.thread == current) ||
            fullIsFirst(current));
}

4.4 lock() VS lockInterruptibly()

先说线程请求锁的几个方法:

lock():拿不到lock就不罢休,不然线程就一直block;
tryLock():马上返回,拿到lock就返回true,不然返回false;
带时间限制的tryLock():拿不到lock,就等一段时间,超时返回false;

先说说线程的打扰机制,每个线程都有一个 打扰 标志。这里分两种情况:

1. 线程在sleep或wait,join, 此时如果别的进程调用此进程的 interrupt()方法,此线程会被唤醒并被要求处理InterruptedException;
2. 此线程在运行中,则不会收到提醒。但是 此线程的 “打扰标志”会被设置, 可以通过isInterrupted()查看并作出处理。

lockInterruptibly()和上面的第一种情况是一样的,线程在请求lock并被阻塞时,如果被interrupt,则“此线程会被唤醒并被要求处理InterruptedException”。

5.读写锁

假设你的程序中涉及到对一些共享资源的读和写操作,且写操作没有读操作那么频繁。在没有写操作的时候,两个线程同时读一个资源没有任何问题,所以应该允许多个线程能在同时读取共享资源;但是如果有一个线程想去写这些共享资源,就不应该再有其它线程对该资源进行读或写(也就是说:读-读能共存,读-写不能共存,写-写不能共存),这就需要一个读/写锁来解决这个问题。

5.1 读/写锁的Java实现

如果某个线程想要读取资源,只要没有线程正在对该资源进行写操作且没有线程请求对该资源的写操作即可。我们假设对写操作的请求比对读操作的请求更重要,即需要提升写请求的优先级;

此外,如果读操作发生的比较频繁,我们又没有提升写操作的优先级,那么就会产生“饥饿”现象,请求写操作的线程会一直阻塞,直到所有的读线程都从ReadWriteLock上解锁了(因为多个读可以共存),如果一直保证新线程的读操作权限,那么等待写操作的线程就会一直阻塞下去,结果就是发生“饥饿”;

因此,只有当没有线程正在锁住ReadWriteLock进行写操作,且没有线程请求该锁准备执行写操作时,才能保证读操作继续.

根据上面的描述,我们可以对读写访问资源的条件做个概述:

读:没有线程正在做写操作,且没有线程在请求写操作
写:没有线程正在做读写操作

按照上面的叙述,简单的实现出一个读/写锁,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ReadWriteLock{
    private int readers = 0;        //在读线程数
    private int writers = 0;        //在写线程数目
    private int writeRequests = 0;  //请求写线程数目
    //尝试读加锁,有写或者有写请求时候,等待
    public synchronized void lockRead() throws InterruptedException{
        while(writers > 0 || writeRequests > 0){
            wait();
        }
        readers++;
    }
    //读解锁
    public synchronized void unlockRead(){
        readers--;
        notifyAll();
    }
    //尝试写加锁
    public synchronized void lockWrite() throws InterruptedException{
        writeRequests++;
        //有在读或者在在写的线程
        while(readers > 0 || writers > 0){
            wait();
        }
        writeRequests--;
        writers++;
    }
    //写解锁
    public synchronized void unlockWrite() throws InterruptedException{
        writers--;
        notifyAll();
    }
}

在两个释放锁的方法(unlockRead,unlockWrite)中,都调用了notifyAll方法,而不是notify。要解释这个原因,我们可以想象下面一种情形:

如果有线程在等待获取读锁,同时又有线程在等待获取写锁。如果这时其中一个等待读锁的线程被notify方法唤醒,但因为此时仍有请求写锁的线程存在(writeRequests>0),所以被唤醒的线程会再次进入阻塞状态。然而,等待写锁的线程一个也没被唤醒,就像什么也没发生过一样(出现了所谓的信号丢失现象)。如果用的是notifyAll方法,所有的线程都会被唤醒,然后判断能否获得其请求的锁。

用notifyAll还有一个好处。如果有多个读线程在等待读锁且没有线程在等待写锁时,调用unlockWrite()后,所有等待读锁的线程都能立马成功获取读锁(共享读锁),而不是一次只允许一个。

5.2 读/写锁的重入

上面实现的读/写锁(ReadWriteLock) 是不可重入的,当一个已经持有写锁的线程再次请求写锁时,就会被阻塞。原因是已经有一个写线程了(就是它自己)。此外,考虑下面的例子:

Thread 1 获得了读锁
Thread 2 请求写锁,但因为Thread 1 持有了读锁,所以写锁请求被阻塞。
Thread 1 再想请求一次读锁,但因为Thread 2处于请求写锁的状态,所以想再次获取读锁也会被阻塞。

上面这种情形使用前面的ReadWriteLock就会被锁定,即一种类似于死锁的情形。导致不会再有线程能够成功获取读锁或写锁了。

为了让ReadWriteLock可重入,需要对它做一些改进,下面会分别处理读锁的重入和写锁的重入。

5.3 读锁重入

了让ReadWriteLock的读锁可重入,我们要先为读锁重入建立规则:

要保证某个线程中的读锁可重入,要么满足获取读锁的条件(没有写或写请求),要么已经持有读锁(不管是否有写请求).

要确定一个线程是否已经持有读锁,可以用一个map来存储已经持有读锁的线程以及对应线程获取读锁的次数,即Map<thread, count>,当需要判断某个线程能否获得读锁时,就利用map中存储的数据进行判断。下面是方法lockRead和unlockRead修改后的的代码(省略了写锁代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ReadWriteLock{
    //读锁计数器,记录当前持有读锁的线程以及加锁次数
    private Map<Thread, Integer> readingThreads =
        new HashMap<Thread, Integer>();

    private int writers = 0;
    private int writeRequests = 0;

    public synchronized void lockRead() throws InterruptedException{
        Thread callingThread = Thread.currentThread();
        while(!canGrantReadAccess(callingThread)){
            wait();
        }

        readingThreads.put(callingThread, (getAccessCount(callingThread) + 1));
    }

    public synchronized void unlockRead(){
        Thread callingThread = Thread.currentThread();
        int accessCount = getAccessCount(callingThread);
        if(accessCount == 1) {  //加锁次数为0了
            readingThreads.remove(callingThread);
        } else {
            readingThreads.put(callingThread, (accessCount -1));
        }
        notifyAll();
    }
    //判断当前线程能否获得读锁
    private boolean canGrantReadAccess(Thread callingThread){
        if(writers > 0) return false;
        if(isReader(callingThread) return true;
        if(writeRequests > 0) return false;
        return true;
    }
    //获取这个读锁目前的加锁次数
    private int getReadAccessCount(Thread callingThread){
        Integer accessCount = readingThreads.get(callingThread);
        if(accessCount == null) return 0;
        return accessCount.intValue();
    }
    //当前线程是否为已加锁的读锁
    private boolean isReader(Thread callingThread){
        return readingThreads.get(callingThread) != null;
    }
}

代码中我们可以看到(canGrantReadAccess函数),只有在没有线程拥有写锁的情况下才允许读锁的重入,此外,重入的读锁比写锁请求优先级高。

5.4 写锁重入

因为写锁是排他锁,写锁的可重入规则比较简单:

仅当一个线程已经持有写锁,才允许写锁重入(再次获得写锁)。

下面是方法lockWrite和unlockWrite修改后的的代码(省略了写锁的代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class ReadWriteLock{
    private Map<Thread, Integer> readingThreads =
        new HashMap<Thread, Integer>();

    private int writeAccesses    = 0;
    private int writeRequests    = 0;
    private Thread writingThread = null;    //当前的写锁线程

    public synchronized void lockWrite() throws InterruptedException{
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while(!canGrantWriteAccess(callingThread)){
            wait();
        }
        writeRequests--;
        writeAccesses++;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite() throws InterruptedException{
        writeAccesses--;
        if(writeAccesses == 0){
            writingThread = null;
        }
        notifyAll();
    }
    //判断能否加写锁
    private boolean canGrantWriteAccess(Thread callingThread){
        if(hasReaders()) return false;
        if(writingThread == null)    return true;
        if(!isWriter(callingThread)) return false;
        return true;
    }
    //有线程正持有读锁
    private boolean hasReaders(){
        return readingThreads.size() > 0;
    }
    //请求线程是否为当前已加写锁的线程
    private boolean isWriter(Thread callingThread){
        return writingThread == callingThread;
    }
}

5.5 读锁升级到写锁

有时,我们希望一个拥有读锁的线程,也能获得写锁。想要允许这样的操作,要求这个线程是唯一一个拥有读锁的线程(因为写锁是排他锁)。writeLock()需要做点改动来达到这个目的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class ReadWriteLock{
    private Map<Thread, Integer> readingThreads =
        new HashMap<Thread, Integer>();

    private int writeAccesses    = 0;
    private int writeRequests    = 0;
    private Thread writingThread = null;

    public synchronized void lockWrite() throws InterruptedException{
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while(!canGrantWriteAccess(callingThread)){
            wait();
        }
        writeRequests--;
        writeAccesses++;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite() throws InterruptedException{
        writeAccesses--;
        if(writeAccesses == 0){
            writingThread = null;
        }
        notifyAll();
    }
    //能否获得写锁,如果是当前唯一的一个读锁,则可以升级获得写锁
    private boolean canGrantWriteAccess(Thread callingThread){
        if(isOnlyReader(callingThread)) return true;
        if(hasReaders()) return false;
        if(writingThread == null) return true;
        if(!isWriter(callingThread)) return false;
        return true;
    }

    private boolean hasReaders(){
        return readingThreads.size() > 0;
    }

    private boolean isWriter(Thread callingThread){
        return writingThread == callingThread;
    }
    //当前持有锁的线程是否为唯一的读锁线程
    private boolean isOnlyReader(Thread callingThread){
        return readers == 1 && readingThreads.get(callingThread) != null;
    }
}

5.6 写锁降级到读锁

有时拥有写锁的线程也希望得到读锁。如果一个线程拥有了写锁,那么自然其它线程是不可能拥有读锁或写锁了。所以对于一个拥有写锁的线程,再获得读锁,是不会有什么危险的。

我们仅仅需要对上面canGrantReadAccess方法进行简单地修改:

1
2
3
4
5
6
7
8
9
10
public class ReadWriteLock{
    //能否获得读锁,当当前持有的线程为一个写锁线程,则可以
    private boolean canGrantReadAccess(Thread callingThread){
        if(isWriter(callingThread)) return true;
        if(writingThread != null) return false;
        if(isReader(callingThread) return true;
        if(writeRequests > 0) return false;
        return true;
    }
}

5.7 可重入的ReadWriteLock的完整实现

下面是完整的ReadWriteLock实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
public class ReadWriteLock{
    private Map<Thread, Integer> readingThreads =
        new HashMap<Thread, Integer>();

    private int writeAccesses    = 0;
    private int writeRequests    = 0;
    private Thread writingThread = null;

    public synchronized void lockRead() throws InterruptedException{
        Thread callingThread = Thread.currentThread();
        while(!canGrantReadAccess(callingThread)){
            wait();
        }

        readingThreads.put(callingThread,
            (getReadAccessCount(callingThread) + 1));
    }

    private boolean canGrantReadAccess(Thread callingThread){
        if(isWriter(callingThread)) return true;
        if(hasWriter()) return false;
        if(isReader(callingThread)) return true;
        if(hasWriteRequests()) return false;
        return true;
    }


    public synchronized void unlockRead(){
        Thread callingThread = Thread.currentThread();
        if(!isReader(callingThread)){
            throw new IllegalMonitorStateException(
                "Calling Thread does not" +
                " hold a read lock on this ReadWriteLock");
        }
        int accessCount = getReadAccessCount(callingThread);
        if(accessCount == 1){
            readingThreads.remove(callingThread);
        } else {
            readingThreads.put(callingThread, (accessCount -1));
        }
        notifyAll();
    }

    public synchronized void lockWrite() throws InterruptedException{
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while(!canGrantWriteAccess(callingThread)){
            wait();
        }
        writeRequests--;
        writeAccesses++;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite()
        throws InterruptedException{
        if(!isWriter(Thread.currentThread()){
        throw new IllegalMonitorStateException(
            "Calling Thread does not" +
            " hold the write lock on this ReadWriteLock");
        }
        writeAccesses--;
        if(writeAccesses == 0){
            writingThread = null;
        }
        notifyAll();
    }

    private boolean canGrantWriteAccess(Thread callingThread){
        if(isOnlyReader(callingThread)) return true;
        if(hasReaders()) return false;
        if(writingThread == null) return true;
        if(!isWriter(callingThread)) return false;
        return true;
    }


    private int getReadAccessCount(Thread callingThread){
        Integer accessCount = readingThreads.get(callingThread);
        if(accessCount == null) return 0;
        return accessCount.intValue();
    }


    private boolean hasReaders(){
        return readingThreads.size() > 0;
    }

    private boolean isReader(Thread callingThread){
        return readingThreads.get(callingThread) != null;
    }

    private boolean isOnlyReader(Thread callingThread){
        return readingThreads.size() == 1 &&
            readingThreads.get(callingThread) != null;
    }

    private boolean hasWriter(){
        return writingThread != null;
    }

    private boolean isWriter(Thread callingThread){
        return writingThread == callingThread;
    }

    private boolean hasWriteRequests(){
        return this.writeRequests > 0;
    }
}

在利用ReadWriteLock来保护临界区时,如果临界区可能抛出异常,在finally块中调用readUnlock()和writeUnlock()就显得很重要了。这样做是为了保证ReadWriteLock能被成功解锁,然后其它线程可以请求到该锁:

lock.lockWrite();
try{
    //do critical section code, which may throw exception
} finally {
    lock.unlockWrite();
}

6.ReentrantReadWriteLock源码

ReentrantLock 实现了标准的互斥操作,也就是一次只能有一个线程持有锁,也即所谓独占锁的概念。前面的章节中一直在强调这个特点。显然这个特点在一定程度上面减低了吞吐量,实际上独占锁是一种保守的锁策略,在这种情况下任何“读/读”,“写/读”,“写/写”操作都不能同时发生。

读写锁使用的场合是一个共享资源被大量读取操作,而只有少量的写操作(修改数据).

ReentrantReadWriteLock是java中可重入读写锁的一个实现.包含五个内部类:Sync,NonfairSync,FairSync,ReadLock,WriteLock.其中Sync,NonfairSync,FairSync用上公平锁和非公平锁的实现,ReadLock和WriteLock分别用于实现读锁和写锁.

ReentrantReadWriteLock的一些特性:

(1).写线程获取写入锁后可以获取读取锁,然后释放写入锁,这样就从写入锁变成了读取锁,从而实现锁降级的特性;读取锁是不能直接升级为写入锁的;
(2).读取锁和写入锁的数量最大分别只能是65535;

6.1 ReentrantReadWriteLock一些关键的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable  {
    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    private final Sync sync;

    //默认实现是非公平的
    public ReentrantReadWriteLock() {
        this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public int getReadLockCount() {
        return sync.getReadLockCount();
    }

    public boolean isWriteLocked() {
        return sync.isWriteLocked();
    }
    //是否由当前线程持有写锁
    public boolean isWriteLockedByCurrentThread() {
        return sync.isHeldExclusively();
    }
    //读锁等待队列
    protected Collection<Thread> getQueuedWriterThreads() {
        return sync.getExclusiveQueuedThreads();
    }
    //写锁等待队列
    protected Collection<Thread> getQueuedReaderThreads() {
        return sync.getSharedQueuedThreads();
    }
}

6.2 读锁实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static class ReadLock implements Lock, java.io.Serializable  {
    private final Sync sync;

    //加锁(共享锁)
    public void lock() {
        sync.acquireShared(1);
    }

    public  boolean tryLock() {
        return sync.tryReadLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    //解锁
    public  void unlock() {
        sync.releaseShared(1);
    }
}

6.3 写锁实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static class WriteLock implements Lock, java.io.Serializable  {
    private final Sync sync;
    //排他锁
    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryWriteLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.release(1);
    }
    //是否有当前线程持有
    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }
    //加锁次数
    public int getHoldCount() {
        return sync.getWriteHoldCount();
    }
}

6.4 state

ReentrantLock和ReentrantReadWriteLock的实现很多细节都是由借由AbstractQueuedSynchronizer实现.

AQS中有一个state字段(int类型,32位)用来描述有多少线程获持有锁。在独占锁的时代这个值通常是0或者1(如果是重入的就是重入的次数),在共享锁的时代就是持有锁的数量。

ReentrantLock的锁是排他锁,因此只用来表示重入数.

ReentrantReadWriteLock的读、写锁是相关但是又不一致的,所以需要两个数来描述读锁(共享锁)和写锁(独占锁)的数量。显然现在一个state就不够用了。于是在ReentrantReadWrilteLock里面将这个字段一分为二,高位16位表示共享锁的数量,低位16位表示独占锁的数量(或者重入数量),这就是上节中提到的为什么共享锁和独占锁的数量(包括重入次数)最大只能是65535的原因了.

我们可以借此分析写入锁获取片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        //有读锁或者写锁持有者不是当前线程
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if ((w == 0 && writerShouldBlock(current)) ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

exclusiveCount©的实现很简单,就是用掩码去掉高16位(c & EXCLUSIVE_MASK).

writerShouldBlock的实现分为公平锁和非公平锁两个实现,不公平锁的实现

1
2
3
final boolean writerShouldBlock(Thread current) {
    return false;
}

即在非公平锁中写锁优先级高.

writerShouldBlock公平锁的实现:

1
2
3
4
final boolean writerShouldBlock(Thread current) {
    // only proceed if queue is empty or current thread at head
    return !isFirst(current);
}

即请求写锁的线程不是等待队列的第一个就会被阻塞,即读锁和写锁是公平排队的.

读锁的获取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    //有写锁并且写锁持有者不是当前线程(考虑写锁可以降级到读锁)
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
        return -1;
    if (sharedCount(c) == MAX_COUNT)
        throw new Error("Maximum lock count exceeded");
    if (!readerShouldBlock(current) && compareAndSetState(c, c + SHARED_UNIT)){
        HoldCounter rh = cachedHoldCounter; //首先访问缓存
        if (rh == null || rh.tid != current.getId())
            cachedHoldCounter = rh = readHolds.get();
        rh.count++;
        return 1;
    }
    return fullTryAcquireShared(current);
}

这里注意到在读锁的获取和释放都用到了一个HoldCounter类,下面会介绍.

6.5 HoldCounter

在ReentrantReadWriteLock.Sync中有包含几个内部类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * 每个线程特定的 read 持有计数。存放在ThreadLocal
 */
static final class HoldCounter {
    int count;
    // 用id而不是引用是为了避免保留垃圾
    final long tid = Thread.currentThread().getId();
    /** Decrement if positive; return previous value */
    int tryDecrement() {
        int c = count;
        if (c > 0)
            count = c - 1;
        return c;
    }
}

/**
 * ThreadLocal subclass. Easiest to explicitly define for sake
 * of deserialization mechanics.
 */
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

static abstract class Sync extends AbstractQueuedSynchronizer {
    //当前线程持有读锁的数目
    transient ThreadLocalHoldCounter readHolds;

    /**
     * 最近一个成功获取读锁的线程的计数。
     * 通常情况,下一个要释放的线程是最后一个获取线程
     */
    transient HoldCounter cachedHoldCounter;
}

可以看到这里使用ThreadLocal将HoldCounter绑定到当前线程上,同时HoldCounter也持有线程Id,这样在释放锁的时候才能知道ReadWriteLock里面缓存的上一个读取线程(cachedHoldCounter)是否是当前线程。这样做的好处是可以减少ThreadLocal.get()的次数,因为这也是一个耗时操作。

需要说明的是这样HoldCounter绑定线程id而不绑定线程对象的原因是避免HoldCounter和ThreadLocal互相绑定而GC难以释放它们,所以其实这样做只是为了帮助GC快速回收对象而已。

7.参考

http://tutorials.jenkov.com/java-concurrency/locks.html http://tutorials.jenkov.com/java-concurrency/starvation-and-fairness.html http://ifeve.com/java-concurrency-thread-directory/ http://coderbee.net/index.php/concurrent/20131209/618

Java Atomic

在多线程环境下,Java里面的++i或者–i操作不是线程安全的,因为它其实包含了三个操作:获取当前值,++或者–,写回. 在没有额外资源可以利用的情况下,只能使用加锁才能保证三个操作时“原子性”的.

在Java中,协调对线程间共享字段的访问的传统方法是使用同步,确保完成对共享字段的所有访问,同时具有适当的锁定。带来的问题主要是锁定竞争太厉害(线程常常在其他线程具有锁定时要求获得该锁定那么该线程将被阻塞,直到该锁定可用),会损害吞吐量,因为竞争的同步非常昂贵,另外还存在死锁等问题。对于现代 JVM 而言,无竞争的同步现在非常便宜。

java.util.concurrent.atomic包就是提供原子操作的类的小工具包,支持在单个变量上不用锁定(Lock-Free)的线程安全编程。

1.Atomic整体介绍

1.1 Boolean,Integer,Long和Reference的Atomic版本:

AtomicBoolean
AtomicInteger
AtomicLong
AtomicReference

1.2 Integer,Long,Reference的数组版本:

AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray

1.3 基于反射的原子更新字段操作:

AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicReferenceFieldUpdater

约束:

1. 字段必须是volatile类型的;
2. 字段的描述类型(修饰符public/protected/default/private)是与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段;
3. 只能是实例变量,不能是类变量(static变量);
4. 只能是可修改变量,不能使final变量,因为final的语义就是不可修改,实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在;
5. 对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装类型(Integer/Long),如果要修改包装类型就需要使用AtomicReferenceFieldUpdater;

1.4 AtomicMarkableReference与AtomicStampedReference

AtomicMarkableReference
AtomicStampedReference

AtomicMarkableReference类描述的一个<Object,Boolean>的对,可以原子的修改Object或者Boolean的值,这种数据结构在一些缓存或者状态描述中比较有用。这种结构在单个或者同时修改Object/Boolean的时候能够有效的提高吞吐量。

AtomicStampedReference类维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。对比AtomicMarkableReference 类的<Object,Boolean>,AtomicStampedReference 维护的是一种类似<Object,int>的数据结构,其实就是对对象(引用)的一个并发计数。但是与AtomicInteger 不同的是,此数据结构可以携带一个对象引用(Object),并且能够对此对象和计数同时进行原子操作。

在后面的章节中会提到“ABA问题”,而AtomicMarkableReference/AtomicStampedReference 在解决“ABA问题”上很有用。

2.AtomicInteger实现

其内部实现不是简单的使用synchronized,而是一个更为高效的方式CAS (compare and swap) + volatile和native方法,从而避免了synchronized的高开销,执行效率大为提升。

native的意思是使用Unsafe类,Unsafe类的功能函数是native的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class AtomicInteger extends Number implements java.io.Serializable {

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;
}

内部使用unsafe实现,value存真正的int值,valueOffset存value在AtomicInteger对象内的内存偏移地址.

2.1 volatile语义

volatile相当于synchronized的弱实现,也就是说volatile实现了类似synchronized的语义,却又没有锁机制.它确保对volatile字段的更新以可预见的方式告知其他的线程.

volatile包含以下语义:

(1).Java存储模型不会对valatile指令的操作进行重排,这个保证对volatile变量的操作时按照指令的出现顺序执行的.
(2).volatile变量不会被缓存在寄存器中(只有拥有线程可见)或者其他对CPU不可见的地方,每次总是从主存中读取volatile变量的结果.也就是说对于volatile变量的修改,其它线程总是可见的,并且不是使用自己线程栈内部的变量。也就是在happens-before法则中,对一个valatile变量的写操作后,其后的任何读操作理解可见此写操作的结果.

尽管volatile变量的特性不错,但是volatile并不能保证线程安全的,也就是说volatile字段的操作不是原子性的,volatile变量只能保证可见性(一个线程修改后其它线程能够理解看到此变化后的结果).

2.2 常用函数

getAndIncrement()相当于i++

incrementAndGet()相当于++i

getAndDecrement()相当于i–

decrementAndGet()相当于–i

2.3 特殊函数

2.3.1 set() VS lazySet

set设置为给定值,直接修改原始值;

lazySet延时设置变量值,这个等价于set()方法,但是由于value是volatile类型的,因此此字段的修改会比普通字段(非volatile字段)有稍微的性能延时(尽管可以忽略),所以如果不是想立即读取设置的新值,允许在“后台”修改值,那么此方法就很有用.

2.3.2 compareAndSet() VS weakCompareAndSet

接受2个参数,一个是期望数据(expected),一个是新数据(new);如果atomic里面的数据和期望数据一 致,则将新数据设定给atomic的数据,返回true,表明成功;否则就不设定,并返回false。

按照JSR规范,调用weakCompareAndSet时并不能保证不存在happen- before的发生(也就是可能存在指令重排序导致此操作失败).

实际实现中,compareAndSet()和weakCompareAndSet没有区别:

1
2
3
4
5
6
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final boolean weakCompareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

3.CAS操作

在JDK 5之前Java语言是靠synchronized关键字保证同步的,这会导致有锁.

锁机制存在以下问题:

(1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。
(2)一个线程持有锁会导致其它所有需要此锁的线程挂起。
(3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。

volatile是不错的机制,但是volatile不能保证原子性。因此对于同步最终还是要回到锁机制上来。

3.1 悲观锁和乐观锁

独占锁是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁;

而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。

3.2 比较并交换

上面的乐观锁用到的机制就是比较并交换(CAS,Compare and Swap).

CAS 操作包含三个操作数:内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。通常将 CAS 用于同步的方式是从地址 V 读取值 A,执行多步计算来获得新值 B,然后使用 CAS 将 V 的值从 A 改为 B。如果 V 处的值尚未同时更改,则 CAS 操作成功。

类似于 CAS 的指令允许算法执行读-修改-写操作,而无需害怕其他线程同时修改变量,因为如果其他线程修改变量,那么 CAS 会检测它(并失败),算法可以对该操作重新计算。

3.3 硬件同步原语

大多数现代处理器都包含对多处理的支持。当然这种支持包括多处理器可以共享外部设备和主内存,同时它通常还包括对指令系统的增加来支持多处理的特殊要求。特别是,几乎每个现代处理器都有通过可以检测或阻止其他处理器的并发访问的方式来更新共享变量的指令.

拿出AtomicInteger来研究在没有锁的情况下是如何做到数据正确性的。

首先毫无以为,在没有锁的机制下可能需要借助volatile原语,保证线程间的数据是可见的(共享的)。这样才获取变量的值的时候才能直接读取。

1
2
3
4
5
private volatile int value;
...
public final int get() {
        return value;
}

然后来看看++i是怎么做到的。

1
2
3
4
5
6
7
8
public final int incrementAndGet() {
    for (;;) {  //重试
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}

在这里采用了CAS操作,每次从内存中读取数据然后将此数据和+1后的结果进行CAS操作,如果成功就返回结果,否则重试直到成功为止。

而compareAndSet利用JNI来完成CPU指令的操作。

1
2
3
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

整体的过程就是这样子的,利用CPU的CAS指令,同时借助JNI来完成Java的非阻塞算法。

4.指令重排

Java语言规范规定了JVM线程内部维持顺序化语义,也就是说只要程序的最终结果等同于它在严格的顺序化环境下的结果,那么指令的执行顺序就可能与代码的顺序不一致,这个过程通过叫做指令的重排序。

指令重排序存在的意义在于:JVM能够根据处理器的特性(CPU的多级缓存系统、多核处理器等)适当的重新排序机器指令,使机器指令更符合CPU的执行特点,最大限度的发挥机器的性能。

5.Happen-Before

Java语言中有一个“先行发生”(happen—before)的规则,它是Java内存模型中定义的两项操作之间的偏序关系,如果操作A先行发生于操作B,其意思就是说,在发生操作B之前,操作A产生的影响都能被操作B观察到,“影响”包括修改了内存中共享变量的值、发送了消息、调用了方法等,它与时间上的先后发生基本没有太大关系。这个原则特别重要,它是判断数据是否存在竞争、线程是否安全的主要依据。

举例来说,假设存在如下三个线程,分别执行对应的操作:

线程A中执行如下操作:i=1
线程B中执行如下操作:j=i
线程C中执行如下操作:i=2

假设线程A中的操作”i=1“ happen—before线程B中的操作“j=i”,那么就可以保证在线程B的操作执行后,变量j的值一定为1,即线程B观察到了线程A中操作“i=1”所产生的影响;现在,我们依然保持线程A和线程B之间的happen—before关系,同时线程C出现在了线程A和线程B的操作之间,但是C与B并没有happen—before关系,那么j的值就不确定了,线程C对变量i的影响可能会被线程B观察到,也可能不会,这时线程B就存在读取到不是最新数据的风险,不具备线程安全性。

6.ABA 问题

CAS操作通常存在ABA问题.

因为在更改 V 之前,CAS 主要询问“V 的值是否仍为 A”,所以在第一次读取 V 以及对 V 执行 CAS 操作之前,如果将值从 A 改为 B,然后再改回 A,会使基于 CAS 的算法混乱。在这种情况下,CAS 操作会成功,但是在一些情况下,结果可能不是您所预期的。这类问题称为 ABA 问题,通常通过将标记或版本编号与要进行 CAS 操作的每个值相关联,并原子地更新值和标记,来处理这类问题。AtomicStampedReference 类支持这种方法.

7.参考

https://www.ibm.com/developerworks/cn/java/j-jtp11234/