0 执行者Executor的由来
在介绍具体的工具之前,先讲讲设计者的思路。在Java1.4之前,已经提供了Runnable接口、Thread类、Timer类和synchronize关键字,它们已经足以完成各种各样的多线程编程任务,为什么还要提供执行者这样的概念呢?
这是因为Java的设计者想把线程的创建、执行和调度分离。在Concurrency包出现之前,线程的创建基本上靠new一个Thread对象,执行靠start()方法,而线程的调度则完全依赖程序员在具体代码中自己写出来。
而在Concurrency包出现之后,线程的创建还是依靠Thread、Runnable和Callable(新加入)对象的实例化;而线程的执行则靠Executor、ExecutorService的对象执行execute()方法或submit()方法;线程的调度则被固化为几个具体的线程池类,如ThreadPoolExecutor、ScheduledThreadPoolExecutor、ExecutorCompletionService等等。这样表面上增加了复杂度,而实际上成功将线程的创建、执行和调度的业务逻辑分离,使程序员能够将精力集中在线程中业务逻辑的编写,大大提高了编码效率,降低了出错的概率,而且大大提高了性能。
在Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源。同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。
Java的线程既是工作单元,也是执行机制。从JDK
5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。
工具介绍
1 线程执行者
这个功能主要由三个接口和类提供,分别是:
Executor:执行者接口,所有执行者的父类;
ExecutorService:执行者服务接口,具体的执行者类都继承自此接口;
Executors:执行者工具类,大部分执行者的实例以及线程池都由它的工厂方法创建。
先看一个例子:
public class ExecutorExam {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
service.execute(new Task("task1"));
service.execute(new Task("task2"));
service.execute(new Task("task3"));
service.shutdown();
}
}
class Task implements Runnable {
private final String name;
Task(String name) {
this.name = name;
}
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println(name + "-[" + i + "]");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Executors工具类的newCachedThreadPool方法创建了一个执行者的对象,该对象是ExecutorService的实现类(后面我们会知道它是一个ThreadPoolExecutor对象)。
Task是Runnable接口的实现类,代码中new出了三个Task对象,它们被提交给ExecutorService的execute方法作为参数,即三个线程被ExecutorService执行了,其调度机制隐含在执行者的具体类中。最后ExecutorService调用shutdown方法,阻止继续提交其他线程,并等待执行中的线程结束。
通过这种方式,线程的创建、执行和调度被分离了,程序员可以将精力集中在线程的内部业务中。
2 得到异步执行的结果
在Java1.4之前,如果要得到一个线程运行后产生的值,没有一套现成的机制,程序员可以通过Thread类的成员变量、程序的全局变量等方式来得到一个线程运行后产生的某个值。但是这样的话,你必须不断探测线程是否已经成功结束,或者运用同步技术来等待线程执行完成,再去获取异步执行的结果。
在Java Concurrency中,得到异步结果有了一套固定的机制,即通过Callable接口、Future接口和ExecutorService的submit方法来得到异步执行的结果,相关工具的介绍如下:
- Callable:泛型接口,与Runnable接口类似,它的实例可以被另一个线程执行,内部有一个call方法,返回一个泛型变量V
- Future:泛型接口,代表依次异步执行的结果值,调用其get方法可以得到一次异步执行的结果,如果运算未完成,则阻塞直到完成;调用其cancel方法可以取消一次异步执行
- CompletionService:一种执行者,可将submit的多个任务的结果按照完成的先后顺序存入一个内部队列,然后可以使用take方法从队列中依次取出结果并移除,如果调用take时计算未完成则会阻塞
先看一个简单的例子,得到一个异步执行的整形值
public class CallableExam {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
Future<Integer> future = service.submit(() -> {
System.out.println("Callable is running");
TimeUnit.SECONDS.sleep(2);
return 47;
});
service.shutdown();
System.out.println("future.get = " + future.get());
}
}
接下来是一个使用CompletionService的例子,创建5个线程计算一些值,执行完成后使用CompletionService依次取出结果并打印
public class CompletionServiceExam {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newCachedThreadPool();
CompletionService<Integer> completionService = new ExecutorCompletionService<>(service);
for (int i = 0; i < 5; i++) {
completionService.submit(new TaskInteger(i));
}
service.shutdown();
//will block
for (int i = 0; i < 5; i++) {
Future<Integer> future = completionService.take();
System.out.println(future.get());
}
}
}
class TaskInteger implements Callable<Integer> {
private final int sum;
TaskInteger(int sum) {
this.sum = sum;
}
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(sum);
return sum * sum;
}
}
需要注意的是CompletionService的创建方法,它的构造函数需要一个ExecutorService的对象作为参数
3 重复执行和延期执行
在Java1.4之前,一般使用Timer来重复或者延期执行任务。Java Concurrency为了使之与Executor理念协调,引入了ScheduledExecutorService来完成同样的工作。
- ScheduledExecutorService:另一种执行者,可以将提交的任务延期执行,也可以将提交的任务反复执行
- ScheduledFuture:与Future接口类似,代表一个被调度执行的异步任务的返回值
下面的例子中ScheduledExecutorService的实例scheduler调度了两个任务,
- 第一个任务使用scheduleAtFixedRate()方法每隔一秒重复打印“beep”
- 第二个任务使用schedule()方法在10秒后延迟执行,它的作用是取消第一个任务
代码如下
public class ScheduledExecutorServiceExam {
public static void main(String[] args) {
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2);
ScheduledFuture<?> scheduledFuture = scheduler.scheduleAtFixedRate(() -> System.out.println("beep"), 1, 1, TimeUnit.SECONDS); scheduler.schedule(() -> { System.out.println("cancel beep"); scheduledFuture.cancel(true); scheduler.shutdown(); }, 10, TimeUnit.SECONDS); } }
4 TimeUnit
上面的例子中用到了TimeUnit,因此这里就先介绍一下。TimeUnit是Java Concurrency包引入的新式的表达时间间隔或延迟的单位。在JDK1.5后面引入的新类中,都使用TimeUnit作为时间的表达方式。例如:
lock.tryLock(50L, TimeUnit.MILLISECONDS)
public ScheduledFuture<?> schedule (Runnable command,long delay, TimeUnit unit);
还有,使用最多的,替代Thread.sleep的这种用法:
TimeUnit.SECONDS.sleep(5);
TimeUnit有以下6个时间单位:
NANOSECONDS:纳秒,1000纳秒为一微秒
MICROSECONDS:微秒,1000微秒为一毫秒
MILLISECONDS:毫秒,1000毫秒为一秒
SECONDS:秒,60秒为1分钟
MINUTES:分钟,60分钟为1小时
HOURS:小时,24小时为1天
DAYS:天
原来Java的时间单位默认为毫秒,引入TimeUnit后增加了纳秒和微秒,精度更高了,同时引入了很多转换方法,便于使用。
1 Executor框架简介
1.1 Executor框架的两级调度模型
在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。
在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。这种两级调度模型的示意图如下图所示。
从图中可以看出,应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
1.2 Executor框架的结构与成员
1.2.1 Executor框架的结构
Executor框架主要由3大部分组成如下。
- 任务。包括被执行任务需要实现的接口:Runnable接口或Callable接口。
- 任务的执行。包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
- 异步计算的结果。包括接口Future和实现Future接口的FutureTask类。
Executor框架包含的主要的类与接口如图
下面是这些类和接口的简介。
- Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
- ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
- ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
- Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
- Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行。
Executor框架的使用示意图如图