xiaobaoqiu Blog

Think More, Code Less

Java Concurrency in Practice 5 : Building Blocks

第五章:构建块(Building Blocks) java类库中并发构建块的集合.

1.同步容器

同步容器包含两部分:

(1).Vector和HashTable;

(2).Collections.synchronizedXxx工厂方法创建的容器;

在迭代器件,对容器加锁会导致其他线程需要访问容器时候,必须等待知道迭代结束,另外doSomething中可能还持有另外一个锁,这就可能造成死锁.

1
2
3
4
5
synchronized (vector) {
        for (Integer item : vector){
            doSomething(item);
        }
    }

在迭代器期间,对容器加锁的一个替代办法是复制容器,但需要考虑器带来的性能开销.

2.并发容器

同步容器通过对容器的所有状态进行串行访问,从而实现了他们的线程安全.

这样的代价就是削弱了并发性,当多个线程公摊竞争容器级别的锁时,吞吐量就会降低.

而并发容器正式为多线程并发访问而设计的,比如ConcurrentHashMap替代HashMap;当多数操作为读取的时候用CopyOnWriteArrayList代替List;ConcurrentLinkedQueue是一个具有优先级顺序的队列(非并发),PriorityQueue的操作不会阻塞,如果队列时空,则从队列中获取元素的操作返回null;BlockingQueue扩展了Queue,增加了可阻塞的插入和获取操作,如果队列为空,一个获取操作会一致阻塞知道队列中存在可用元素,如果队列是满的,插入操作会一致阻塞知道队列中存在可用空间;ConcurrentSkipListMap和ConcurrentSkipListSet用来作为同步的Sorted Map和SortedSet的并发替代并.

ConcurrentHashMap

在HashTable和SynchronizedMap中,获取Map的锁就可以防止任何其他线程访问该Map(独占),ConcurrentHashMap使用了一个更细化的锁机制(桶级别的锁),叫分离锁,它允许更深层次的共享访问,即任意数量的独显乘可以并发访问Map,读者和写着也可以并发访问Map,并写有限数量的写线程可以并发的修改Map,因而为并发刚问带来更高的吞吐量. ConcurrentHashMap增加的一些原子操作: 缺少即插入(key不存在的时候才插入):

1
2
3
4
5
6
public V putIfAbsent(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, true);
    }

相等便删除(只有当key和value匹配的时候才删除):

1
2
3
4
5
6
public boolean remove(Object key, Object value) {
        int hash = hash(key.hashCode());
        if (value == null)
            return false;
        return segmentFor(hash).remove(key, hash, value) != null;
}

相等便替换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//只有当key匹配某一个值才取代
public V replace(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).replace(key, hash, value);
}

//只有当key和oldValue匹配的时候才替换
public boolean replace(K key, V oldValue, V newValue) {
        if (oldValue == null || newValue == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).replace(key, hash, oldValue, newValue);
}

CopyOnWriteArrayList

CopyOnWriteArrayList避免了在迭代器件对容器加锁.显然,每次容器改变是复制需要开销,特别是容器比较大的时候,当对容器迭代操作频率远远高于对容器的修改的频率 时候,使用写时复制容器是一个合理的选择.

BlockingQueue

阻塞队列(BlockingQueue)提供了可阻塞的put和take方法,同时也提供了可定时的offer和poll方法:

1
2
3
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

E poll(long timeout, TimeUnit unit) throws InterruptedException;

Queue的长度可以有限,也可以无限.BlockingQueue的实现包括:LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列;PriorityBlockingQueue是一个按优先级顺序排序的队列,支持自定义Comparator;SynchronousQueue根本上不是一个整整意义的队列,因为它不会为队列元素维护任何存储空间.其中每个 put 必须等待一个 take,反之亦然.

