使用线程池是为了减少创建和销毁线程带来的资源消耗。线程池适用于单任务处理时间短且任务量较大的场景。
线程池核心参数
参数 |
说明 |
int corePoolSize |
核心线程数 |
int maximumPoolSize |
最大线程数 |
long keepAliveTime |
非核心线程最大可空闲时间 |
TimeUnit unit |
非核心线程最大可空闲时间单位 |
BlockingQueue<Runnable> workQueue |
阻塞队列 |
ThreadFactory threadFactory |
线程工厂 |
RejectedExecutionHandler handler |
拒绝策略 |
运行过程
- 初始状态,每来一个任务都会创建一个核心线程,直到线程数等于
corePoolSize
。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
- 核心线程达到最大时,会将新任务加入到阻塞队列中,核心线程会从队列中取任务。
if (isRunning(c) && workQueue.offer(command)) {
// ...
}
- 阻塞队列满时,再创建非核心线程来执行新任务。
else if (!addWorker(command, false))
- 线程数达到最大后,还有新任务提交进来,就执行拒绝策略。
线程池状态

- RUNNING
初始状态,能够接收新任务,执行新任务
- SHUTDOWN
调用shutdown方法时会变成该状态,该状态下不再接收新任务,但是可以将已提交的任务执行完。
- STOP
调用shutdownNow方法时会变成该状态, 该状态下不接收新任务,不处理已提交的任务且会中断正在执行的任务。
- TIDYING
所有任务已终止且workCount=0时,线程池会进入该状态。
- TERMINATED
线程池彻底终止状态。
拒绝策略

策略 |
说明 |
AbortPolicy |
抛RejectedExecutionException 异常 |
CallerRunsPolicy |
使用调用者线程执行任务 |
DiscardPolicy |
直接丢弃任务 |
DiscardOldestPolicy |
丢弃等待最久的任务,即队列最前面的任务,并继续执行新任务 |
Worker
private boolean addWorker(Runnable firstTask, boolean core) {
//...
boolean workerAdded = false;
Worker w = null;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//...
workers.add(w);
workerAdded = true;
//...
if (workerAdded) {
// 线程启动后会执行worker的run方法
t.start();
workerStarted = true;
}
}
}
// 构造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 创建线程时将worker本身传递进去了。
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 取得任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 线程重用关键逻辑
// 循环执行当前任务 或 从队列取任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
// ...
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
// ..
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// 获取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// ...
// 允许空闲核心线程被销毁或者线程数大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
// 返回null时,工作线程会结束并销毁
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
线程数设置建议
- 对于CPU密集型任务,CPU闲置时间较少,设置为CPU核数+1
- 对于IO密集型任务,CPU闲置时间相对较多,设置为2*CPU核数+1
但是实际场景还是需要根据压测结果来设置的
线程池执行异常时,工作线程如何表现
- submit方式提交任务时,任务会被
FutureTask
包装一下,发生异常时,不会对外抛出异常,所以线程不受影响。
- execute方式提交的任务未被包装,所以发生异常时,线程会退出,但是线程池会重新创建新的线程。
