线程池
背景
- 池化技术:事先准备好一些资源,等待用的时候拿、不用的时候还回来。也就是线程复用
- 好处:降低资源的消耗(减少资源对象的创建、销毁次数),提高程序的性能,方便管理,控制最大并发数
- 常见池子:连接池、内存池、对象池
线程池的异常处理:
- 不会影响线程池里面其他线程的正常执行
- 当线程异常会调用 ThreadPoolExecutor.runWorker() 方法,最后面的 finally 中的 processWordkerExit(),会将此线程 remove,并重新 addWorker() 一个线程
- 一个线程池中的线程异常了,那么线程池会怎么处理这个线程?
创建线程池的两种方法:
Executors 创建:
- 单例线程池Executors.newSingleThreadExecutor():创建一个单线程的线程池,可保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行;
- 缓存线程池Executors.newCachedThreadPool():创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程;
- 固定线程池Executors.newFixedThreadPool(5):创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待;
- 周期线程池Executors.newScheduledThreadPool():创建一个周期性的线程池,支持定时及周期性执行任务。
ThreadPoolExecutor创建:
- ThreadPoolExecutor类提供了4种构造方法,可根据需要来自定义一个线程池。
补充
- 阿里手册不建议这么创建、用原生接口创建。上面的三个方法底层都是通过ThreadPoolExecutor来创建,不安全。所以我们需要通过ThreadPoolExecutor来创建线程池而不是Executors来创建
execute 与 submit 的区别:
- execute():可以提交 Runnable 类型的任务,没有返回值 void
- submit():可以提交 Runnable、Callable 类型的任务,返回值为 Future 类型
- execute():如果遇到异常直接抛出
- submit():如果遇到异常,会在使用 Future 的 get() 获取返回值的时候,才会抛出异常
ThreadPoolExecutor
参数介绍
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小,简单来说就是一直开放的线程
int maximumPoolSize, //最大核心线程池大小
long keepAliveTime, //空闲线程存活时间
TimeUnit unit, //超时单位
BlockingQueue<Runnable> workQueue, //阻塞对列,需要指定长度
ThreadFactory threadFactory, //线程工厂
RejectedExecutionHandler handler) { //拒绝策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:核心线程数,默认是不会回收的,除非设置allowCoreThreadTimeOut 参数设置为 true 的时候,核心线程在空闲了 keepAliveTime 的时间后也会被回收的,相当于线程池自动给你动态修改了;
- maximumPoolSize:想要开放的线程超出了CorePoolSize加上workQueue的长度,就会开放空闲的线程。CorePoolSize + 空闲线程 = maximumPoolSize;
- keepAliveTime:空闲线程存活时间,线程池可容纳的最大线程数:maximumPoolSize + workQueue。如果workQueue超过了规定时间还是为空,空闲线程就会关闭,就只剩下CorePoolSize在工作。这里是空闲线程的存活时间,对等待区(workQueue)有效,对核心线程是无效的(设置allowCoreThreadTimeOut 为true就有效了);
- threadFactory:线程工厂(创建线程)、一般不用动。默认的线程工厂Executors.defaultThreadFactory();
- handler:如果当前线程数超过了maximumPoolSize + workQueue。就会采取拒绝策略
四种拒绝策略
- AbortPolicy:默认拒绝策略,丢弃任务,并抛出拒绝执行RejectedExecutionException异常;
- DiscardPolicy:直接丢弃;
- DiscardOldestPolicy:丢弃workQueue中最老的一个任务,并将新任务加入;
- CallerRunsPolicy:不丢弃,超出的任务由使用调用线程直接运行任务。
如何设置参数大小
- 我们是否可以将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程
- 线程池调优之动态参数配置
corePoolSize
- 可以通过 setCorePoolSize() 方法动态配置、或者根据压测调整核心线程数的大小
// 设置corePoolSize的大小
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
maximumPoolSize
- CPU 密集型:获取当前电脑CPU的核数 + 1来作为最大的线程池、保证效率最佳。Runtime.getRuntime().availableProcessors() + 1。(这里为什么加一:即使当计算(CPU)密集型的线程偶尔由于页缺失故障或者其他原因而暂停时,这个“额外”的线程也能确保 CPU 的时钟周期不会被浪费;也就是备用线程、防止线程中断带来的影响)
- IO 密集型:一般是两倍的CPU的核数
- 参考:压测代码片段
我们可以根据自己的业务场景,从“N+1”和“2N”两个公式中选出一个适合的,计算出一个大概的线程数量,之后通过实际压测(IO密集型可以调用IO流等操作、CPU密集型可以计算素数等操作),逐渐往“增大线程数量”和“减小线程数量”这两个方向调整,然后观察整体的处理时间变化,最终确定一个具体的线程数量。
// 修改maximumPoolSize大小
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
workQueue
- 没有设置workQueue大小的方法。所以和这个时候就需要我们自定义一个阻塞对列。如何定义呢,我们想要设置对列的长度的时候一般会用到capacity这个关键词,通过查看源码。以LinkedBlockIngQueue队列为例
// 设置为final,并且没有setter()、getter()方法
private final int capacity;
自定义的阻塞对列。步骤:复制LinkedBlockIngQueue的内容,将category去掉final换成volatile,加上get、set方法
/** The capacity bound,&nbs***bsp;Integer.MAX_VALUE if none */
private volatile int capacity;
public void setCapacity(int capacity){
this.capacity = capacity;
}
public int getCapacity() {
return capacity;
}
这样就可以修改参数了。
ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue();
// 修改的数值
queue.setCapacity(temp);
问题
- 在springboot容器里面写,由于没有响应容器的关闭或者Tomcat的关闭信号导致shutdown()关闭线程池失效。
1)、创建线程池(函数式接口):
/**
* @author SHshuo
* @data 2021/8/20--9:33
* 创建线程池,核心线程为2、最大线程池为本机CPU的最大线程数+1、
* 空闲线程存活时间为30秒、链式阻塞对列长度为2、默认线程工厂、默认拒绝策略超出会抛出异常
* <p>
* 没有输入值,只有返回值所以用supplier函数接口
*/
public class ThreadUtils {
// 创建线程
private Supplier<ExecutorService> supplier = () -> {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
Runtime.getRuntime().availableProcessors()+1,
30,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
return threadPool;
};
// 提供对外开放的接口
public ExecutorService getThreadPool() {
return supplier.get();
}
}
2)、调用线程:shutdown()方法失效
/**
* @author SHshuo
* @data 2021/8/20--16:27
* 这个例子很有感触,我根据视频讲解的创建线程池的方法,优化我写项目对应的内容。
* 我第一次用到函数接口,但是发现一个大问题:就是shutdown语句一直失效
* 项目的结果就是用了springboot集合的线程池ThreadPoolTaskExecutor。
*
* 具体参考:https://blog.csdn.net/francislpx/article/details/105249189
*/
public class ThreadService {
ThreadUtils threadUtils = new ThreadUtils();
public void test(List<String> list) {
list.parallelStream().forEach(p -> {
// 这里的多线程并发执行
threadUtils.getThreadPool().execute(() -> {
execute(p);
});
});
System.out.println("线程池关闭");
threadUtils.getThreadPool().shutdown();
}
public void execute(String account) {
System.out.println("执行内容");
}
}
参考资料
ThreadPoolTaskExecutor
- springboot集成的ThreadPoolTaskExecutor,不得不说,真的太好了。对ThreadPoolExecutor的封装已经帮我们实现了修改corePoolSize、maximumPoolSize、workQueue的方法。
源码:
// 修改corePoolSize的大小
public void setCorePoolSize(int corePoolSize) {
synchronized (this.poolSizeMonitor) {
this.corePoolSize = corePoolSize;
if (this.threadPoolExecutor != null) {
this.threadPoolExecutor.setCorePoolSize(corePoolSize);
}
}
}
// 修改maxPoolSize的大小
public void setMaxPoolSize(int maxPoolSize) {
synchronized (this.poolSizeMonitor) {
this.maxPoolSize = maxPoolSize;
if (this.threadPoolExecutor != null) {
this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
}
}
}
// 修改QueueCapacity的大小
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
完整案例:
@Configuration
@EnableAsync
public class ThreadConfig implements AsyncConfigurer {
// 创建线程池
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() + 1);
executor.setQueueCapacity(2);
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
参考:
动态线程池
很好的完善了线程池的参数配置问题