前言
Java 中线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。合理的使用线程池可以带来多个好处:
(1)降低资源消耗。通过重复利用已创建的线程降低线程在创建和销毁时造成的消耗。
(2)提高响应速度。当处理执行任务时,任务可以不需要等待线程的创建就能立刻执行。
(3)提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
1、线程池的实现原理
线程池的处理流程如上图所示
线程池中通过 ctl 字段来表示线程池中的当前状态,主池控制状态 ctl 是 AtomicInteger 类型,包装了两个概念字段:workerCount 和 runState,workerCount 表示有效线程数,runState 表示是否正在运行、正在关闭等状态。使用 ctl 字段表示两个概念,ctl 的前 3 位表示线程池状态,线程池中限制 workerCount 为(2^29 )-1(约 5 亿)个线程,而不是 (2^31)-1(20 亿)个线程。workerCount 是允许启动和不允许停止的工作程序的数量。该值可能与实际的活动线程数暂时不同,例如,当 ThreadFactory 在被询问时未能创建线程时,以及退出线程在终止前仍在执行记时。用户可见的池大小报告为工作集的当前大小。 runState 提供主要的生命周期控制,取值如下表所示:
字段名 | 含义 |
---|---|
RUNNING | 接受新任务并处理排队任务 |
SHUTDOWN | 不接受新任务,但处理排队任务 |
STOP | 不接受新任务,不处理排队任务,并中断正在进行的任务 |
TIDYING | 所有任务都已终止,workerCount 为零,转换到状态 TIDYING 的线程将运行 terminate() 方法 |
TERMINATED | terminate() 方法执行完成 |
runState 随着时间的推移而改变,在 awaitTermination() 方法中等待的线程将在状态达到 TERMINATED 时返回。状态的转换为:
RUNNING -> SHUTDOWN 在调用 shutdown() 时,可能隐含在 finalize() 中
(RUNNING 或 SHUTDOWN)-> STOP 在调用 shutdownNow() 时
SHUTDOWN -> TIDYING 当队列和线程池都为空时
STOP -> TIDYING 当线程池为空时
TIDYING -> TERMINATED 当 terminate() 方法完成时
开发人员如果需要在线程池变为 TIDYING 状态时进行相应的处理,可以通过重载 terminated() 函数来实现。
结合上图说明线程池 ThreadPoolExecutor 执行流程,使用 execute() 方法提交任务到线程池中执行时分为4种场景:
(1)线程池中运行的线程数量小于 corePoolSize,创建新线程来执行任务。
(2)线程池中运行线程数量不小于 corePoolSize,将任务加入到阻塞队列 BlockingQueue。
(3)如果无法将任务加入到阻塞队列(队列已满),创建新的线程来处理任务(这里需要获取全局锁)。
(4)当创建新的线程数量使线程池中当前运行线程数量超过 maximumPoolSize,线程池中拒绝任务,调用 RejectedExecutionHandler.rejectedExecution() 方法处理。
源码分析:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果线程数小于基本线程数,创建线程执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果线程数不小于基本线程数,将任务添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果队列已满,创建新的线程去处理
else if (!addWorker(command, false))
//执行拒绝策略
reject(command);
}
复制代码
线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会循环获取工作队列里的任务来执行。
2、线程池的创建和使用
创建线程池
创建线程池之前,首先要知道创建线程池中的核心参数:
corePoolSize(核心线程数大小):当提交任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,直到需要执行的任务数大于核心线程数时就不再创建。
runnableTaskQueue(任务队列):用于保存等待执行任务的阻塞队列。一般选择以下几种:
ArrayBlockingQueue:基于数组的有界阻塞队列,按照 FIFO 原则对元素进行排序。
LinkedBlockingQueue:基于链表的阻塞队列,按照 FIFO 原则对元素进行排序。
SynchronousQueue:同步阻塞队列,也是不存储元素的阻塞队列。每一个插入操作必须要等到另一个 线程调用移除操作,否则插入操作一直处于阻塞状态。
PriorityBlockingQueue:优先阻塞队列,一个具有优先级的无限阻塞队列。
maximumPoolSize(最大线程数大小):线程池允许创建的最大线程数,当队列已满,并且线程池中的线程数小于最大线程数,则线程池会创建新的线程执行任务。当使用无界队列时,此参数无用。
RejectedExecutionHandler(拒绝策略):当任务队列和线程池都满了,说明线程池处于饱和状态,那么必须使用拒绝策略来处理新提交的任务。JDK 内置拒绝策略有以下 4 种:
AbortPolicy:直接抛出异常
CallerRunsPolicy:使用调用者所在的线程来执行任务
DiscardOldestPolicy:丢弃队列中最近的一个任务来执行当前任务
DiscardPolicy:直接丢弃不处理
可以根据应用场景来实现 RejectedExecutionHandler 接口自定义处理策略。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
复制代码
keepAliveTime(线程存活时间):线程池的工作线程空闲后,保持存活的时间。
TimeUnit(存活时间单位):可选单位DAYS(天)、HOURS(小时)、MINUTES(分钟)、MILLISECONDS(毫秒)、MICROSECONDS(微妙)、NANOSECONDS(纳秒)。
ThreadFactory(线程工厂):可以通过线程工厂给创建出来的线程设置有意义的名字。
创建线程池主要分为两大类,第一种是通过 Executors 工厂类创建线程池,第二种是自定义创建线程池。根据《阿里java开发手册》中的规范,线程池不允许使用 Executors 去创建,原因是规避资源耗尽的风险。
使用 Executors 工厂类创建
创建一个单线程化的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
复制代码
创建固定线程数的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
以上两种创建线程池方式使用链表阻塞队列来存放任务,实际场景中可能会堆积大量请求导致 OOM
创建可缓存线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
允许创建的线程数量最大为 Integer.MAX_VALUE,当创建大量线程时会导致 CPU 处于重负载状态和 OOM 的发生
自定义创建线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
复制代码
向线程池提交任务
向线程池提交任务可以使用两个方法,分别为 execute() 和 submit()。
execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。execute() 方法中传入的是 Runnable 类的实例。
public static void main(String [] args){
...
threadPool.execute(new Runnable{
public void run(){
//do something...
}
});
...
}
复制代码
submit() 方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get() 方法来获取返回值。get() 方***阻塞当前线程直到任务完成,使用 get(long timeout, TimeUnit unit)方***阻塞当前线程一段时间后立即返回,这时候可能任务没有执行完。
public static void main(String [] args){
...
Future<Object> future = threadPool.submit(handleTask);
try{
Objects res = future.get();
}catch(InterruptedException e){
//处理中断异常
}catch(ExecutionException e){
//处理无法执行异常
}finally{
threadPool.shutdown();
}
...
}
复制代码
关闭线程池
可以通过调用线程池的 shutdown() 或shutdownNow() 方法来关闭线程池。他们的原理是遍历线程池中的工作线程,然后逐个调用 interrupt() 方法来中断线程,所以无法响应中断任务可能永远无法终止。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
复制代码
shutdown() 和 shutdownNow() 方法的区别在于 shutdownNow 方法首先将线程池的状态设置为 STOP,然后尝试停止正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
//shutdownNow()
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
...
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
复制代码
//shutdown()
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
...
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
复制代码
3、线程池参数设置推荐
线程池使用面临的核心的问题在于:线程池的参数并不好配置。一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,IO 密集型和 CPU 密集型的任务运行起来的情况差异非常大,这导致业界并没有一些成熟的经验策略帮助开发人员参考。
(1)以任务型为参考的简单评估:
假设线程池大小的设置(N 为 CPU 的个数)
如果纯计算的任务,多线程并不能带来性能提升,因为 CPU 处理能力是稀缺的资源,相反导致较多的线程切换的花销,此时建议线程数为 CPU 数量或+1;----为什么+1?因为可以防止 N 个线程中有一个线程意外中断或者退出,CPU 不会空闲等待。
如果是 IO 密集型应用, 则线程池大小设置为 2N+1. 线程数 = CPU 核数 × 目标 CPU 利用率 ×(1 + 平均等待时间 / 平均工作时间)
(2)以任务数为参考的理想状态评估:
1)默认值
corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()
复制代码
2)如何设置 * 需要根据相关值来决定 - tasks :每秒的任务数,假设为500~1000 - taskCost:每个任务花费时间,假设为0.1s - responsetime:系统允许容忍的最大响应时间,假设为1s
- 计算获取
-
corePoolSize = 每秒需要多少个线程处理?
- threadcount = tasks / ( 1 / taskCost ) =tasks*taskcout = (5001000)x0.1 = 50100 个线程。corePoolSize 设置应该大于 50
- 根据 8020 原则,如果 80% 的每秒任务数小于800,那么 corePoolSize 设置为80 即可
-
queueCapacity = ( coreSizePool / taskCost ) * responsetime
- 计算可得 queueCapacity = 80/0.1*1 = 800。意思是队列里的线程可以等待 1s,超过了的需要新开线程来执行
- 切记不能设置为 Integer.MAX_VALUE,这样队列会很大,线程数只会保持在 corePoolSize 大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
-
maxPoolSize = (max(tasks) - queueCapacity) / ( 1 / taskCost)
- 计算可得 maxPoolSize = (1000-800)/10 = 20 (+50)
- (最大任务数-队列容量)/ 每个线程每秒处理能力 = 最大线程数
-
rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
-
keepAliveTime 和 allowCoreThreadTimeout 采用默认通常能满足
-
以上都为理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器 cpu load 已经满了,则需要通过升级硬件和优化代码,降低 taskCost 来处理。
(仅为简单的理想状态的评估,可作为线程池参数设置的一个参考)
4、线程池使用场景推荐
场景一
与主业务无直接数据依赖的从业务可以使用异步线程池来处理,在项目初始化时创建线程池并交给将从业务中的任务提交给异步线程池执行能够缩短响应时间。
/**
* 创建线程池使用异步注解方式调用
*/
@Bean
public ThreadPoolTaskExecutor asyncExecutorPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
/**配置核心线程数*/
executor.setCorePoolSize(20);
/**配置最大线程数*/
executor.setMaxPoolSize(100);
/**配置队列大小*/
executor.setQueueCapacity(500);
/**等待任务在关机时完成--表明等待所有线程执行完*/
executor.setWaitForTasksToCompleteOnShutdown(true);
/** 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止*/
executor.setAwaitTerminationSeconds(60);
/**配置线程池中的线程的名称前缀*/
executor.setThreadNamePrefix("test-async-thread-");
/**rejection-policy:当pool已经达到max size的时候,如何处理新任务(CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行)*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
/**初始化执行器*/
executor.initialize();
return executor;
}
//使用@Async异步注解
@Async("asyncExecutorPool")
public void processTask1() {
//doSomething
}
复制代码
严禁在业务代码中起线程!!!
/**
* 创建线程池直接使用
*/
public class ThreadPoolExecutorTest{
private ThreadPoolExecutor executor;
@PostConstruct
public void init() {
/** 线程池初始化 */
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(30, 60, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200));
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
ThreadFactory threadFactory = new CustomizableThreadFactory("test-creates-thread-");
threadPoolExecutor.setThreadFactory(threadFactory);
this.executor = threadPoolExecutor;
}
...
public void processTask(){
Future<> future = executor.submit(
//doSomething...
);
}
}
复制代码
场景二
当任务需要按照指定顺序(FIFO, LIFO, 优先级)执行时,推荐创建使用单线程化的线程池。
public class SingleExecutorTest{
private HashMap<Long,ThreadPoolExecutor> executorMap = new HashMap<>();
...
public void init() {
/** 线程池初始化 */
for (int i = 0; i < 5; i++) {
/**任务队列容量:1000*/
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(1, 1,0,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
/**拒绝策略:静默丢弃,不抛异常*/
threadPoolExecutor.setRejectedExecutionHandler(
new ThreadPoolExecutor.DiscardPolicy());
ThreadFactory threadFactory = new CustomizableThreadFactory("testSingle-"+ i +"-");
threadPoolExecutor.setThreadFactory(threadFactory);
executorMap.put(Long.valueOf(i),threadPoolExecutor);
}
}
...
/** 需要顺序执行的任务 */
public void processTask(){
...
/** 获取单一线程池 */
ThreadPoolExecutor executor = executorMap.get(Long.valueOf(id % 5));
/** 向线程池中提交任务 */
excutor.submit(
//doSomething...
);
}
}
复制代码
总结
本文章主要说明了线程池的执行原理和创建方式以及推荐线程池参数设置和一般使用场景。在开发中,开发人员需要根据业务来合理的创建和使用线程池达到降低资源消耗,提高响应速度的目的。
原文链接:https://juejin.cn/post/7067324722811240479