xiaobaoqiu Blog

Think More, Code Less

Java Concurrency in Practice 6 : 任务执行

第六章:任务执行

1.在线程中执行任务

服务器应用:良好的吞吐量和快速的响应性.应用程序应该在负荷过载时候平缓地劣化.

2.Executor框架

将任务的提交和任务的执行体解藕.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TaskExecutionWebService {
    /**
     * 线程池大小
     */
    private static final int POOL_SIZE = 100;

    private static final Executor executor = Executors.newFixedThreadPool(POOL_SIZE);

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(80);
        while(true){
            final Socket socket = serverSocket.accept();
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    handRequest(socket);
                }
            };

            executor.execute(runnable);
        }
    }

    private static void handRequest(Socket socket){
        //TODO
    }
}

线程池

优点:重用存在的线程;任务达到时候,不需要等待线程创建;

Executors提供了一些静态方法: 1. newFixedThreadPool(int nThreads):最多nThreads个线程,当某一个线程由于非预期的Exception而结束,线程池会补充一个新的线程; 2. newSingleThreadExecutor():创建一个单线程化的executor,如果这个线程异常结束,会有量外一个取代它; 3. newCachedThreadPool():创建可缓存的线程池,如果当前线程池的长度超过处理的需要时候,它可以灵活的收回空闲的线程;当需求增加时候,它可以灵活的添加新的线程,不对池的长度做任何限制; 4. newScheduledThreadPool(int corePoolSize):创建一个定长的线程池,支持定时的周期性的任务执行,类似与Timer;

Executor的生命周期

Executor是异步的执行任务,所有在任何时间里,所有之前提交的任务的状态都不可能立即可见,这些任务中,有些可能已经完成,有些可能正在运行,有些还可能还在队列中等待执行;

关闭应用程序时,程序会出现很多种情况:最平缓的关闭(任务全部完成而且没有任何新的工作)到最唐突的关闭(拔掉电源),以及介于这两个极端之前的各种可能.

为了解决执行服务的声明周期问题,ExecutorService接口扩展了Executoe,添加了一些用于生命周期管理的方法.

1
2
3
4
5
6
public interface ExecutorService extends Executor {
    void shutdown();

    List<Runnable> shutdownNow();
    ...
}

ExecutorService暗示了声明周期的3种状态:运行,关闭和终止.ExecutorService创建后的初始状态是运行状态.shutdown()方法会启动一个平缓的关闭:停止接受新的任务,同时等待已提交的任务执行完成,包括尚未开始的任务.shutdownNow()方法会启动一个强制的关闭过程:尝试取消所有运行中的任务和排在队列中尚未开始的任务.

我们可以awaitTermination()等待ExecutorService到达中止状态,也可以轮询isTerminated()判断ExecutorService是否已经终止.

延迟的并据周期性的任务

考虑用ScheduledThreadPoolExecutor代替Timer,ScheduledThreadPoolExecutor可以用Executors.newScheduledThreadPool()工厂方法创建一个ScheduledThreadPoolExecutor.

调度线程池(Scheduled Thread Pool)让我们可以提供多个线程来执行延迟的周星任务,而Timer是单线程的.

Timer的另外一个问题是如果TimerTask异常会终止timer线程.

DelayQueue管理着一个包含Delayed对象的容器,每个Delayed对象都与一个延迟时间相关联:只有在元素过期胡,DelayQueue才能让你执行take获取元素.从DelayQueue中返回的对象将依据它们所延迟的时间进行排序.

3.寻找可强化的并行性

可携带结果的任务:Callable和Future

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService的所有submit方法都返回Future:

1
2
3
4
5
6
7
8
9
public interface ExecutorService extends Executor {
    ...
    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);
    ...
}

CompletionService

CompletionService整合了Executor和BlockingQUeue的功能.可以将Callable任务提交给它执行,染红使用类似于队列的take和poll方法,在结果完整(Future的get正确返回)可用时获得这个结果.ExecutorCompletionService是实现CompletionService接口的一个类,它将计算任务委托给一个Executor.

1
2
3
4
5
6
7
8
9
10
11
12
public interface CompletionService<V> {

    Future<V> submit(Callable<V> task);

    Future<V> submit(Runnable task, V result);

    Future<V> take() throws InterruptedException;

    Future<V> poll();

    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

使用CompletionService完成页面渲染器(包含文本渲染,图片渲染,图片需要先下载):每需要下载一个图像,就创建一个独立的任务,在线程池中执行,将顺序的图片下载转换为并行.从CompletionService中获取结构,只要任何一个图像下载完成,就立刻展示.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Render {

    private ExecutorService executorService;

    public Render(ExecutorService executorService) {
        this.executorService = executorService;
    }

    /**
     * 渲染页面,包括文本和图片
     * 
     * @param source
     */
    void renderPage(CharSequence source) {
        final List<ImageInfo> imageInfoList = scanImageInfoList(source);
        CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executorService);

        /** 建立下载图片的任务,提交到CompletionService */
        for (final ImageInfo imageInfo : imageInfoList) {
            completionService.submit(new Callable<ImageData>() {
                @Override
                public ImageData call() throws Exception {
                    return imageInfo.downloadImage();
                }
            });
        }

        /** 渲染文本 */
        renderText(source);

        /** 获取图片下载结果,渲染图片 */
        try{
            for (int i = 0; i < imageInfoList.size(); i++) {
                Future<ImageData> future = completionService.take();
                renderImage(future.get());
            }
        }catch(InterruptedException ie){
            Thread.currentThread().interrupt();
        }catch(ExecutionException ee){
            //TODO
        }
    }
}

为任务设置时限

某些任务需要在一定时间内返回结构,超时则结果无意义.

Future的带限时版本的get方法满足需求,超时会抛出TimeoutException.

使用限时任务的另外一个问题是,当它们超时的时候,应该停止他们无意义的继续计算.Future.get()抛出TimeoutException异常,则可以通过Future取消任务.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * 带广告的页面渲染,广告超时未获得则不显示广告
 */
Page getRenderPageWithAd() {
    long endNanos = System.currentTimeMillis() + TIME_BUDGET;
    /** 获取广告的future */
    Future<Ad> future = executorService.submit(new FetchAdTask<Ad>());

    /** 获取原始待渲染页面 */
    Page page = getRenderPage();

    /** 获取广告 */
    Ad ad;
    try {
        long  timeLeft = endNanos - System.currentTimeMillis();
        ad = future.get(timeLeft, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie) {
        ad = DEFAULT_AD;
        Thread.currentThread().interrupt();
    } catch (ExecutionException ee) {
        ad = DEFAULT_AD
    }catch (TimeoutException te){
        /** 超时,放弃广告,并取消获取广告的任务 */
        ad = DEFAULT_AD;
        future.cancel(true);
    }

    page.setAd(ad);
    return page;
}