生产者-消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class FileCrawler implements Runnable {
    private final BlockingQueue<File> fileQueue;
    private final File root;

    public FileCrawler(BlockingQueue<File> fileQueue, File root) {
        this.fileQueue = fileQueue;
        this.root = root;
    }

    @Override
    public void run() {
        try {
            crawl(root);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void crawl(File root) throws InterruptedException {
        File[] files = root.listFiles();
        if (files != null && files.length > 0) {
            for (File file : files) {
                if (file.isDirectory())
                    crawl(file);
                else if (alreadyIndexed(file) == false)
                    fileQueue.put(file);
            }
        }
    }

    private boolean alreadyIndexed(File file) {
        //TODO
        return false;
    }
}

public class FileIndexer implements Runnable {

    private final BlockingQueue<File> fileQueue;

    public FileIndexer(BlockingQueue<File> fileQueue) {
        this.fileQueue = fileQueue;
    }

    @Override
    public void run() {
        try {
            while (true)
                index(fileQueue.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void index(File file) {
        //TODO : do index
    }
}

public class CrawlAndIndex {
    public static void main(String[] args) {
        final int MAX_QUEUE_SIZE = 1024;
        final int INDEXER_COUNT = 100;

        final File[] roots = getFiles();

        BlockingQueue<File> fileQueue = new LinkedBlockingDeque<File>(MAX_QUEUE_SIZE);

        for (File file : roots)
            new Thread(new FileCrawler(fileQueue, file)).start();

        for(int i=0; i<INDEXER_COUNT; i++)
            new Thread(new FileIndexer(fileQueue)).start();

    }

    private static File[] getFiles() {
        //TODO
        return null;
    }
}

阻塞和可中断的方法

线程可能会因为几种原因被阻塞或者暂停:等待IO操作结束,等待获的一个锁,等待从Thread.sleep()中唤醒,或者等待另外一个线程的计算结果.

当一个线程被阻塞时候,它通常被挂起,并设置乘线程阻塞的某一个状态(BLOCKED,WAITING或者TIMED_WAITING).

对于InterruptedException,通常最明智的策略是把这个InterruptedException传递给你的调用者.

恢复中断:有时还我们不能抛出InterruptedException,比如代码是Runnable的一部分,这是很,我们需要捕获InterruptedException并在当前线程中通过调用interrupt()从中断中恢复.如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TaskRunnable implements Runnable {

    private final BlockingQueue<Task> taskQueue;

    public TaskRunnable(BlockingQueue<Task> taskQueue) {
        this.taskQueue = taskQueue;
    }

    @Override
    public void run() {
        try {
            while (true)
                process(taskQueue.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); //恢复中断状态
        }
    }
}

3.Synchronizer

Synchronizer是一个对象,它根据自身状态调节线程的控制流.

阻塞队列可以扮演一个Synchronizer角色,其他类型的Synchronizer包括信号量(semaphore),关卡(barrier)以及闭锁(latch).

所有的Synchronizer都享有类似的结构特征:他们封装状态,这些庄套决定线程执行到某一点是通过还是被迫等待;他们还提供了操控状态的方法,以及高校地等到Synchronizer进入到期望状态的方法.

闭锁

闭锁可以延迟线程的进度直到进程到达终止(termnal)状态.

一个闭锁就像一道大门:在闭锁达到终点状态之前,门一致是关闭的,没有线程能够通过,终点状态到来的时候,门卡了,允许所有都通过.

一旦闭锁到达终点状态,它就不能在改变状态了,所有它会永远保持趟开状态.

闭锁可以用来确保特定活动在其他活动完成之后才发生,如: 1. 确保一个计算不会执行,直到它需要的资源都被初始化完成; 2. 确保一个服务不会开始,直到它依赖的其他服务都已经开始; 3. 瞪大,直到活动的所有部分窦唯继续处理做好准备(如CS对战中所有玩家都准备就绪);

CountDownLatch是一个闭锁实现,允许一个或多个线程等待事件集的发生.包含一个计数器,表示需要等待的事件数,countDown方法对计算器做减操作,表示一个事件已经发生,await方法等待计算机达到0,此时表示所有事件都已经发生.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        task.run();
                    } finally {
                        endGate.countDown();
                    }
                }
            };
            t.start();
        }

        long start = System.nanoTime();
        endGate.await();
        long end = System.nanoTime();
        System.out.println("All Job Finish, use time(nano) = " + (end - start));

        return end - start;
    }

    public void doTask() throws InterruptedException{
        timeTasks(10, new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep((long) (1000 * Math.random()));
                    System.out.println("Finished.");
                } catch (InterruptedException e) {
                }
            }
        });
    }

