线程池的工作原理,以及拒绝策略,大家都很熟悉,下面主要讲一下线程池shutdown的原理,以及一些不常用操作的原理。
shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
复制代码
启动有序关闭,在该关闭中执行先前提交的任务,但不接受任何新任务。如果已关闭,则调用不会产生任何其他影响。此方法不等待先前提交的任务完成执行。使用awaitTermination可以做到这一点。
advanceRunState
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
复制代码
将runState转换为给定状态,或者已经存在的状态比给定状态大时将直接返回。 循环使用CAS设置状态,设置成功返回。
interruptIdleWorkers
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
复制代码
因为work每次执行任务的时候都会先lock,完成任务后unlock, 如果tryLock可以成功说明work当前没有在执行任务。使用interrupt中断空闲的work线程。
tryTerminate
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
复制代码
shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
复制代码
尝试停止所有正在执行的任务,暂停正在等待的任务的处理,并返回正在等待执行的任务的列表。从此方法返回后,这些任务将从任务队列中耗尽(删除)。此方法不等待主动执行的任务终止。除了尽最大努力尝试停止处理正在执行的任务之外,没有任何保证。此实现通过中断取消任务,因此任何无法响应中断的任务都可能永远不会终止。
interruptWorkers
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
复制代码
中断所有线程,即使处于活动状态也是如此。
drainQueue
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
复制代码
使用drainTo方法将任务队列写入新列表。但是,如果队列是DelayQueue或其他类型的队列,但poll或drainTo可能无法删除某些元素,则将它们逐个删除。
Worker
Worker主要维护运行任务线程的中断控制状态,以及其他次要簿记。此类扩展了AbstractQueuedSynchronizer,以简化获取和释放围绕每个任务执行的锁。我们实现了一个简单的非可重入互斥锁,而不是使用ReentrantLock,因为我们不希望辅助任务在调用诸如setCorePoolSize之类的池控制方法时能够重新获取该锁。另外,为了抑制直到线程真正开始运行任务之前的中断,我们将锁定状态初始化为负值,并在启动时将其清除(在runWorker中)。
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
复制代码
Work反复从队列中获取任务并执行它们,同时解决许多问题:
- 我们可能从一个初始任务开始,在这种情况下,我们不需要第一个。否则,只要池是 运行时,我们从getTask获得任务。如果返回null,则 Work由于池状态或配置参数更改而退出。其他退出是由于引发异常 外部代码,在这种情况下,completedAbruptly为true,其通常导致processWorkerExit替换此线程。
- 在运行任何任务之前,先获取锁以防止在执行任务时其他池中断,然后我们确保除非池正在停止,否则此线程没有它的interrupt set。
- 在每次运行任务之前,都要调用beforeExecute,可能会引发异常,在这种情况下,我们导致线程死亡(中断循环,使用completelyAbruptly为true)无需处理 任务。
- 假设beforeExecute正常完成,我们运行 任务,收集其抛出的任何异常以发送给afterExecute。 我们分别处理RuntimeException,Error(两者 规范保证我们可以捕获)和任意Throwables。 因为我们无法在Runnable.run中抛出Throwables,所以我们 将它们包装在错误的出路(到线程的 UncaughtExceptionHandler)。 任何抛出的异常也保守地导致线程死亡。
- task.run完成后,我们调用afterExecute,这可能也会引发异常,这也会导致线程 死。 根据JLS Sec 14.20,此例外是即使task.run抛出也将有效。
- 异常机制的net效果是afterExecute和线程的UncaughtExceptionHandler具有相同的精度 我们可以提供的有关以下方面遇到的任何问题的信息用户代码。
processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate(); // shutdown状态时,每个工作线程完成工作后,终止线程池
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false); // 工作线程执行任务异常退出时,重新启动一个工作线程来完成任务
}
}
复制代码
线程池的状态
线程池的控制状态ctl是一个atomic integer。表示workCount和runState两个字段。
workerCount指示有效线程数。int的后29位表示有效线程数。
runState,指示是否正在运行,正在关闭等。int的前三位表示线程池的状态。
- RUNNING: 接受新任务并处理排队的任务
- SHUTDOWN: 不接受新任务,而是处理排队的任务
- STOP: 不接受新任务,不处理排队任务以及中断进行中的任务
- TIDYING: 所有任务已终止,workerCount为零,线程转换为状态TIDYING将运行Terminated()挂钩方法
- TERMINATED: terminated()执行完成
运行状态切换
- RUNNING -> SHUTDOWN: 在调用shutdown()时,可能隐式在finalize()中
- (RUNNING or SHUTDOWN) -> STOP: 调用shutdownNow()
- SHUTDOWN -> TIDYING: 队列和work pool为空
- STOP -> TIDYING: work pool为空
- TIDYING -> TERMINATED: terminated()执行完成
状态相关的一些代码
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
private static final int COUNT_BITS = Integer.SIZE - 3; // 29,11101
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 29位,全是1,值 536870911
private static final int RUNNING = -1 << COUNT_BITS; // 32位,前三位是1, 值 -536870912
private static final int SHUTDOWN = 0 << COUNT_BITS; // 值 0
private static final int STOP = 1 << COUNT_BITS; // 30位,第一位是1,值 536870912
private static final int TIDYING = 2 << COUNT_BITS; // 31位,第一位是1,值 1073741824
private static final int TERMINATED = 3 << COUNT_BITS; // 31位,前两位是1, 值 1610612736
-1 // 11111111111111111111111111111111, 32位
0 // 0
1 // 1
2 // 10
3 // 11
~CAPACITY // 32位,前三位是1,11100000000000000000000000000000,值-536870912