第六章:任务执行
1.在线程中执行任务
服务器应用:良好的吞吐量和快速的响应性.应用程序应该在负荷过载时候平缓地劣化.
2.Executor框架
将任务的提交和任务的执行体解藕.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| public class TaskExecutionWebService {
/**
* 线程池大小
*/
private static final int POOL_SIZE = 100;
private static final Executor executor = Executors.newFixedThreadPool(POOL_SIZE);
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(80);
while(true){
final Socket socket = serverSocket.accept();
Runnable runnable = new Runnable() {
@Override
public void run() {
handRequest(socket);
}
};
executor.execute(runnable);
}
}
private static void handRequest(Socket socket){
//TODO
}
}
|
线程池
优点:重用存在的线程;任务达到时候,不需要等待线程创建;
Executors提供了一些静态方法:
1. newFixedThreadPool(int nThreads):最多nThreads个线程,当某一个线程由于非预期的Exception而结束,线程池会补充一个新的线程;
2. newSingleThreadExecutor():创建一个单线程化的executor,如果这个线程异常结束,会有量外一个取代它;
3. newCachedThreadPool():创建可缓存的线程池,如果当前线程池的长度超过处理的需要时候,它可以灵活的收回空闲的线程;当需求增加时候,它可以灵活的添加新的线程,不对池的长度做任何限制;
4. newScheduledThreadPool(int corePoolSize):创建一个定长的线程池,支持定时的周期性的任务执行,类似与Timer;
Executor的生命周期
Executor是异步的执行任务,所有在任何时间里,所有之前提交的任务的状态都不可能立即可见,这些任务中,有些可能已经完成,有些可能正在运行,有些还可能还在队列中等待执行;
关闭应用程序时,程序会出现很多种情况:最平缓的关闭(任务全部完成而且没有任何新的工作)到最唐突的关闭(拔掉电源),以及介于这两个极端之前的各种可能.
为了解决执行服务的声明周期问题,ExecutorService接口扩展了Executoe,添加了一些用于生命周期管理的方法.
1
2
3
4
5
6
| public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
...
}
|
ExecutorService暗示了声明周期的3种状态:运行,关闭和终止.ExecutorService创建后的初始状态是运行状态.shutdown()方法会启动一个平缓的关闭:停止接受新的任务,同时等待已提交的任务执行完成,包括尚未开始的任务.shutdownNow()方法会启动一个强制的关闭过程:尝试取消所有运行中的任务和排在队列中尚未开始的任务.
我们可以awaitTermination()等待ExecutorService到达中止状态,也可以轮询isTerminated()判断ExecutorService是否已经终止.
延迟的并据周期性的任务
考虑用ScheduledThreadPoolExecutor代替Timer,ScheduledThreadPoolExecutor可以用Executors.newScheduledThreadPool()工厂方法创建一个ScheduledThreadPoolExecutor.
调度线程池(Scheduled Thread Pool)让我们可以提供多个线程来执行延迟的周星任务,而Timer是单线程的.
Timer的另外一个问题是如果TimerTask异常会终止timer线程.
DelayQueue管理着一个包含Delayed对象的容器,每个Delayed对象都与一个延迟时间相关联:只有在元素过期胡,DelayQueue才能让你执行take获取元素.从DelayQueue中返回的对象将依据它们所延迟的时间进行排序.
3.寻找可强化的并行性
可携带结果的任务:Callable和Future
1
2
3
4
5
6
7
8
9
10
11
12
13
| public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
|
ExecutorService的所有submit方法都返回Future:
1
2
3
4
5
6
7
8
9
| public interface ExecutorService extends Executor {
...
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
...
}
|
CompletionService
CompletionService整合了Executor和BlockingQUeue的功能.可以将Callable任务提交给它执行,染红使用类似于队列的take和poll方法,在结果完整(Future的get正确返回)可用时获得这个结果.ExecutorCompletionService是实现CompletionService接口的一个类,它将计算任务委托给一个Executor.
1
2
3
4
5
6
7
8
9
10
11
12
| public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
|
使用CompletionService完成页面渲染器(包含文本渲染,图片渲染,图片需要先下载):每需要下载一个图像,就创建一个独立的任务,在线程池中执行,将顺序的图片下载转换为并行.从CompletionService中获取结构,只要任何一个图像下载完成,就立刻展示.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| public class Render {
private ExecutorService executorService;
public Render(ExecutorService executorService) {
this.executorService = executorService;
}
/**
* 渲染页面,包括文本和图片
*
* @param source
*/
void renderPage(CharSequence source) {
final List<ImageInfo> imageInfoList = scanImageInfoList(source);
CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executorService);
/** 建立下载图片的任务,提交到CompletionService */
for (final ImageInfo imageInfo : imageInfoList) {
completionService.submit(new Callable<ImageData>() {
@Override
public ImageData call() throws Exception {
return imageInfo.downloadImage();
}
});
}
/** 渲染文本 */
renderText(source);
/** 获取图片下载结果,渲染图片 */
try{
for (int i = 0; i < imageInfoList.size(); i++) {
Future<ImageData> future = completionService.take();
renderImage(future.get());
}
}catch(InterruptedException ie){
Thread.currentThread().interrupt();
}catch(ExecutionException ee){
//TODO
}
}
}
|
为任务设置时限
某些任务需要在一定时间内返回结构,超时则结果无意义.
Future的带限时版本的get方法满足需求,超时会抛出TimeoutException.
使用限时任务的另外一个问题是,当它们超时的时候,应该停止他们无意义的继续计算.Future.get()抛出TimeoutException异常,则可以通过Future取消任务.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| /**
* 带广告的页面渲染,广告超时未获得则不显示广告
*/
Page getRenderPageWithAd() {
long endNanos = System.currentTimeMillis() + TIME_BUDGET;
/** 获取广告的future */
Future<Ad> future = executorService.submit(new FetchAdTask<Ad>());
/** 获取原始待渲染页面 */
Page page = getRenderPage();
/** 获取广告 */
Ad ad;
try {
long timeLeft = endNanos - System.currentTimeMillis();
ad = future.get(timeLeft, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
ad = DEFAULT_AD;
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
ad = DEFAULT_AD
}catch (TimeoutException te){
/** 超时,放弃广告,并取消获取广告的任务 */
ad = DEFAULT_AD;
future.cancel(true);
}
page.setAd(ad);
return page;
}
|