前言
限于平台篇幅原因,只好拆成两篇来写,没看过上篇文章的朋友可以点击博主主页查看
3.工作线程的实现
从addWorker()方法的实现可以看出,工作线程的创建和启动都跟ThreadPoolExecutor中的内部类Worker有关。下面我们分析Worker类来看一下工作线程的实现。
Worker类继承自AQS类,具有锁的功能;实现了Runable接口,可以将自身作为一个任务在线程中执行。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
Worker的主要字段就下面三个,代码也比较简单。
//用来封装worker的线程,线程池中真正运行的线程,通过线程工厂创建而来
final Thread thread;
//worker所对应的第一个任务,可能为空
Runnable firstTask;
//记录当前线程完成的任务数
volatile long completedTasks;
Worker的构造函数如下。
Worker(Runnable firstTask) {
//设置AQS的state为-1,在执行runWorker()方法之前阻止线程中断
setState(-1);
//初始化第一个任务
this.firstTask = firstTask;
//利用指定的线程工厂创建一个线程,注意,参数是Worker实例本身this
//也就是当执行start方法启动线程thread时,真正执行的是Worker类的run方法
this.thread = getThreadFactory().newThread(this);
}
Worker类继承了AQS类,重写了其相应的方法,实现了一个自定义的同步器,实现了不可重入锁。
//是否持有独占锁
protected boolean isHeldExclusively() {
return getState() != 0;
}
//尝试获取锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
//设置独占线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//尝试释放锁
protected boolean tryRelease(int unused) {
//设置独占线程为null
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//获取锁
public void lock() { acquire(1); }
//尝试获取锁
public boolean tryLock() { return tryAcquire(1); }
//释放锁
public void unlock() { release(1); }
//是否持有锁
public boolean isLocked() { return isHeldExclusively(); }
Worker类还提供了一个中断线程thread的方法。
void interruptIfStarted() {
Thread t;
//AQS状态大于等于0,worker对应的线程不为null,且该线程没有被中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
再来看一下Worker类的run()方法的实现,会发现run()方法最终调用了ThreadPoolExecutor类的runWorker()方法。
public void run() {
runWorker(this);
}
4.线程复用机制
通过上文可以知道,worker中的线程start 后,执行的是worker的run()方法,而run()方法最终会调用ThreadPoolExecutor类的runWorker()方法,runWorker()方法实现了线程池中的线程复用机制。下面我们来看一下runWorker()方法的实现。
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
//获取w的firstTask
Runnable task = w.firstTask;
//设置w的firstTask为null
w.firstTask = null;
// 释放锁,设置AQS的state为0,允许中断
w.unlock();
//用于标识线程是否异常终止,finally中processWorkerExit()方***有不同逻辑
boolean completedAbruptly = true;
try {
//循环调用getTask()获取任务,不断从任务缓存队列获取任务并执行
while (task != null || (task = getTask()) != null) {
//进入循环内部,代表已经获取到可执行的任务,则对worker对象加锁,保证线程在执行任务过程中不会被中断
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || //若线程池状态大于等于STOP,那么意味着该线程要中断
(Thread.interrupted() && //线程被中断
runStateAtLeast(ctl.get(), STOP))) && //且是因为线程池内部状态变化而被中断
!wt.isInterrupted()) //确保该线程未被中断
//发出中断请求
wt.interrupt();
try {
//开始执行任务前的Hook方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//到这里正式开始执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务后的Hook方法
afterExecute(task, thrown);
}
} finally {
//置空task,准备通过getTask()获取下一个任务
task = null;
//completedTasks递增
w.completedTasks++;
//释放掉worker持有的独占锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//到这里,线程执行结束,需要执行结束线程的一些清理工作
//线程执行结束可能有两种情况:
//1.getTask()返回null,也就是说,这个worker的使命结束了,线程执行结束
//2.任务执行过程中发生了异常
//第一种情况,getTask()返回null,那么getTask()中会将workerCount递减
//第二种情况,workerCount没有进行处理,这个递减操作会在processWorkerExit()中处理
processWorkerExit(w, completedAbruptly);
}
}
runWorker()方法是线程池的核心,实现了线程池中的线程复用机制,来看一下
runWorker()方法都做了哪些工作:
- 运行第一个任务firstTask之后,循环调用getTask()方法获取任务,不断从任务缓存队列获取任务并执行;
- 获取到任务之后就对worker对象加锁,保证线程在执行任务的过程中不会被中断,任务执行完会释放锁;
- 在执行任务的前后,可以根据业务场景重写beforeExecute()和afterExecute()等Hook方法;
- 执行通过getTask()方法获取到的任务
- 线程执行结束后,调用processWorkerExit()方法执行结束线程的一些清理工作
从runWorker()方法的实现可以看出,runWorker()方法中主要调用了getTask()方法和processWorkerExit()方法,下面分别看一下这两个方法的实现。
getTask()的实现
getTask()方法用来不断地从任务缓存队列获取任务并交给线程执行,下面分析一下其实现。
private Runnable getTask() {
//标识当前线程是否超时未能获取到task对象
boolean timedOut = false;
for (;;) {
//获取线程池的控制状态
int c = ctl.get();
//获取线程池的运行状态
int rs = runStateOf(c);
//如果线程池状态大于等于STOP,或者处于SHUTDOWN状态,并且阻塞队列为空,线程池工作线程数量递减,方法返回null,回收线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取worker数量
int wc = workerCountOf(c);
//标识当前线程在空闲时,是否应该超时回收
// 如果allowCoreThreadTimeOut为ture,或当前线程数大于核心池大小,则需要超时回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果worker数量大于maximumPoolSize(有可能调用了 setMaximumPoolSize(),导致worker数量大于maximumPoolSize)
if ((wc > maximumPoolSize || (timed && timedOut)) //或者获取任务超时
&& (wc > 1 || workQueue.isEmpty())) { //workerCount大于1或者阻塞队列为空(在阻塞队列不为空时,需要保证至少有一个工作线程)
if (compareAndDecrementWorkerCount(c))
//线程池工作线程数量递减,方法返回null,回收线程
return null;
//线程池工作线程数量递减失败,跳过剩余部分,继续循环
continue;
}
try {
//如果允许超时回收,则调用阻塞队列的poll(),只在keepAliveTime时间内等待获取任务,一旦超过则返回null
//否则调用take(),如果队列为空,线程进入阻塞状态,无限时等待任务,直到队列中有可取任务或者响应中断信号退出
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//若task不为null,则返回成功获取的task对象
if (r != null)
return r;
// 若返回task为null,表示线程空闲时间超时,则设置timeOut为true
timedOut = true;
} catch (InterruptedException retry) {
//如果此worker发生了中断,采取的方案是重试,没有超时
//在哪些情况下会发生中断?调用setMaximumPoolSize(),shutDown(),shutDownNow()
timedOut = false;
}
}
}
接下来总结一下getTask()方***在哪些情况下返回:
- 线程池处于RUNNING状态,阻塞队列不为空,返回成功获取的task对象
- 线程池处于SHUTDOWN状态,阻塞队列不为空,返回成功获取的task对象
- 线程池状态大于等于STOP,返回null,回收线程
- 线程池处于SHUTDOWN状态,并且阻塞队列为空,返回null,回收线程
- worker数量大于maximumPoolSize,返回null,回收线程
- 线程空闲时间超时,返回null,回收线程
processWorkerExit()的实现
processWorkerExit()方法负责执行结束线程的一些清理工作,下面分析一下其实现。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果用户任务执行过程中发生了异常,则需要递减workerCount
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
//获取全局锁
mainLock.lock();
try {
//将worker完成任务的数量累加到总的完成任务数中
completedTaskCount += w.completedTasks;
//从workers集合中移除该worker
workers.remove(w);
} finally {
//释放锁
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
//获取线程池控制状态
int c = ctl.get();
if (runStateLessThan(c, STOP)) { //线程池运行状态小于STOP
if (!completedAbruptly) { //如果用户任务执行过程中发生了异常,则直接调用addWorker()方法创建线程
//是否允许核心线程超时
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//允许核心超时并且workQueue阻塞队列不为空,那线程池中至少有一个工作线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果工作线程数量workerCount大于等于核心池大小corePoolSize,
//或者允许核心超时并且workQueue阻塞队列不为空时,线程池中至少有一个工作线程,直接返回
if (workerCountOf(c) >= min)
return;
//若不满足上述条件,则调用addWorker()方法创建线程
}
//创建新的线程取代当前线程
addWorker(null, false);
}
}
processWorkerExit()方法中主要调用了tryTerminate()方法,下面看一下tryTerminate()方法的实现。
final void tryTerminate() {
for (;;) {
//获取线程池控制状态
int c = ctl.get();
if (isRunning(c) || //线程池的运行状态为RUNNING
runStateAtLeast(c, TIDYING) || //线程池的运行状态大于等于TIDYING
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //线程池的运行状态为SHUTDOWN且阻塞队列不为空
//不能终止,直接返回
return;
//只有当线程池的运行状态为STOP,或线程池运行状态为SHUTDOWN且阻塞队列为空时,可以执行到这里
//如果线程池工作线程的数量不为0
if (workerCountOf(c) != 0) {
//仅仅中断一个空闲的worker
interruptIdleWorkers(ONLY_ONE);
return;
}
//只有当线程池工作线程的数量为0时可以执行到这里
final ReentrantLock mainLock = this.mainLock;
//获取全局锁
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //CAS操作设置线程池运行状态为TIDYING,工作线程数量为0
try {
//执行terminated()钩子方法
terminated();
} finally {
//设置线程池运行状态为TERMINATED,工作线程数量为0
ctl.set(ctlOf(TERMINATED, 0));
//唤醒在termination条件上等待的所有线程
termination.signalAll();
}
return;
}
} finally {
//释放锁
mainLock.unlock();
}
//若CAS操作失败则重试
}
}
tryTerminate()方法的作用是尝试终止线程池,它会在所有可能终止线程池的地方被调用,满足终止线程池的条件有两个:首先,线程池状态为STOP,或者为SHUTDOWN且任务缓存队列为空;其次,工作线程数量为0。
满足了上述两个条件之后,tryTerminate()方法获取全局锁,设置线程池运行状态为TIDYING,之后执行terminated()钩子方法,最后设置线程池状态为TERMINATED。
至此,线程池运行状态变为TERMINATED,工作线程数量为0,workers已清空,且workQueue也已清空,所有线程都执行结束,线程池的生命周期到此结束。
5.关闭线程池
关闭线程池有两个方法,shutdown()和shutdownNow(),下面分别看一下这两个方法的实现。
shutdown()的实现
shutdown()方法将线程池运行状态设置为SHUTDOWN,此时线程池不会接受新的任务,但会处理阻塞队列中的任务。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//获取全局锁
mainLock.lock();
try {
//检查shutdown权限
checkShutdownAccess();
//设置线程池运行状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断所有空闲worker
interruptIdleWorkers();
//用onShutdown()钩子方法
onShutdown();
} finally {
//释放锁
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
}
shutdown()方法首先会检查是否具有shutdown的权限,然后设置线程池的运行状态为SHUTDOWN,之后中断所有空闲的worker,再调用onShutdown()钩子方法,最后尝试终止线程池。
shutdown()方法调用了interruptIdleWorkers()方法中断所有空闲的worker,其实现如下。
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
//onlyOne标识是否只中断一个线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
//获取全局锁
mainLock.lock();
try {
//遍历workers集合
for (Worker w : workers) {
//worker对应的线程
Thread t = w.thread;
//线程未被中断且成功获得锁
if (!t.isInterrupted() && w.tryLock()) {
try {
//发出中断请求
t.interrupt();
} catch (SecurityException ignore) {
} finally {
//释放锁
w.unlock();
}
}
//若只中断一个线程,则跳出循环
if (onlyOne)
break;
}
} finally {
//释放锁
mainLock.unlock();
}
}
shutdownNow()的实现
shutdownNow()方法将线程池运行状态设置为STOP,此时线程池不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
//获取全局锁
mainLock.lock();
try {
//检查shutdown权限
checkShutdownAccess();
//设置线程池运行状态为STOP
advanceRunState(STOP);
//中断所有worker
interruptWorkers();
//将任务缓存队列中等待执行的任务取出并放到list中
tasks = drainQueue();
} finally {
//释放锁
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
//返回任务缓存队列中等待执行的任务列表
return tasks;
}
shutdownNow()方法与shutdown()方法相似,不同之处在于,前者设置线程池的运行状态为STOP,之后中断所有的worker(并非只是空闲的worker),尝试终止线程池之后,返回任务缓存队列中等待执行的任务列表。
shutdownNow()方法调用了interruptWorkers()方法中断所有的worker(并非只是空闲的worker),其实现如下。
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
//获取全局锁
mainLock.lock();
try {
//遍历workers集合
for (Worker w : workers)
//调用Worker类的interruptIfStarted()方法中断线程
w.interruptIfStarted();
} finally {
//释放锁
mainLock.unlock();
}
}
五.总结
至此,我们已经阅读了线程池框架的核心类ThreadPoolExecutor类的大部分源码,由衷地赞叹这个类很多地方设计的巧妙之处:
- 将线程池的运行状态和工作线程数量打包在一起,并使用了大量的位运算
- 使用CAS操作更新线程控制状态ctl,确保对ctl的更新是原子操作
- 内部类Worker类继承了AQS,实现了一个自定义的同步器,实现了不可重入锁
- 使用while循环自旋地从任务缓存队列中获取任务并执行,实现了线程复用机制
- 调用interrupt()方法中断线程,但注意该方法并不能直接中断线程的运行,只是发出了中断信号,配合BlockingQueue的take(),poll()方法的使用,打断线程的阻塞状态
其实,线程池的本质就是生产者消费者模式,线程池的调用者不断向线程池提交任务,线程池里面的工作线程不断获取这些任务并执行(从任务缓存队列获取任务或者直接执行任务)。
读完本文,相信大家对线程池的实现原理有了深刻的认识,比如向线程池提交一个任务之后线程池的执行流程,一个任务从被提交到被执行会经历哪些过程,一个工作线程从被创建到正常执行到执行结束的执行过程,等等。