文章目录
- 线程池主要解决两个问题
- ctl 含义 ---- 记录线程池状态和线程池中线程个数
- mainLock & termination
线程池主要解决两个问题
-
一是当执行大量异步任务时线程池 能够提供较好的性能 。在不使用线程池时,每当需要执行异步任务时直接new一个线程来运行,而线程的创建和销毁是需要开销的。线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程。
-
二是线程池提供了一种 资源限制和管理的手段 ,比如可以限制线程的个数,动态新增线程等。每个ThreadPoolExecutor也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。
另外,线程池也提供了许多可调参数和可扩展性接口,以满足不同情境的需要,程序员可以使用更方便的Executors的工厂方法,比如 newCachedThreadPool
(线程池线程个数最多可达 Integer.MAX_VALUE
,线程自动回收)、 newFixedThreadPool
(固定大小的线程池)和 newSingleThreadExecutor
(单个线程)等来创建线程池,当然用户还可以自定义。
类关系图
在上图中,Executors其实是个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的线程池实例。
ctl 含义 ---- 记录线程池状态和线程池中线程个数
ThreadPoolExecutor继承了AbstractExecutorService, 成员变量ctl是一个Integer的原子变量,用来记录线程池状态和线程池中线程个数 ,类似于ReentrantReadWriteLock使用一个变量来保存两种信息。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
这里假设Integer类型是32位二进制表示,则其中高3位用来表示线程池状态,后面29位用来记录线程池线程个数。
/用来标记线程池状态(高3位),线程个数(低29位) //默认是RUNNING状态,线程个数为0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //线程个数掩码位数 private static final int COUNT_BITS = Integer.SIZE - 3; //线程最大个数(低29位)00011111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
线程池状态:
//(高3位):11100000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; //(高3位):00000000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //(高3位):00100000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; //(高3位):01000000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //(高3位):01100000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; // 获取高三位 运行状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //获取低29位 线程个数 private static int workerCountOf(int c) { return c & CAPACITY; } //计算ctl新值,线程状态 与 线程个数 private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池状态 及转换
-
RUNNING:接受新任务并且处理阻塞队列里的任务。
-
SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务。
-
STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务。
-
TIDYING:所有任务都执行完(包含阻塞队列里面的任务)后当前线程池活动线程数为0,将要调用terminated方法。
-
TERMINATED:终止状态。terminated方法调用完成以后的状态。
线程池状态转换列举如下。
-
RUNNING -> SHUTDOWN :显式调用shutdown()方法,或者隐式调用了finalize()、方法里面的shutdown()方法。
-
RUNNING 或 SHUTDOWN)-> STOP :显式调用 shutdownNow()方法时。
-
SHUTDOWN -> TIDYING :当线程池和任务队列都为空时。
-
STOP -> TIDYING :当线程池为空时。
-
TIDYING -> TERMINATED: 当 terminated() hook 方法执行完成时
线程池参数
-
corePoolSize:线程池核心线程个数。
-
workQueue:用于保存等待执行的任务的阻塞队列, 比如基于数组的有界ArrayBlockingQueue、基于链表的无界LinkedBlockingQueue、最多只有一个元素的同步队列SynchronousQueue及优先级队列PriorityBlockingQueue等。
-
maximunPoolSize:线程池最大线程数量。
-
ThreadFactory:创建线程的工厂。
-
RejectedExecutionHandler:饱和策略,当队列满并且线程个数达到maximunPoolSize后采取的策略。
比如
AbortPolicy(抛出异常)、
CallerRunsPolicy(使用调用者所在线程来运行任务)、
DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)
DiscardPolicy(默默丢弃,不抛出异常)
-
keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态,则这些闲置的线程能存活的最大时间。
-
TimeUnit:存活时间的时间单位
线程池类型
- newFixedThreadPool :创建一个核心线程个数和最大线程个数都为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE。 keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
// 使用自定义线程创建工厂 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
- newSingleThreadExecutor: 创建一个核心线程个数和最大线程个数都为1的线程池,并且阻塞队列长度为Integer.MAX_VALUE。 keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
- newCachedThreadPool :创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,并且阻塞队列为同步队列。keeyAliveTime=60说明只要当前线程在60s内空闲则回收。 这个类型的特殊之处在于,加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
mainLock & termination
/** * Lock held on access to workers set and related bookkeeping. * While we could use a concurrent set of some sort, it turns out * to be generally preferable to use a lock. Among the reasons is * that this serializes interruptIdleWorkers, which avoids * unnecessary interrupt storms, especially during shutdown. * Otherwise exiting threads would concurrently interrupt those * that have not yet interrupted. It also simplifies some of the * associated statistics bookkeeping of largestPoolSize etc. We * also hold mainLock on shutdown and shutdownNow, for the sake of * ensuring workers set is stable while separately checking * permission to interrupt and actually interrupting. */ private final ReentrantLock mainLock = new ReentrantLock();
/** * Wait condition to support awaitTermination */ private final Condition termination = mainLock.newCondition();
-
mainLock是独占锁,用来控制新增Worker线程操作的原子性。
-
termination是该锁对应的条件队列,在线程调用awaitTermination时用来存放阻塞的线程。
Worker继承AQS和Runnable接口,是具体承载任务的对象。Worker继承了AQS,自己实现了简单不可重入独占锁,其中state=0表示锁未被获取状态,state=1表示锁已经被获取的状态,state=-1是创建Worker时默认的状态,创建时状态设置为-1是为了避免该线程在运行runWorker()方法前被中断。其中变量firstTask记录该工作线程执行的第一个任务,thread是具体执行任务的线程。
DefaultThreadFactory是线程工厂,newThread方法是对线程的一个修饰。其中poolNumber是个静态的原子变量,用来统计线程工厂的个数,threadNumber用来记录每个线程工厂创建了多少线程,这两个值也作为线程池和线程的名称的一部分。