1.线程池的基本使用

在Java中,线程池主要通过ThreadPoolExecutor和
ScheduledThreadPoolExecutor来体现。 以ThreadPoolExecutor为例,在使用线程池之前,必须要了解以下几个概念:

  • 核心线程数(Core Pool Size):表示核心线程的数量。所谓核心线程,是在"大多数情况下(未开启allowCoreThreadTimeOut参数)"都不会被销毁的线程(即使它们一直处于空闲状态);
  • 最大线程数(Max Pool Size):当任务堆积,且池中没有空闲的核心线程来处理任务时,线程池会创建一些临时的线程来处理堆积的任务。这些临时的线程会在空闲一定时间后被销毁掉, 最大的临时线程数量为MaxPoolSize - CorePoolSize;
  • 存活时间(Keep Alive Time):分为具体的数值和单位,如60s。表示临时线程空闲存活时间(在开启allowCoreThreadTimeOut参数后也表示核心线程的空闲存活时间)。 当临时线程空闲时间超过该值之后,就会被销毁掉;
  • 工作队列(Work Queue):存放挤压的任务;
  • 拒绝策略(Reject Handler):当线程池的线程数量已经达到上限(Max Pool Size),全部都处于非空闲的状态,且工作队列已满无法再堆积任务时, 会按照预先设定的方式拒绝新的任务;

线程池围绕这几个核心概念以固定的方式运行:

image

ThreadPoolExecutor根据这5个核心概念提供了6个构造函数参数(存活时间分为值和单位),如下:

// 核心线程数
int corePoolSize = 10;
// 最大线程数
int maxPoolSize = 15;
// 存活时间(值)
long keepAliveTime = 60;
// 存活时间(单位)
TimeUnit keepAliveUnit = TimeUnit.SECONDS;
// 工作队列(具体队列特性请见下文)
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
// 拒绝策略(具体策略特性请见下文)
RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();

// 实例化线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        corePoolSize, maxPoolSize, 
        keepAliveTime, keepAliveUnit, 
        workQueue, rejectHandler
);

ThreadPoolExecutor还提供了一个额外的可配置参数,其类型为ThreadFactory。顾名思义,其表示创建线程的工厂,默认使用DefaultThreadFactory ,主要用于设置池中线程的名称、优先级、类型(是否守护线程)等。

2. 常见工作队列

选择使用不同的工作队列,也就会使得线程池表现出不同的特点。常见的工作队列包括以下几种:

2.1 ArrayBlockingQueue

属于有界队列,在构造时,必须指定其容量。底层存储结构为数组,利用"双指针"的形式确定读取和写入的索引。其内部只有一把锁供读取和写入操作同时使用,从而无法实现读写并行。

2.2 LinkedBlockingQueue

构造时,可以指定其容量,或不指定容量。在不指定容量时,属于无界队列(理论上,实际上最大容量为Integer.MAX_VALUE)。底层数据结构为链表。

其内部根据two lock queue理论实现。

A two lock queue would allow one consumer and one producer to work at the same time in a multi-CPU system.

读锁和写锁分离,通过AtomicInteger的count字段来判断能否读或写,吞吐量非常高。

但受链表特性的影响,在高并发场景下,会频繁新增或删除节点,从而导致频繁的GC。

2.3 PriorityBlockingQueue

优先级队列,能够提供有序的数据消费顺序。构造时可以指定比较器(Comparator)。在不指定比较器的情况下,存入的数据必须是可比较的(Comparable)。属于无界队列。

底层是基于数组实现的二叉堆,默认初始容量为11,在容量不足的情况下会自动扩容。其读取和写入操作共享同一把锁。

2.4 SynchronousQueue

同步队列,本质上没有任何存储空间,当生产者存入数据时,立即会被阻塞住,直到有消费者消费这个数据。同理,当消费者消费数据时,如果队列中没有数据,则消费者会被阻塞住,直到获取到数据为止。

