文章目录

  • 线程池主要解决两个问题
  • ctl 含义 ---- 记录线程池状态和线程池中线程个数
  • mainLock & termination

线程池主要解决两个问题

  • 一是当执行大量异步任务时线程池 能够提供较好的性能 。在不使用线程池时,每当需要执行异步任务时直接new一个线程来运行,而线程的创建和销毁是需要开销的。线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程。

  • 二是线程池提供了一种 资源限制和管理的手段 ,比如可以限制线程的个数,动态新增线程等。每个ThreadPoolExecutor也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。

另外,线程池也提供了许多可调参数和可扩展性接口,以满足不同情境的需要,程序员可以使用更方便的Executors的工厂方法,比如 newCachedThreadPool (线程池线程个数最多可达 Integer.MAX_VALUE ,线程自动回收)、 newFixedThreadPool (固定大小的线程池)和 newSingleThreadExecutor (单个线程)等来创建线程池,当然用户还可以自定义。

类关系图

在上图中,Executors其实是个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的线程池实例。

ctl 含义 ---- 记录线程池状态和线程池中线程个数

ThreadPoolExecutor继承了AbstractExecutorService, 成员变量ctl是一个Integer的原子变量,用来记录线程池状态和线程池中线程个数 ,类似于ReentrantReadWriteLock使用一个变量来保存两种信息。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

这里假设Integer类型是32位二进制表示,则其中高3位用来表示线程池状态,后面29位用来记录线程池线程个数。

/用来标记线程池状态(高3位),线程个数(低29位)
//默认是RUNNING状态,线程个数为0

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//线程个数掩码位数
private static final int COUNT_BITS = Integer.SIZE - 3;

//线程最大个数(低29位)00011111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

线程池状态:

//(高3位):11100000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;

//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

//(高3位):00100000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;

//(高3位):01000000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;

//(高3位):01100000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// 获取高三位 运行状态
private static int runStateOf(int c)     {
  return c & ~CAPACITY; }

//获取低29位 线程个数
private static int workerCountOf(int c)  {
  return c & CAPACITY; }

//计算ctl新值,线程状态 与 线程个数
private static int ctlOf(int rs, int wc) {
  return rs | wc; }

线程池状态 及转换

  • RUNNING:接受新任务并且处理阻塞队列里的任务。

  • SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务。

  • STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务。

  • TIDYING:所有任务都执行完(包含阻塞队列里面的任务)后当前线程池活动线程数为0,将要调用terminated方法。

  • TERMINATED:终止状态。terminated方法调用完成以后的状态。

线程池状态转换列举如下。

  • RUNNING -> SHUTDOWN :显式调用shutdown()方法,或者隐式调用了finalize()、方法里面的shutdown()方法。

  • RUNNING 或 SHUTDOWN)-> STOP :显式调用 shutdownNow()方法时。

  • SHUTDOWN -> TIDYING :当线程池和任务队列都为空时。

  • STOP -> TIDYING :当线程池为空时。

  • TIDYING -> TERMINATED: 当 terminated() hook 方法执行完成时

线程池参数

  • corePoolSize:线程池核心线程个数。

  • workQueue:用于保存等待执行的任务的阻塞队列, 比如基于数组的有界ArrayBlockingQueue、基于链表的无界LinkedBlockingQueue、最多只有一个元素的同步队列SynchronousQueue及优先级队列PriorityBlockingQueue等。

  • maximunPoolSize:线程池最大线程数量。

  • ThreadFactory:创建线程的工厂。

  • RejectedExecutionHandler:饱和策略,当队列满并且线程个数达到maximunPoolSize后采取的策略。

    比如

    AbortPolicy(抛出异常)、

    CallerRunsPolicy(使用调用者所在线程来运行任务)、

    DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)

    DiscardPolicy(默默丢弃,不抛出异常)

  • keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态,则这些闲置的线程能存活的最大时间。

  • TimeUnit:存活时间的时间单位

线程池类型

  • newFixedThreadPool :创建一个核心线程个数和最大线程个数都为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE。 keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收
public static ExecutorService newFixedThreadPool(int nThreads) {
 
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
// 使用自定义线程创建工厂
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
 
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
  • newSingleThreadExecutor: 创建一个核心线程个数和最大线程个数都为1的线程池,并且阻塞队列长度为Integer.MAX_VALUE。 keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。
public static ExecutorService newSingleThreadExecutor() {
 
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
 
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
  • newCachedThreadPool :创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,并且阻塞队列为同步队列。keeyAliveTime=60说明只要当前线程在60s内空闲则回收。 这个类型的特殊之处在于,加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务。
public static ExecutorService newCachedThreadPool() {
 
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
 
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

mainLock & termination

/**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     */
    private final ReentrantLock mainLock = new ReentrantLock();
/**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();
  • mainLock是独占锁,用来控制新增Worker线程操作的原子性。

  • termination是该锁对应的条件队列,在线程调用awaitTermination时用来存放阻塞的线程。

Worker继承AQS和Runnable接口,是具体承载任务的对象。Worker继承了AQS,自己实现了简单不可重入独占锁,其中state=0表示锁未被获取状态,state=1表示锁已经被获取的状态,state=-1是创建Worker时默认的状态,创建时状态设置为-1是为了避免该线程在运行runWorker()方法前被中断。其中变量firstTask记录该工作线程执行的第一个任务,thread是具体执行任务的线程。

DefaultThreadFactory是线程工厂,newThread方法是对线程的一个修饰。其中poolNumber是个静态的原子变量,用来统计线程工厂的个数,threadNumber用来记录每个线程工厂创建了多少线程,这两个值也作为线程池和线程的名称的一部分。

源码分析