0 执行者Executor的由来

在介绍具体的工具之前,先讲讲设计者的思路。在Java1.4之前,已经提供了Runnable接口、Thread类、Timer类和synchronize关键字,它们已经足以完成各种各样的多线程编程任务,为什么还要提供执行者这样的概念呢?
这是因为Java的设计者想把线程的创建、执行和调度分离。在Concurrency包出现之前,线程的创建基本上靠new一个Thread对象,执行靠start()方法,而线程的调度则完全依赖程序员在具体代码中自己写出来。
而在Concurrency包出现之后,线程的创建还是依靠Thread、Runnable和Callable(新加入)对象的实例化;而线程的执行则靠Executor、ExecutorService的对象执行execute()方法或submit()方法;线程的调度则被固化为几个具体的线程池类,如ThreadPoolExecutor、ScheduledThreadPoolExecutor、ExecutorCompletionService等等。这样表面上增加了复杂度,而实际上成功将线程的创建、执行和调度的业务逻辑分离,使程序员能够将精力集中在线程中业务逻辑的编写,大大提高了编码效率,降低了出错的概率,而且大大提高了性能。

在Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源。同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。
Java的线程既是工作单元,也是执行机制。从JDK
5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。

工具介绍

1 线程执行者

这个功能主要由三个接口和类提供,分别是:
Executor:执行者接口,所有执行者的父类;
ExecutorService:执行者服务接口,具体的执行者类都继承自此接口;
Executors:执行者工具类,大部分执行者的实例以及线程池都由它的工厂方法创建。
先看一个例子:

public class ExecutorExam {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(new Task("task1"));
        service.execute(new Task("task2"));
        service.execute(new Task("task3"));
        service.shutdown();
    }
}

class Task implements Runnable {
    private final String name;

    Task(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(name + "-[" + i + "]");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Executors工具类的newCachedThreadPool方法创建了一个执行者的对象,该对象是ExecutorService的实现类(后面我们会知道它是一个ThreadPoolExecutor对象)。
Task是Runnable接口的实现类,代码中new出了三个Task对象,它们被提交给ExecutorService的execute方法作为参数,即三个线程被ExecutorService执行了,其调度机制隐含在执行者的具体类中。最后ExecutorService调用shutdown方法,阻止继续提交其他线程,并等待执行中的线程结束。
通过这种方式,线程的创建、执行和调度被分离了,程序员可以将精力集中在线程的内部业务中。

2 得到异步执行的结果

在Java1.4之前,如果要得到一个线程运行后产生的值,没有一套现成的机制,程序员可以通过Thread类的成员变量、程序的全局变量等方式来得到一个线程运行后产生的某个值。但是这样的话,你必须不断探测线程是否已经成功结束,或者运用同步技术来等待线程执行完成,再去获取异步执行的结果。
在Java Concurrency中,得到异步结果有了一套固定的机制,即通过Callable接口、Future接口和ExecutorService的submit方法来得到异步执行的结果,相关工具的介绍如下:

  • Callable:泛型接口,与Runnable接口类似,它的实例可以被另一个线程执行,内部有一个call方法,返回一个泛型变量V
  • Future:泛型接口,代表依次异步执行的结果值,调用其get方法可以得到一次异步执行的结果,如果运算未完成,则阻塞直到完成;调用其cancel方法可以取消一次异步执行
  • CompletionService:一种执行者,可将submit的多个任务的结果按照完成的先后顺序存入一个内部队列,然后可以使用take方法从队列中依次取出结果并移除,如果调用take时计算未完成则会阻塞

先看一个简单的例子,得到一个异步执行的整形值

public class CallableExam {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        Future<Integer> future = service.submit(() -> {
            System.out.println("Callable is running");
            TimeUnit.SECONDS.sleep(2);
            return 47;
        });
        service.shutdown();
        System.out.println("future.get = " + future.get());
    }
}

接下来是一个使用CompletionService的例子,创建5个线程计算一些值,执行完成后使用CompletionService依次取出结果并打印

public class CompletionServiceExam {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newCachedThreadPool();
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(service);
        for (int i = 0; i < 5; i++) {
            completionService.submit(new TaskInteger(i));
        }
        service.shutdown();
        //will block
        for (int i = 0; i < 5; i++) {
            Future<Integer> future = completionService.take();
            System.out.println(future.get());
        }

    }
}

class TaskInteger implements Callable<Integer> {
    private final int sum;

    TaskInteger(int sum)  {
        this.sum = sum;
    }

    @Override
    public Integer call() throws Exception {
        TimeUnit.SECONDS.sleep(sum);
        return sum * sum;
    }
}

需要注意的是CompletionService的创建方法,它的构造函数需要一个ExecutorService的对象作为参数

3 重复执行和延期执行

在Java1.4之前,一般使用Timer来重复或者延期执行任务。Java Concurrency为了使之与Executor理念协调,引入了ScheduledExecutorService来完成同样的工作。

  • ScheduledExecutorService:另一种执行者,可以将提交的任务延期执行,也可以将提交的任务反复执行
  • ScheduledFuture:与Future接口类似,代表一个被调度执行的异步任务的返回值

下面的例子中ScheduledExecutorService的实例scheduler调度了两个任务,

  • 第一个任务使用scheduleAtFixedRate()方法每隔一秒重复打印“beep”
  • 第二个任务使用schedule()方法在10秒后延迟执行,它的作用是取消第一个任务

代码如下

public class ScheduledExecutorServiceExam {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2);
        ScheduledFuture<?> scheduledFuture = scheduler.scheduleAtFixedRate(() -> System.out.println("beep"), 1, 1, TimeUnit.SECONDS); scheduler.schedule(() -> { System.out.println("cancel beep"); scheduledFuture.cancel(true); scheduler.shutdown(); }, 10, TimeUnit.SECONDS); } }

4 TimeUnit

上面的例子中用到了TimeUnit,因此这里就先介绍一下。TimeUnit是Java Concurrency包引入的新式的表达时间间隔或延迟的单位。在JDK1.5后面引入的新类中,都使用TimeUnit作为时间的表达方式。例如:

lock.tryLock(50L, TimeUnit.MILLISECONDS)
        public ScheduledFuture<?> schedule (Runnable command,long delay, TimeUnit unit);

还有,使用最多的,替代Thread.sleep的这种用法:

TimeUnit.SECONDS.sleep(5);
TimeUnit有以下6个时间单位:
NANOSECONDS:纳秒,1000纳秒为一微秒
MICROSECONDS:微秒,1000微秒为一毫秒
MILLISECONDS:毫秒,1000毫秒为一秒
SECONDS:秒,60秒为1分钟
MINUTES:分钟,60分钟为1小时
HOURS:小时,24小时为1天
DAYS:天

原来Java的时间单位默认为毫秒,引入TimeUnit后增加了纳秒和微秒,精度更高了,同时引入了很多转换方法,便于使用。

1 Executor框架简介

1.1 Executor框架的两级调度模型

在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。

在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。这种两级调度模型的示意图如下图所示。

从图中可以看出,应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。

1.2 Executor框架的结构与成员

1.2.1 Executor框架的结构

Executor框架主要由3大部分组成如下。

  • 任务。包括被执行任务需要实现的接口:Runnable接口或Callable接口。
  • 任务的执行。包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
  • 异步计算的结果。包括接口Future和实现Future接口的FutureTask类。

Executor框架包含的主要的类与接口如图

下面是这些类和接口的简介。

  • Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
  • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
  • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
  • Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
  • Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行。

Executor框架的使用示意图如图