其提供了公平和非公平两种模式,可以通过构造函数确定。公平模式下,其内部数据结构为
SynchronousQueue.TransferQueue。非公平模式下,其内部数据结构为SynchronousQueue.TransferStack。

SynchronousQueue先自旋,超过一定次数之后再加锁。效率较高。

2.5 LinkedTransferQueue

可以理解为具有存储空间的SynchronousQueue,或是公平模式下的SynchronousQueue和LinkedBlockingQueue的结合。属于无界队列。

2.6 DelayQueue

延时队列,无界,其只能存储
java.util.concurrent.Delayed类型的数据。底层基于PriorityBlockingQueue实现。

2.7 DelayedWorkQueue

是"定时任务线程池"(
ScheduledThreadPoolExecutor)的私有内部类,且仅能使用在ScheduledThreadPoolExecutor中。

DelayedWorkQueue只能存储Runnable类型的数据(其他队列一般以泛型的形式定义其所存储的数据类型),可见其类定义:

static class DelayedWorkQueue 
                extends AbstractQueue<Runnable> 
                implements BlockingQueue<Runnable> {}

不过,
ScheduledThreadPoolExecutor会把Runnable类型的任务包装成RunnableScheduledFuture来使用,从而也导致了DelayedWorkQueue所存储数据的实际类型为RunnableScheduledFuture。

跟PriorityBlockingQueue一样,DelayedWorkQueue底层是数据结构也是用数组实现的二叉堆,不过其默认的容量是16。

3. 常见拒绝策略

J.U.C包中默认提供了4种拒绝策略:

3.1 AbortPolicy

这是默认的拒绝策略。当线程池无法容纳更多任务,且还有新任务提交时,会抛出
RejectedExecutionException异常。

3.2 DiscardPolicy

静默拒绝策略。同AbortPolicy类似,但不会抛出异常。

3.3 CallerRunsPolicy

该策略会使得线程池在无法容纳更多任务且同时有新任务提交时,交由调用者所在线程来执行任务。

例如:

// 设置当前线程名为"Main"
Thread.currentThread().setName("Main");

// 定义任务:在休眠5s之后,打印语句,说明由哪个线程执行
Runnable sleepingRunnable = () -> {
    try {
        Thread.sleep(1000 * 5);
        System.out.println("任务由" + Thread.currentThread().getName() + "执行");
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
};

// 该线程池核心线程数和最大线程数都为1,且队列容量也为1,也就是说,该线程池最多同时容纳2个任务
ExecutorService executorService = new ThreadPoolExecutor(
    1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), 
    new ThreadPoolExecutor.CallerRunsPolicy()
);

// 连续提交3个任务
executorService.submit(sleepingRunnable);
executorService.submit(sleepingRunnable);
executorService.submit(sleepingRunnable);

运行上述代码,得到结果

任务由Main执行
任务由pool-1-thread-1执行
任务由pool-1-thread-1执行

由此可见,在Main线程连续提交2个任务之后,线程池已经达到其容量的极限。当Main线程再次提交新的任务时,根据CallerRunsPolicy策略的影响,只能由Main线程自己执行这个任务了。

CallerRunsPolicy的原理非常简单,可见其源码:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    // 判断线程池是否关闭
    if (!e.isShutdown()) {
        // 在线程池没有关闭的情况下,直接执行任务,注意:当前执行的线程是调用线程
        r.run();
    }
}

3.4 DiscardOldestPolicy

该策略会丢弃掉工作队列中等待时间最长的任务,也就是当前队列中的第一个任务,并将新的任务添加到队列尾部。

其核心源码非常简单,如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    // 判断线程池是否关闭
    if (!e.isShutdown()) {
        e.getQueue().poll(); // 取出对头头部的数据
        e.execute(r); // 提交新任务,注意,新任务会自然放入到队列尾部
    }

}

4. 预定义线程池

除了通过核心参数自己配置线程池之外,也可以通过
java.util.concurrent.Executors所提供静态方法创建预定义的线程池。

4.1 FixedThreadPool

