第五章:构建块(Building Blocks)
java类库中并发构建块的集合.
1.同步容器
同步容器包含两部分:
(1).Vector和HashTable;
(2).Collections.synchronizedXxx工厂方法创建的容器;
在迭代器件,对容器加锁会导致其他线程需要访问容器时候,必须等待知道迭代结束,另外doSomething中可能还持有另外一个锁,这就可能造成死锁.
1
2
3
4
5
synchronized ( vector ) {
for ( Integer item : vector ){
doSomething ( item );
}
}
在迭代器期间,对容器加锁的一个替代办法是复制容器,但需要考虑器带来的性能开销.
2.并发容器
同步容器通过对容器的所有状态进行串行访问,从而实现了他们的线程安全.
这样的代价就是削弱了并发性,当多个线程公摊竞争容器级别的锁时,吞吐量就会降低.
而并发容器正式为多线程并发访问而设计的,比如ConcurrentHashMap替代HashMap;当多数操作为读取的时候用CopyOnWriteArrayList代替List;ConcurrentLinkedQueue是一个具有优先级顺序的队列(非并发),PriorityQueue的操作不会阻塞,如果队列时空,则从队列中获取元素的操作返回null;BlockingQueue扩展了Queue,增加了可阻塞的插入和获取操作,如果队列为空,一个获取操作会一致阻塞知道队列中存在可用元素,如果队列是满的,插入操作会一致阻塞知道队列中存在可用空间;ConcurrentSkipListMap和ConcurrentSkipListSet用来作为同步的Sorted Map和SortedSet的并发替代并.
ConcurrentHashMap
在HashTable和SynchronizedMap中,获取Map的锁就可以防止任何其他线程访问该Map(独占),ConcurrentHashMap使用了一个更细化的锁机制(桶级别的锁),叫分离锁,它允许更深层次的共享访问,即任意数量的独显乘可以并发访问Map,读者和写着也可以并发访问Map,并写有限数量的写线程可以并发的修改Map,因而为并发刚问带来更高的吞吐量.
ConcurrentHashMap增加的一些原子操作:
缺少即插入(key不存在的时候才插入):
1
2
3
4
5
6
public V putIfAbsent ( K key , V value ) {
if ( value == null )
throw new NullPointerException ();
int hash = hash ( key . hashCode ());
return segmentFor ( hash ). put ( key , hash , value , true );
}
相等便删除(只有当key和value匹配的时候才删除):
1
2
3
4
5
6
public boolean remove ( Object key , Object value ) {
int hash = hash ( key . hashCode ());
if ( value == null )
return false ;
return segmentFor ( hash ). remove ( key , hash , value ) != null ;
}
相等便替换:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//只有当key匹配某一个值才取代
public V replace ( K key , V value ) {
if ( value == null )
throw new NullPointerException ();
int hash = hash ( key . hashCode ());
return segmentFor ( hash ). replace ( key , hash , value );
}
//只有当key和oldValue匹配的时候才替换
public boolean replace ( K key , V oldValue , V newValue ) {
if ( oldValue == null || newValue == null )
throw new NullPointerException ();
int hash = hash ( key . hashCode ());
return segmentFor ( hash ). replace ( key , hash , oldValue , newValue );
}
CopyOnWriteArrayList
CopyOnWriteArrayList避免了在迭代器件对容器加锁.显然,每次容器改变是复制需要开销,特别是容器比较大的时候,当对容器迭代操作频率远远高于对容器的修改的频率 时候,使用写时复制容器是一个合理的选择.
BlockingQueue
阻塞队列(BlockingQueue)提供了可阻塞的put和take方法,同时也提供了可定时的offer和poll方法:
1
2
3
boolean offer ( E e , long timeout , TimeUnit unit ) throws InterruptedException ;
E poll ( long timeout , TimeUnit unit ) throws InterruptedException ;
Queue的长度可以有限,也可以无限.BlockingQueue的实现包括:LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列;PriorityBlockingQueue是一个按优先级顺序排序的队列,支持自定义Comparator;SynchronousQueue根本上不是一个整整意义的队列,因为它不会为队列元素维护任何存储空间.其中每个 put 必须等待一个 take,反之亦然.
生产者-消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class FileCrawler implements Runnable {
private final BlockingQueue < File > fileQueue ;
private final File root ;
public FileCrawler ( BlockingQueue < File > fileQueue , File root ) {
this . fileQueue = fileQueue ;
this . root = root ;
}
@Override
public void run () {
try {
crawl ( root );
} catch ( InterruptedException e ) {
Thread . currentThread (). interrupt ();
}
}
private void crawl ( File root ) throws InterruptedException {
File [] files = root . listFiles ();
if ( files != null && files . length > 0 ) {
for ( File file : files ) {
if ( file . isDirectory ())
crawl ( file );
else if ( alreadyIndexed ( file ) == false )
fileQueue . put ( file );
}
}
}
private boolean alreadyIndexed ( File file ) {
//TODO
return false ;
}
}
public class FileIndexer implements Runnable {
private final BlockingQueue < File > fileQueue ;
public FileIndexer ( BlockingQueue < File > fileQueue ) {
this . fileQueue = fileQueue ;
}
@Override
public void run () {
try {
while ( true )
index ( fileQueue . take ());
} catch ( InterruptedException e ) {
Thread . currentThread (). interrupt ();
}
}
private void index ( File file ) {
//TODO : do index
}
}
public class CrawlAndIndex {
public static void main ( String [] args ) {
final int MAX_QUEUE_SIZE = 1024 ;
final int INDEXER_COUNT = 100 ;
final File [] roots = getFiles ();
BlockingQueue < File > fileQueue = new LinkedBlockingDeque < File >( MAX_QUEUE_SIZE );
for ( File file : roots )
new Thread ( new FileCrawler ( fileQueue , file )). start ();
for ( int i = 0 ; i < INDEXER_COUNT ; i ++)
new Thread ( new FileIndexer ( fileQueue )). start ();
}
private static File [] getFiles () {
//TODO
return null ;
}
}
阻塞和可中断的方法
线程可能会因为几种原因被阻塞或者暂停:等待IO操作结束,等待获的一个锁,等待从Thread.sleep()中唤醒,或者等待另外一个线程的计算结果.
当一个线程被阻塞时候,它通常被挂起,并设置乘线程阻塞的某一个状态(BLOCKED,WAITING或者TIMED_WAITING).
对于InterruptedException,通常最明智的策略是把这个InterruptedException传递给你的调用者.
恢复中断:有时还我们不能抛出InterruptedException,比如代码是Runnable的一部分,这是很,我们需要捕获InterruptedException并在当前线程中通过调用interrupt()从中断中恢复.如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TaskRunnable implements Runnable {
private final BlockingQueue < Task > taskQueue ;
public TaskRunnable ( BlockingQueue < Task > taskQueue ) {
this . taskQueue = taskQueue ;
}
@Override
public void run () {
try {
while ( true )
process ( taskQueue . take ());
} catch ( InterruptedException e ) {
Thread . currentThread (). interrupt (); //恢复中断状态
}
}
}
3.Synchronizer
Synchronizer是一个对象,它根据自身状态调节线程的控制流.
阻塞队列可以扮演一个Synchronizer角色,其他类型的Synchronizer包括信号量(semaphore),关卡(barrier)以及闭锁(latch).
所有的Synchronizer都享有类似的结构特征:他们封装状态,这些庄套决定线程执行到某一点是通过还是被迫等待;他们还提供了操控状态的方法,以及高校地等到Synchronizer进入到期望状态的方法.
闭锁
闭锁可以延迟线程的进度直到进程到达终止(termnal)状态.
一个闭锁就像一道大门:在闭锁达到终点状态之前,门一致是关闭的,没有线程能够通过,终点状态到来的时候,门卡了,允许所有都通过.
一旦闭锁到达终点状态,它就不能在改变状态了,所有它会永远保持趟开状态.
闭锁可以用来确保特定活动在其他活动完成之后才发生,如:
1. 确保一个计算不会执行,直到它需要的资源都被初始化完成;
2. 确保一个服务不会开始,直到它依赖的其他服务都已经开始;
3. 瞪大,直到活动的所有部分窦唯继续处理做好准备(如CS对战中所有玩家都准备就绪);
CountDownLatch是一个闭锁实现,允许一个或多个线程等待事件集的发生.包含一个计数器,表示需要等待的事件数,countDown方法对计算器做减操作,表示一个事件已经发生,await方法等待计算机达到0,此时表示所有事件都已经发生.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public long timeTasks ( int nThreads , final Runnable task ) throws InterruptedException {
final CountDownLatch endGate = new CountDownLatch ( nThreads );
for ( int i = 0 ; i < nThreads ; i ++) {
Thread t = new Thread () {
@Override
public void run () {
try {
task . run ();
} finally {
endGate . countDown ();
}
}
};
t . start ();
}
long start = System . nanoTime ();
endGate . await ();
long end = System . nanoTime ();
System . out . println ( "All Job Finish, use time(nano) = " + ( end - start ));
return end - start ;
}
public void doTask () throws InterruptedException {
timeTasks ( 10 , new Runnable () {
@Override
public void run () {
try {
Thread . sleep (( long ) ( 1000 * Math . random ()));
System . out . println ( "Finished." );
} catch ( InterruptedException e ) {
}
}
});
}
FutureTask
FutureTask描述库一个抽象的可携带结果的计算.FutureTask通过Callable实现的,它等价与一个可携带结果的Runnable,并且有上那个状态:等待,运行和完成.一旦FutureTask进入完成状态,则会永远停止在这个状态上.
Future.get行为依赖与任务的状态,如果它已经完成,get可以立刻得到返回结果,否在会阻塞知道任务转入完成状态,然后返回结构或者抛出异常.
Executor框架利用FuntureTask来完成异步任务,并可以用来计算任何潜在的耗时计算,而且可以在真正要计算结构之前就启动他们开始计算.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Preload {
private final FutureTask < BigDecimal > futureTask = new FutureTask < BigDecimal >( new Callable < BigDecimal >() {
@Override
public BigDecimal call () throws Exception {
return timeConsumingWork ();
}
});
private final Thread thread = new Thread ( futureTask );
public void start () {
thread . start ();
}
private BigDecimal get (){
try {
return futureTask . get (); //获取数据
} catch ( ExecutionException e ){
Throwable cause = e . getCause ();
//处理各种原因的异常
return null ;
} catch ( InterruptedException e ){
return null ;
}
}
/**
* 耗时的计算
*
* @return
*/
private BigDecimal timeConsumingWork () {
// TODO
return null ;
}
}
Callable记录的这些任务,可以抛出受检查或者未受检查的异常,无论执行任务的代码抛出什么,它都被封装为一个ExecutionException,并被Future.get重新抛出.
信号量
计算信号量(Counting semaphore)用来控制能够同时访问某特定资源的活动的数量,或者同时执行某一给定操作的数量.可以用来实现资源池或者给一个容器限定边界.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class BoundedHashSet < T > {
private final Set < T > set ;
private final Semaphore sem ;
public BoundedHashSet ( int bound ) {
this . set = Collections . synchronizedSet ( new HashSet < T >());
sem = new Semaphore ( bound );
}
public boolean add ( T o ) throws InterruptedException {
sem . acquire (); //获得许可
boolean added = false ;
try {
added = set . add ( o );
return added ;
} finally {
if (! added )
sem . release ();
}
}
public boolean remove ( Object o ){
boolean removed = set . remove ( o );
if ( removed )
sem . release (); //将许可放回资源池
return removed ;
}
}
关卡
关卡(Barrier)类似于闭锁,不同之处在于,所有线程必须同时到达关卡点,才能继续处理.
闭锁等待是事件,关卡等待的是其他线程.
CyclicBarrier更像一个水闸, 线程执行就想水流, 在水闸处都会堵住, 等到水满(线程到齐)了, 才开始泄流.其实现协议类似于一些朋友指定的集合地点:“我们每个人6:00在麦当劳见,到了以后不见不散,之后我们再决定接下来做什么”.
CyclicBarrier允许一个给定数量的成员多次集中在一个关卡点.如果所有线程都到达了关卡点,关卡就被成功的突破,这样所有的线程都被释放,关卡会重置以备下一次使用.
Exchanger是关卡的另外一种形式.
高速缓存
为一个昂贵的函数缓存计算结果.Map中存Future而不是具体的数值,这样避免重复计算.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public interface Computable < A , V > {
V compute ( A arg ) throws InterruptedException ;
}
public class Memoizer < A , V > implements Computable < A , V > {
private final ConcurrentHashMap < A , Future < V >> cache = new ConcurrentHashMap < A , Future < V >>();
private final Computable < A , V > c ;
public Memoizer ( Computable < A , V > c ) {
this . c = c ;
}
@Override
public V compute ( final A arg ) throws InterruptedException {
while ( true ) {
Future < V > f = cache . get ( arg );
if ( f == null ){
Callable < V > eval = new Callable < V >() {
@Override
public V call () throws Exception {
return c . compute ( arg );
}
};
FutureTask < V > ft = new FutureTask < V >( eval );
f = cache . putIfAbsent ( arg , ft );
if ( f == null ){
f = ft ;
ft . run ();
}
}
try {
return f . get ();
} catch ( CancellationException e ){
cache . remove ( arg , f );
} catch ( ExecutionException e ){
//TODO:根据 e.getCause() 处理
}
}
}
}