FutureTask

FutureTask描述库一个抽象的可携带结果的计算.FutureTask通过Callable实现的,它等价与一个可携带结果的Runnable,并且有上那个状态:等待,运行和完成.一旦FutureTask进入完成状态,则会永远停止在这个状态上.

Future.get行为依赖与任务的状态,如果它已经完成,get可以立刻得到返回结果,否在会阻塞知道任务转入完成状态,然后返回结构或者抛出异常.

Executor框架利用FuntureTask来完成异步任务,并可以用来计算任何潜在的耗时计算,而且可以在真正要计算结构之前就启动他们开始计算.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Preload {
    private final FutureTask<BigDecimal> futureTask = new FutureTask<BigDecimal>(new Callable<BigDecimal>() {
        @Override
        public BigDecimal call() throws Exception {
            return timeConsumingWork();
        }
    });

    private final Thread thread = new Thread(futureTask);

    public void start() {
        thread.start();
    }

    private BigDecimal get(){
        try{
            return futureTask.get();    //获取数据
        }catch(ExecutionException e){
            Throwable cause = e.getCause();
            //处理各种原因的异常
            return null;
        }catch (InterruptedException e){
            return null;
        }
    }

    /**
     * 耗时的计算
     * 
     * @return
     */
    private BigDecimal timeConsumingWork() {
        // TODO
        return null;
    }
}

Callable记录的这些任务,可以抛出受检查或者未受检查的异常,无论执行任务的代码抛出什么,它都被封装为一个ExecutionException,并被Future.get重新抛出.

信号量

计算信号量(Counting semaphore)用来控制能够同时访问某特定资源的活动的数量,或者同时执行某一给定操作的数量.可以用来实现资源池或者给一个容器限定边界.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        sem.acquire();  //获得许可
        boolean added = false;

        try {
            added = set.add(o);
            return added;
        }finally {
            if(!added)
                sem.release();
        }
    }

    public boolean remove(Object o){
        boolean removed = set.remove(o);
        if(removed)
            sem.release();  //将许可放回资源池
        return removed;
    }
}

关卡

关卡(Barrier)类似于闭锁,不同之处在于,所有线程必须同时到达关卡点,才能继续处理.

闭锁等待是事件,关卡等待的是其他线程.

CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.其实现协议类似于一些朋友指定的集合地点:“我们每个人6:00在麦当劳见,到了以后不见不散,之后我们再决定接下来做什么”.

CyclicBarrier允许一个给定数量的成员多次集中在一个关卡点.如果所有线程都到达了关卡点,关卡就被成功的突破,这样所有的线程都被释放,关卡会重置以备下一次使用.

Exchanger是关卡的另外一种形式.

高速缓存

为一个昂贵的函数缓存计算结果.Map中存Future而不是具体的数值,这样避免重复计算.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public interface Computable<A, V> {
    V compute(A arg) throws InterruptedException;
}

public class Memoizer<A, V> implements Computable<A, V> {

    private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();

    private final Computable<A, V> c;

    public Memoizer(Computable<A, V> c) {
        this.c = c;
    }

    @Override
    public V compute(final A arg) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(arg);
            if(f == null){
                Callable<V> eval = new Callable<V>() {
                    @Override
                    public V call() throws Exception {
                        return c.compute(arg);
                    }
                };

                FutureTask<V> ft = new FutureTask<V>(eval);
                f = cache.putIfAbsent(arg, ft);

                if(f == null){
                    f = ft;
                    ft.run();
                }
            }
            try{
                return f.get();
            }catch (CancellationException e){
                cache.remove(arg, f);
            }catch (ExecutionException e){
                //TODO:根据 e.getCause() 处理
            }
        }
    }
}