固定大小的线程池,使用时必须给定线程数量,其实际源码如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}

可以看到,该线程池的核心线程数和最大线程数保持一致,且使用的无界队列(LinkedBlockingQueue)。没有指定拒绝策略,则使用默认的AbortPolicy策略。不过由于其使用的是无界队列,在高并发的情况下,出现的应该是OutOfMemoryError而不是
RejectedExecutionException。

4.2 CachedThreadPool

这是一个能够缓存线程的线程池,其实际源码如下:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}

该线程池没有核心线程,理论上可以创建无限的临时线程,临时线程的空闲存活时间为60s。注意,该线程池使用的是SynchronousQueue,上文中已经提到,SynchronousQueue是没有实际存储空间的,因此当没有空闲临时线程来处理新任务时,该线程池都会新创建一个临时线程来处理新的任务。

CachedThreadPool适合于"固定时间段内产生大量流量"的场景,比如说"限时抢购"。它会在高峰时间段内尽可能创建多的线程来处理业务,并在高峰期结束后回收掉大量的临时线程,从而不占用系统过多的资源。

不过,由于其的临时线程数是"没有上限"的,所以也会产生OutOfMemoryError。

4.3 SingleThreadExecutor

顾名思义,这是一个只有一个线程的线程池,其使用的是无界队列(LinkedBlockingQueue)。当开发者期望任务能够以串行的方式执行时,可以考虑使用SingleThreadExecutor。

不过该线程池缺点也非常明显。其使用一个线程来执行所有的任务,速度肯定较慢,容易造成任务堆积,容易产生OutOfMemoryError。

4.4 ScheduledThreadPool

这是一个可执行定时任务的线程池,使用时需要指定核心线程数量,如:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(20);

定位
Executors.newScheduledThreadPool(int)方法的源码:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 10L, TimeUnit.MILLISECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
}

其理论上可以创建无限的临时线程,从而导致OutOfMemoryError,不过在实际使用中,不会存在太多的定时任务,因此OutOfMemoryError的情况基本上不会出现。

ScheduledExecutorService有两个核心方法需要注意:

  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):表示以固定的速度定期执行任务,其四个参数的意义分别为: command:表示具体需要执行的任务; initialDelay:表示延迟多少时间之后开始第一次执行任务; period:表示两次任务执行之间的时间间隔; unit:initialDelay和period参数的时间单位 在使用该方法时,需要考虑一种特殊情况。假设command每次执行需要10s,但是period参数设置的5s 。那么在第二个任务准备开始时,第一个任务还没有执行结束。此时,线程池会等待第一个任务执行结束,然后立即开始第二个任务。
  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):表示以固定的间隔时间重复执行任务,除了第三个参数delay表示两次任务之间的延时时间之外,其他3个参数的意义与scheduleAtFixedRate方法一致。 scheduleWithFixedDelay会在每个任务执行结束之后再计算延时,例如:command每次执行需要10s,delay参数设置的5s,那么它会在第一个任务执行结束之后,等待5s再开始执行第二个任务。

4.5 WorkStealingPool

是Java8 新增加的线程池,由
Executors.newWorkStealingPool()方法创建。该线程池是ForkJoinPool类型的而非ThreadPoolExecutor或ScheduledThreadPoolExecutor。

ForkJoinPool的本质是将大的任务拆分为多个小的任务,并交由多个线程并行执行。但是需要注意,只有ForkJoinTask类型的任务才会被拆分。实际使用时,为了简化难度,一般不会直接使用ForkJoinTask,而是使用其子类,如:

  • RecursiveAction:无返回值的任务;
  • RecursiveTask:有返回值的任务;
  • CountedCompleter:无返回值但可以触发回调的任务;

ForkJoinPool在进行非常大量的数据计算时,会体现出比较明显的优势。

5. 线程池异常处理

通过submit(Runnable)方法提交任务时,会得到一个Future类型的返回值。若任务出现异常,则异常会被包装在返回值对象中。

