什么是线程池
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
而本文描述线程池是JDK中提供的ThreadPoolExecutor类。
当然,使用线程池可以带来一系列好处:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
线程池的创建
创建线程池主要是ThreadPoolExecutor类来完成,ThreadPoolExecutor 的有许多重载的构造方法,通过参数最多的构造方法来理解创建线程池有哪些需要配置的参数。ThreadPoolExecutor 的构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
下面对参数进行说明:
- corePoolSize:表示核心线程池的大小。当提交一个任务时,如果当前核心线程池的线程个数没有达到 corePoolSize,则会创建新的线程来执行所提交的任务,即使当前核心线程池有空闲的线程。如果当前核心线程池的线程个数已经达到了 corePoolSize,则不再重新创建线程。如果调用了
prestartCoreThread()
或者prestartAllCoreThreads()
,线程池创建的时候所有的核心线程都会被创建并且启动。 - maximumPoolSize:表示线程池能创建线程的最大个数。如果当阻塞队列已满时,并且当前线程池线程个数没有超过 maximumPoolSize 的话,就会创建新的线程来执行任务。
- keepAliveTime:空闲线程存活时间。如果当前线程池的线程个数已经超过了 corePoolSize,并且线程空闲时间超过了 keepAliveTime 的话,就会将这些空闲线程销毁,这样可以尽可能降低系统资源消耗。
- unit:时间单位。为 keepAliveTime 指定时间单位。
- workQueue:阻塞队列。用于保存任务的阻塞队列,关于阻塞队列可以看这篇文章。可以使用ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue。
- threadFactory:创建线程的工程类。可以通过指定线程工厂为每个创建出来的线程设置更有意义的名字,如果出现并发问题,也方便查找问题原因。
- handler:饱和策略。当线程池的阻塞队列已满和指定的线程都已经开启,说明当前线程池已经处于饱和状态了,那么就需要采用一种策略来处理这种情况。采用的策略有这几种:
- AbortPolicy: 直接拒绝所提交的任务,并抛出RejectedExecutionException异常;
- CallerRunsPolicy:只用调用者所在的线程来执行任务;
- DiscardPolicy:不处理直接丢弃掉任务;
- DiscardOldestPolicy:丢弃掉阻塞队列中存放时间最久的任务,执行当前任务
线程池的运行状态
private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
根据上面源码可知, COUNT_BITS
的值为29, CAPACITY
的值为2的29次方-1, 二进制表示为: "00011111111111111111111111111111"(明显29个1);
上面的源码中线程池的运行状态的二进制表示:
状态 | 二进制 | 意义 |
---|---|---|
RUNNING | 11100000000000000000000000000000 | 接受新execute的task, 执行已入队的task |
SHUTDOWN | 0 | 不接受新execute的task, 但执行已入队的task, 中断所有空闲的线程 |
STOP | 00100000000000000000000000000000 | 不接受新execute的task, 不执行已入队的task, 中断所有的线程 |
TIDYING | 01000000000000000000000000000000 | 所有线程停止, workerCount 数量为0, 将执行hook方法: terminated() |
TERMINATED | 01100000000000000000000000000000 | terminated()方法执行完毕 |
可以看出, 线程池的状态由32位int
整型的二进制的前三位表示.
核心属性ctl源码(线程池状态和有效线程数)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
核心属性ctl
, 数据类型是AtomicInteger
, 表示了两个含义:
- 线程池运行状态(
runState
) - 线程池中的有效线程数(
workerCount
)
那是如何做到一个属性表示两个含义的呢? 那就要看看ctlOf
方法
private static int ctlOf(int rs, int wc) { return rs | wc; }
ctlOf
方法在线程池内部用来更新线程池的ctl
属性,比如ctl
初始化的时候: ctl = new AtomicInteger(ctlOf(RUNNING, 0))
, 调用ThreadPoolExecutor#shutdown
方法等;
rs
表示runState
, wc
表示workerCount
;
将 runState
和workerCount
做按位或运算得到ctl
的值;
而runState
和workerCount
的值由下面两个方法packing和unpacking, 这里的形参c
就是ctl.get()
的值;
// Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; }
下面用表格更清晰理解:
方法 | 方法体 | 带入CAPACITY的值 |
---|---|---|
runStateOf | c & ~CAPACITY | c & 11100000000000000000000000000000 |
workerCountOf | c & CAPACITY | c & 00011111111111111111111111111111 |
按位与运算, 相同位置, 同1才为1, 其余为0;
结合表格看, runStateOf
方法取ctl
前3位表示runState
, workerCountOf
方法取第4~32位的值表示workerCount
;
在多线程的环境下,运行状态和有效线程数量往往需要保证统一,不能出现一个改而另一个没有改的情况,如果将他们放在同一个AtomicInteger中,利用AtomicInteger的原子操作,就可以保证这两个值始终是统一的。
代码分析
java.util.concurrent.ThreadPoolExecutor#execute:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. *1.判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程, * 如果能完成新线程创建exexute方法结束,成功提交任务 * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * 2.在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态 * 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要 * reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程; * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. * 3.如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池 * 已经达到饱和状态,所以reject这个他任务 */ int c = ctl.get(); // 1.如果当前有效线程数小于核心线程数,调用addWorker执行任务(即创建一条线程执行该任务) if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2.如果当前有效线程大于等于核心线程数,并且当前线程池状态为运行状态,则将任务添加到阻塞队列中,等待空闲线程取出队列workQueue执行 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command);// 移除成功,拒绝该非运行的任务 else if (workerCountOf(recheck) == 0) // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。 // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务 addWorker(null, false); } // 3.如果阻塞队列已满,则调用addWorker执行任务(即创建一条线程执行该任务) else if (!addWorker(command, false)) reject(command);// 如果创建线程失败,则调用线程拒绝策略 }
画一个简单的图:
上图中的worker可简单理解为线程池中的一个线程, workers.size()
即使线程池中的线程数;
- 当
workers.size()
小于corePoolSize
时, 创建新的线程执行提交的task. - 当
workers.size()
大于corePoolSize
时, 并且workQueue
没有满, 将task添加到workQueue
. - 当
workers.size()
大于corePoolSize
时, 并且workQueue
已经满了, 但是workers.size()<maximumPoolSize, 就创建一个空闲线程处理task.
当workers.size()大于corePoolSize时, 并且workQueue已经满了, 但是workers.size()>=maximumPoolSize, 执行拒绝策略.
java.util.concurrent.ThreadPoolExecutor#addWorker:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果rs大于SHUTDOWN,则说明此时线程池不在接受新任务了 // 如果rs等于SHUTDOWN,同时满足firstTask为空,且阻塞队列如果有任务,则继续执行任务 // 也就说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 如果有效线程数大于等于线程池所容纳的最大线程数(基本不可能发生),不能添加任务 // 或者有效线程数大于等于当前限制的线程数,也不能添加任务 // 限制线程数量有任务是否要核心线程执行决定,core=true使用核心线程执行任务 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) // 使用CAS增加有效线程数量 break retry; c = ctl.get(); // 如果再次获取ctl变量值 if (runStateOf(c) != rs)// 再次对比运行状态,如果不一致,再次循环执行,说明此处有其他并发操作,修改了State continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 任务是否已执行 boolean workerStarted = false; // 任务是否已添加 boolean workerAdded = false; // 任务包装类,我们的任务都需要添加到Worker中 Worker w = null; try { w = new Worker(firstTask); // 获取Worker中的Thread值 final Thread t = w.thread; if (t != null) { // 操作workers HashSet 数据结构需要同步加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 获取当前线程池的运行状态 int rs = runStateOf(ctl.get()); // rs < SHUTDOWN表示是RUNNING状态; // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。 // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务 // rs是RUNNING状态时,直接创建线程执行任务 // 当rs等于SHUTDOWN时,并且firstTask为空,也可以创建线程执行任务,也就说明了SHUTDOWN状态时不再接受新任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 启动线程执行任务 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } 这里特别强调,firstTask是开启线程执行的首个任务,之后常驻在线程池中的线程执行的任务都是从阻塞队列中取出的,需要注意。 创建一个Worker对象,并将新的任务装进Worker中,开启同步将Worker添加进workers中,这里需要注意workers的数据结构为HashSet,非线程安全,所以操作workers需要加同步锁。
如何执行任务?
我们注意到上面的代码中:
// 启动线程执行任务 if (workerAdded) { t.start(); workerStarted = true; }
这里的t是w.thread得到的,即是Worker中用于执行任务的线程,该线程由ThreadFactory创建,我们再看看生成Worker的构造方法:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
newThread传的参数是Worker本身,而Worker实现了Runnable接口,所以当我们执行t.start()时,执行的是Worker的run()方法,找到入口了:
java.util.concurrent.ThreadPoolExecutor.Worker#run:
public void run() { runWorker(this); }
java.util.concurrent.ThreadPoolExecutor#runWorker:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 循环从workQueue阻塞队列中获取任务并执行 while (task != null || (task = getTask()) != null) { // 加同步锁的目的是为了防止同一个任务出现多个线程执行的问题 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 如果线程池正在关闭,须确保中断当前线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 执行任务前可以做一些操作,交由子类实现 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { // 将task置为空,让线程自行调用getTask()方法从workQueue阻塞队列中获取任务 task = null; // 记录Worker执行了多少次任务,用于统计 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 线程回收过程 processWorkerExit(w, completedAbruptly); //将task置为空,目的是为了让线程自行调用getTask()方法从workQueue阻塞队列中获取任务。 } }
如何保证核心线程不被销毁?
我们之前已经知道线程池中可维持corePoolSize数量的常驻核心线程,那么它们是如何保证执行完任务而不被线程池回收的呢?在前面的章节中你可能已经到从workQueue队列中会阻塞式地获取任务,如果没有获取任务,那么就会一直阻塞下去
java.util.concurrent.ThreadPoolExecutor#getTask:
private Runnable getTask() { // 超时标记,默认为false,如果调用workQueue.poll()方法超时了,会标记为true boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 如果当前状态大于等于SHUTDOWN,并且workQueue中的任务为空或者状态大于等于STOP // 则CAS操作减少工作线程数量,并且返回null,线程被回收 // 也说明假设状态为SHUTDOWN的情况下,如果workQueue不为空,那么线程池还是可以继续执行剩下的任务 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount();// CAS操作将线程池中的线程数量减一 return null; } // 获取线程池中的有效线程数量 int wc = workerCountOf(c); // Are workers subject to culling? // 如果开发者主动开启allowCoreThreadTimeOut并且获取当前工作线程大于corePoolSize,那么该线程是可以被超时回收的 // allowCoreThreadTimeOut默认为false,即默认不允许核心线程超时回收 // 这里也说明了在核心线程以外的线程都为“临时”线程,随时会被线程池回收 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 这里说明了两点销毁线程的条件: // 1.原则上线程池数量不可能大于maximumPoolSize,但可能会出现并发时操作了setMaximumPoolSize方法,如果此时将最大线程数量调少了,很可能会出现当前工作线程大于最大线程的情况,这时就需要线程超时回收,以维持线程池最大线程小于maximumPoolSize, // 2.timed && timedOut 如果为true,表示当前操作需要进行超时控制,这里的timedOut为true,说明该线程已经从workQueue.poll()方法超时了 // 以上两点满足其一,都可以触发线程超时回收 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 尝试用CAS将线程池线程数量减一 if (compareAndDecrementWorkerCount(c)) // 减一成功后返回null,线程被回收 return null; // 否则循环重试 continue; } try { // 如果timed为true,阻塞超时获取任务,否则阻塞获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 如果poll超时获取任务超时了, 将timeOut设置为true // 继续循环执行,如果碰巧开发者开启了allowCoreThreadTimeOut,那么该线程就满足超时回收了 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } 实现思路大致是: 将timedOut超时标记默认设置为false; 计算timed的值,该值决定了线程的生死大权,(timed && timedOut) 即是线程超时回收的条件之一,需要注意的是第一次(timed && timedOut) 为false,因为timedOut默认值为false,此时还没到poll超时获取的操作; 根据timed值来决定是用阻塞超时获取任务还是阻塞获取任务,如果用阻塞超时获取任务,超时后timedOut会被设置为true,接着继续循环,若(timed && timedOut) 为true,满足线程超时回收。
processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 正常的话再runWorker的getTask方法workerCount已经被减一了 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 累加线程的completedTasks completedTaskCount += w.completedTasks; // 从线程池中移除超时或者出现异常的线程 workers.remove(w); } finally { mainLock.unlock(); } // 尝试停止线程池 tryTerminate(); int c = ctl.get(); // runState为RUNNING或SHUTDOWN if (runStateLessThan(c, STOP)) { // 线程不是异常结束 if (!completedAbruptly) { // 线程池最小空闲数,允许core thread超时就是0,否则就是corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果min == 0但是队列不为空要保证有1个线程来执行队列中的任务 if (min == 0 && !workQueue.isEmpty()) min = 1; // 线程池还不为空那就不用担心了 if (workerCountOf(c) >= min) return; // replacement not needed } // 1.线程异常退出 // 2.线程池为空,但是队列中还有任务没执行,看addWoker方法对这种情况的处理 addWorker(null, false); } }
tryTerminate
processWorkerExit方法中会尝试调用tryTerminate来终止线程池。这个方法在任何可能导致线程池终止的动作后执行:比如减少wokerCount或SHUTDOWN状态下从队列中移除任务。
final void tryTerminate() { for (;;) { int c = ctl.get(); // 以下状态直接返回: // 1.线程池还处于RUNNING状态 // 2.SHUTDOWN状态但是任务队列非空 // 3.runState >= TIDYING 线程池已经停止了或在停止了 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) return; // 只能是以下情形会继续下面的逻辑:结束线程池。 // 1.SHUTDOWN状态,这时不再接受新任务而且任务队列也空了 // 2.STOP状态,当调用了shutdownNow方法 // workerCount不为0则还不能停止线程池,而且这时线程都处于空闲等待的状态 // 需要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。 if (workerCountOf(c) != 0) { // Eligible to terminate // runWoker方法中w.unlock就是为了可以被中断,getTask方法也处理了中断。 // ONLY_ONE:这里只需要中断1个线程去处理shutdown信号就可以了。 interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 进入TIDYING状态 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 子类重载:一些资源清理工作 terminated(); } finally { // TERMINATED状态 ctl.set(ctlOf(TERMINATED, 0)); // 继续awaitTermination termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS }
shutdown
shutdown这个方***将runState置为SHUTDOWN,会终止所有空闲的线程。shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方***终止所有的线程。主要区别在于shutdown调用的是interruptIdleWorkers这个方法,而shutdownNow实际调用的是Worker类的interruptIfStarted方法:
他们的实现如下:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回 advanceRunState(SHUTDOWN); // 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit → // tryTerminate方法中会保证队列中剩余的任务得到执行。 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // STOP状态:不再接受新任务且不再执行队列中的任务。 advanceRunState(STOP); // 中断所有线程 interruptWorkers(); // 返回队列中还没有被执行的任务。 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // w.tryLock能获取到锁,说明该线程没有在运行,因为runWorker中执行任务会先lock, // 因此保证了中断的肯定是空闲的线程。 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } void interruptIfStarted() { Thread t; // 初始化时state == -1 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }