什么是线程池
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
而本文描述线程池是JDK中提供的ThreadPoolExecutor类。
当然,使用线程池可以带来一系列好处:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
线程池的创建
创建线程池主要是ThreadPoolExecutor类来完成,ThreadPoolExecutor 的有许多重载的构造方法,通过参数最多的构造方法来理解创建线程池有哪些需要配置的参数。ThreadPoolExecutor 的构造方法为:
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)下面对参数进行说明:
- corePoolSize:表示核心线程池的大小。当提交一个任务时,如果当前核心线程池的线程个数没有达到 corePoolSize,则会创建新的线程来执行所提交的任务,即使当前核心线程池有空闲的线程。如果当前核心线程池的线程个数已经达到了 corePoolSize,则不再重新创建线程。如果调用了
prestartCoreThread()或者prestartAllCoreThreads(),线程池创建的时候所有的核心线程都会被创建并且启动。 - maximumPoolSize:表示线程池能创建线程的最大个数。如果当阻塞队列已满时,并且当前线程池线程个数没有超过 maximumPoolSize 的话,就会创建新的线程来执行任务。
- keepAliveTime:空闲线程存活时间。如果当前线程池的线程个数已经超过了 corePoolSize,并且线程空闲时间超过了 keepAliveTime 的话,就会将这些空闲线程销毁,这样可以尽可能降低系统资源消耗。
- unit:时间单位。为 keepAliveTime 指定时间单位。
- workQueue:阻塞队列。用于保存任务的阻塞队列,关于阻塞队列可以看这篇文章。可以使用ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue。
- threadFactory:创建线程的工程类。可以通过指定线程工厂为每个创建出来的线程设置更有意义的名字,如果出现并发问题,也方便查找问题原因。
- handler:饱和策略。当线程池的阻塞队列已满和指定的线程都已经开启,说明当前线程池已经处于饱和状态了,那么就需要采用一种策略来处理这种情况。采用的策略有这几种:
- AbortPolicy: 直接拒绝所提交的任务,并抛出RejectedExecutionException异常;
- CallerRunsPolicy:只用调用者所在的线程来执行任务;
- DiscardPolicy:不处理直接丢弃掉任务;
- DiscardOldestPolicy:丢弃掉阻塞队列中存放时间最久的任务,执行当前任务
线程池的运行状态
private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
根据上面源码可知, COUNT_BITS的值为29, CAPACITY的值为2的29次方-1, 二进制表示为: "00011111111111111111111111111111"(明显29个1);
上面的源码中线程池的运行状态的二进制表示:
| 状态 | 二进制 | 意义 |
|---|---|---|
RUNNING | 11100000000000000000000000000000 | 接受新execute的task, 执行已入队的task |
SHUTDOWN | 0 | 不接受新execute的task, 但执行已入队的task, 中断所有空闲的线程 |
STOP | 00100000000000000000000000000000 | 不接受新execute的task, 不执行已入队的task, 中断所有的线程 |
TIDYING | 01000000000000000000000000000000 | 所有线程停止, workerCount数量为0, 将执行hook方法: terminated() |
TERMINATED | 01100000000000000000000000000000 | terminated()方法执行完毕 |
可以看出, 线程池的状态由32位int整型的二进制的前三位表示.
核心属性ctl源码(线程池状态和有效线程数)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
核心属性ctl, 数据类型是AtomicInteger, 表示了两个含义:
- 线程池运行状态(
runState) - 线程池中的有效线程数(
workerCount)
那是如何做到一个属性表示两个含义的呢? 那就要看看ctlOf方法
private static int ctlOf(int rs, int wc) { return rs | wc; } ctlOf方法在线程池内部用来更新线程池的ctl属性,比如ctl初始化的时候: ctl = new AtomicInteger(ctlOf(RUNNING, 0)), 调用ThreadPoolExecutor#shutdown方法等;
rs表示runState, wc表示workerCount;
将 runState和workerCount做按位或运算得到ctl的值;
而runState和workerCount的值由下面两个方法packing和unpacking, 这里的形参c就是ctl.get()的值;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; } 下面用表格更清晰理解:
| 方法 | 方法体 | 带入CAPACITY的值 |
|---|---|---|
runStateOf | c & ~CAPACITY | c & 11100000000000000000000000000000 |
workerCountOf | c & CAPACITY | c & 00011111111111111111111111111111 |
按位与运算, 相同位置, 同1才为1, 其余为0;
结合表格看, runStateOf方法取ctl前3位表示runState, workerCountOf方法取第4~32位的值表示workerCount;
在多线程的环境下,运行状态和有效线程数量往往需要保证统一,不能出现一个改而另一个没有改的情况,如果将他们放在同一个AtomicInteger中,利用AtomicInteger的原子操作,就可以保证这两个值始终是统一的。
代码分析
java.util.concurrent.ThreadPoolExecutor#execute:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*1.判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,
* 如果能完成新线程创建exexute方法结束,成功提交任务
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 2.在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态
* 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要
* reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 3.如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池
* 已经达到饱和状态,所以reject这个他任务
*/
int c = ctl.get();
// 1.如果当前有效线程数小于核心线程数,调用addWorker执行任务(即创建一条线程执行该任务)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果当前有效线程大于等于核心线程数,并且当前线程池状态为运行状态,则将任务添加到阻塞队列中,等待空闲线程取出队列workQueue执行
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);// 移除成功,拒绝该非运行的任务
else if (workerCountOf(recheck) == 0)
// 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
// 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
addWorker(null, false);
}
// 3.如果阻塞队列已满,则调用addWorker执行任务(即创建一条线程执行该任务)
else if (!addWorker(command, false))
reject(command);// 如果创建线程失败,则调用线程拒绝策略
}画一个简单的图:
上图中的worker可简单理解为线程池中的一个线程, workers.size()即使线程池中的线程数;
- 当
workers.size()小于corePoolSize时, 创建新的线程执行提交的task. - 当
workers.size()大于corePoolSize时, 并且workQueue没有满, 将task添加到workQueue. - 当
workers.size()大于corePoolSize时, 并且workQueue已经满了, 但是workers.size()<maximumPoolSize, 就创建一个空闲线程处理task. 当workers.size()大于corePoolSize时, 并且workQueue已经满了, 但是workers.size()>=maximumPoolSize, 执行拒绝策略.
java.util.concurrent.ThreadPoolExecutor#addWorker:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果rs大于SHUTDOWN,则说明此时线程池不在接受新任务了
// 如果rs等于SHUTDOWN,同时满足firstTask为空,且阻塞队列如果有任务,则继续执行任务
// 也就说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果有效线程数大于等于线程池所容纳的最大线程数(基本不可能发生),不能添加任务
// 或者有效线程数大于等于当前限制的线程数,也不能添加任务
// 限制线程数量有任务是否要核心线程执行决定,core=true使用核心线程执行任务
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c)) // 使用CAS增加有效线程数量
break retry;
c = ctl.get(); // 如果再次获取ctl变量值
if (runStateOf(c) != rs)// 再次对比运行状态,如果不一致,再次循环执行,说明此处有其他并发操作,修改了State
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 任务是否已执行
boolean workerStarted = false;
// 任务是否已添加
boolean workerAdded = false;
// 任务包装类,我们的任务都需要添加到Worker中
Worker w = null;
try {
w = new Worker(firstTask);
// 获取Worker中的Thread值
final Thread t = w.thread;
if (t != null) {
// 操作workers HashSet 数据结构需要同步加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 获取当前线程池的运行状态
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
// rs是RUNNING状态时,直接创建线程执行任务
// 当rs等于SHUTDOWN时,并且firstTask为空,也可以创建线程执行任务,也就说明了SHUTDOWN状态时不再接受新任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动线程执行任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
这里特别强调,firstTask是开启线程执行的首个任务,之后常驻在线程池中的线程执行的任务都是从阻塞队列中取出的,需要注意。
创建一个Worker对象,并将新的任务装进Worker中,开启同步将Worker添加进workers中,这里需要注意workers的数据结构为HashSet,非线程安全,所以操作workers需要加同步锁。 如何执行任务?
我们注意到上面的代码中:
// 启动线程执行任务
if (workerAdded) {
t.start();
workerStarted = true;
}这里的t是w.thread得到的,即是Worker中用于执行任务的线程,该线程由ThreadFactory创建,我们再看看生成Worker的构造方法:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}newThread传的参数是Worker本身,而Worker实现了Runnable接口,所以当我们执行t.start()时,执行的是Worker的run()方法,找到入口了:
java.util.concurrent.ThreadPoolExecutor.Worker#run:
public void run() {
runWorker(this);
}java.util.concurrent.ThreadPoolExecutor#runWorker:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环从workQueue阻塞队列中获取任务并执行
while (task != null || (task = getTask()) != null) {
// 加同步锁的目的是为了防止同一个任务出现多个线程执行的问题
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池正在关闭,须确保中断当前线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务前可以做一些操作,交由子类实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 将task置为空,让线程自行调用getTask()方法从workQueue阻塞队列中获取任务
task = null;
// 记录Worker执行了多少次任务,用于统计
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 线程回收过程
processWorkerExit(w, completedAbruptly);
//将task置为空,目的是为了让线程自行调用getTask()方法从workQueue阻塞队列中获取任务。
}
} 如何保证核心线程不被销毁?
我们之前已经知道线程池中可维持corePoolSize数量的常驻核心线程,那么它们是如何保证执行完任务而不被线程池回收的呢?在前面的章节中你可能已经到从workQueue队列中会阻塞式地获取任务,如果没有获取任务,那么就会一直阻塞下去
java.util.concurrent.ThreadPoolExecutor#getTask:
private Runnable getTask() {
// 超时标记,默认为false,如果调用workQueue.poll()方法超时了,会标记为true
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果当前状态大于等于SHUTDOWN,并且workQueue中的任务为空或者状态大于等于STOP
// 则CAS操作减少工作线程数量,并且返回null,线程被回收
// 也说明假设状态为SHUTDOWN的情况下,如果workQueue不为空,那么线程池还是可以继续执行剩下的任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();// CAS操作将线程池中的线程数量减一
return null;
}
// 获取线程池中的有效线程数量
int wc = workerCountOf(c);
// Are workers subject to culling?
// 如果开发者主动开启allowCoreThreadTimeOut并且获取当前工作线程大于corePoolSize,那么该线程是可以被超时回收的
// allowCoreThreadTimeOut默认为false,即默认不允许核心线程超时回收
// 这里也说明了在核心线程以外的线程都为“临时”线程,随时会被线程池回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 这里说明了两点销毁线程的条件:
// 1.原则上线程池数量不可能大于maximumPoolSize,但可能会出现并发时操作了setMaximumPoolSize方法,如果此时将最大线程数量调少了,很可能会出现当前工作线程大于最大线程的情况,这时就需要线程超时回收,以维持线程池最大线程小于maximumPoolSize,
// 2.timed && timedOut 如果为true,表示当前操作需要进行超时控制,这里的timedOut为true,说明该线程已经从workQueue.poll()方法超时了
// 以上两点满足其一,都可以触发线程超时回收
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 尝试用CAS将线程池线程数量减一
if (compareAndDecrementWorkerCount(c))
// 减一成功后返回null,线程被回收
return null;
// 否则循环重试
continue;
}
try {
// 如果timed为true,阻塞超时获取任务,否则阻塞获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果poll超时获取任务超时了, 将timeOut设置为true
// 继续循环执行,如果碰巧开发者开启了allowCoreThreadTimeOut,那么该线程就满足超时回收了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
实现思路大致是:
将timedOut超时标记默认设置为false;
计算timed的值,该值决定了线程的生死大权,(timed && timedOut) 即是线程超时回收的条件之一,需要注意的是第一次(timed && timedOut) 为false,因为timedOut默认值为false,此时还没到poll超时获取的操作;
根据timed值来决定是用阻塞超时获取任务还是阻塞获取任务,如果用阻塞超时获取任务,超时后timedOut会被设置为true,接着继续循环,若(timed && timedOut) 为true,满足线程超时回收。 processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 正常的话再runWorker的getTask方法workerCount已经被减一了
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 累加线程的completedTasks
completedTaskCount += w.completedTasks;
// 从线程池中移除超时或者出现异常的线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试停止线程池
tryTerminate();
int c = ctl.get();
// runState为RUNNING或SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 线程不是异常结束
if (!completedAbruptly) {
// 线程池最小空闲数,允许core thread超时就是0,否则就是corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min == 0但是队列不为空要保证有1个线程来执行队列中的任务
if (min == 0 && !workQueue.isEmpty())
min = 1;
// 线程池还不为空那就不用担心了
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 1.线程异常退出
// 2.线程池为空,但是队列中还有任务没执行,看addWoker方法对这种情况的处理
addWorker(null, false);
}
} tryTerminate
processWorkerExit方法中会尝试调用tryTerminate来终止线程池。这个方法在任何可能导致线程池终止的动作后执行:比如减少wokerCount或SHUTDOWN状态下从队列中移除任务。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 以下状态直接返回:
// 1.线程池还处于RUNNING状态
// 2.SHUTDOWN状态但是任务队列非空
// 3.runState >= TIDYING 线程池已经停止了或在停止了
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
// 只能是以下情形会继续下面的逻辑:结束线程池。
// 1.SHUTDOWN状态,这时不再接受新任务而且任务队列也空了
// 2.STOP状态,当调用了shutdownNow方法
// workerCount不为0则还不能停止线程池,而且这时线程都处于空闲等待的状态
// 需要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。
if (workerCountOf(c) != 0) { // Eligible to terminate
// runWoker方法中w.unlock就是为了可以被中断,getTask方法也处理了中断。
// ONLY_ONE:这里只需要中断1个线程去处理shutdown信号就可以了。
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 进入TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 子类重载:一些资源清理工作
terminated();
} finally {
// TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
// 继续awaitTermination
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
} shutdown
shutdown这个方***将runState置为SHUTDOWN,会终止所有空闲的线程。shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方***终止所有的线程。主要区别在于shutdown调用的是interruptIdleWorkers这个方法,而shutdownNow实际调用的是Worker类的interruptIfStarted方法:
他们的实现如下:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
advanceRunState(SHUTDOWN);
// 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit →
// tryTerminate方法中会保证队列中剩余的任务得到执行。
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// STOP状态:不再接受新任务且不再执行队列中的任务。
advanceRunState(STOP);
// 中断所有线程
interruptWorkers();
// 返回队列中还没有被执行的任务。
tasks = drainQueue();
}
finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// w.tryLock能获取到锁,说明该线程没有在运行,因为runWorker中执行任务会先lock,
// 因此保证了中断的肯定是空闲的线程。
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
}
finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
// 初始化时state == -1
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
} 


京公网安备 11010502036488号