Future对象提供了get()方法用于获取异步任务执行结果。在调用get()时,会被要求处理ExecutionException异常。而任务中出现的异常可以通过
ExecutionException.getCause() 方法获取到。

若通过execute(Runnable)方法提交任务,当任务出现异常时,异常会被直接打印在控制台上。此时可以利用线程本身的UncaughtExceptionHandler来完成异常的处理。

Java中每个线程都可以通过Thread#
setUncaughtExceptionHandler(UncaughtExceptionHandler)来为线程设置异常处理器,当出现异常时,JVM会调用Thread#dispatchUncaughtException(Throwable),在这个方法中会调用异常处理器中的uncaughtException(Thread, Throwable)方法来执行处理操作。

开发者可以利用线程池的ThreadFactory参数来方便的为池中的每一个线程设置异常处理器,如下:

// 自定义异常处理器,在遇到异常时,打印出线程的名称和异常信息
public class ExceptionHandler implements Thread.UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println("the thread: " + t.getName() + " throws error: " + e.getMessage());
    }
}

// 自定义线程工厂,为每一个线程设置异常处理器
public class CustomizedThreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setDaemon(false);

        // 在这里设置异常处理器
        thread.setUncaughtExceptionHandler(new ExceptionHandler());
        return thread;
    }
}

每个线程也可以设置自己的ThreadGroup,ThreadGroup继承自
Thread.UncaughtExceptionHandler,因此也可以通过ThreadGroup 来完成异常处理的工作,如下:

// 自定义线程组并提供异常处理的能力
public class CustomizedThreadGroup extends ThreadGroup {

    public static final String NAME = "异常处理线程组";

    public CustomizedThreadGroup() {
        // 必须为线程组提供一个名称
        super(NAME);
    }

    // 重写uncaughtException以提供异常处理的能力
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.out.printf("the thread: %s throws exception: %s%n", t.getName(), e.getMessage());
    }
}

// 自定义线程工厂,为每个线程设置线程组
public class CustomizedThreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
        // 创建线程并设置线程组
        Thread thread = new Thread(new CustomizedThreadGroup(), r);
        thread.setDaemon(false);
        thread.setPriority(Thread.NORM_PRIORITY);
        return thread;
    }
}

6. 关闭线程池

Java中提供了2种方式用于关闭一个正在运行的线程池:

6.1 shutdown()

调用shutdown()方法,线程池进入到SHUTDOWN状态。此时线程池不接受新的任务,提交新的任务时直接触发拒绝策略。注意,默认的拒绝策略中,除了AbortPolicy是直接抛出异常之外,剩余的3个,要么什么都不做(DiscardPolicy),要么只在非关闭状态才执行处理,因此不会有任何反应。

虽然不再接受新的任务,但线程池会等待正在执行的任务已经存放在工作队列中的任务执行完毕时才真正关闭,回收资源。

6.2 shutdownNow()

另一种关闭线程池的方法是调用shutdownNow()方法,相对于shutdown()方法而言,调用shutdownNow()之后线程池会进入到STOP状态。此时线程池也不会接新的任务,提交新任务也会触发拒绝策略,但线程池只等待正在执行的任务执行完毕就真正关闭并回收资源,已经在工作队列中的任务不会被执行,而是以返回值的形式返回。

7. 线程池状态

线程池共有5种状态。初始时为RUNNING,表示线程池正在工作中。调用shutdown()方法进入到SHUTDOWN状态,或是调用shutdownNow()方法进入到STOP状态。这两种状态下,线程池不是真正被关闭,但也不会接受新的任务,不过会等待已有的任务全部或部分执行结束(详情见上文:关闭线程池)。之后,线程池将进入到TIDYING状态,此时,线程池会调用钩子方法terminated()。这个方法在ThreadPoolExecutor中实现为空,可由子类重写以完成某些业务操作。在钩子方法调用完毕之后,线程池会进入到TERMINATED状态,这时候就是真正被关闭了。

这5种状态的流转关系如下图所示:

image