简介
线程池其实就是存放着多个线程对象的一个池子,统一管理线程的创建和销毁,类似于数据库连接池。使用线程池的目的很简单,就是尽可能减少系统开销,提高系统性能。
使用线程池的好处
- 降低系统开销,创建线程会消耗一定的 CPU 和内存,销毁线程也会消耗 CPU,我们可以提前创建一些线程放在线程里,需要使用线程时从池里拿来即用,用完就扔回池里,达到重复利用的效果,如此就可以降低一定的系统开销。
- 提高程序效率,比如我们的某个功能代码中需要用到线程,直接从池里拿线程,省掉了创建线程花费的时间,如此就可以加快程序的响应速度。
- 便于管理线程,如果线程创建太多会拖垮我们的系统,需要根据具体情况来创建线程,不能太多,也不能太少,线程池可以限制最大线程数,也可以灵活的配置相关参数来实现充分的利用多核 CPU 的计算能力,还提供了相关便于监控的 API。
创建线程池的方式
Executors
Executors 其实就是一个创建各种线程池的工厂类,提供了大量的静态方法。
例子:Executors.newScheduledThreadPool(5);
- newSingleThreadExecutor
创建只有一个线程的线程池,即使用单线程串行执行所有任务,这种方式适合需要顺序执行任务的场景 - newFixedThreadPool
创建一个固定线程量的线程池,初始化时需要确定线程数量,执行任务时从线程池中拿,假如执行的任务数比较多,达到了线程初始化时数量,则会缓存到一个无界(即没有长度限制)的阻塞队列中。几年前这种方式使用场景比较多,现在已经有更好的使用方式了 - newCachedThreadPool
创建一个可缓存线程空闲 60 秒的无界线程池,使用 SynchronousQueue 作为阻塞队列,这个队列比较特殊,只能存储一个元素。**可缓存是如何理解呢,其实就是线程执行完任务后不立即销毁线程,而是在 60 秒内无需执行任务的情况下才销毁,适合处理大量耗时较短的任务场景。 - newScheduleThreadPool
创建一个延时或者延时后定时执行的固定工作线程数量的线程池,定时执行类似于 Timer,适合定时处理任务场景。ThreadPoolExecutor
使用 Executors 创建的线程池不够灵活,大多数参数都是默认的,比如 newFixedThreadPool 的核心线程数和最大线程数必须一致,而且没有线程空闲时间,如果使用不当还会导致 OOM,比如 newSingleThreadExecutor 是使用无界的 LinkedBlockingQueue 阻塞队列。
corePoolSize
核心线程数,即线程池长期存活的线程数量。maximumPoolSize
最大线程数,此参数可以有效的控制线程数,避免无限制的创建线程导致 OOM,最大线程数是常驻+临时线程数量的总和。keepAliveTime
线程的空闲时间,在超过核心线程数情况下,并且队列满了情况下,而被创建出来的线程的存活时间,线程在没有执行任务的情况下超过这个时间就会被销毁。unit
空闲时间的单位,比如毫秒、秒、分钟和小时等等。workQueue
等待队列,在超过核心线程数情况下,任务将放在等待队列,它是一个 BlockingQueue 阻塞队列。Java 自带的阻塞队列:- ArrayBlockingQueue,有界阻塞队列,基于数组实现的队列;
- LinkedBlockingQueue,无界阻塞队列,基于链表实现的队列;
- SynchronousQueue,只能存储一个元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。此队列是 Executors.newCachedThreadPool()的默认阻塞队列;
- PriorityBlockingQueue,具体优先级的无界阻塞队列。
threadFactory
线程工厂,用于创建一个线程,一般用于设置线程的名称。handler
拒绝策略,当核心线满了、队列满了,超过了最大线程数情况下的处理策略。
拒绝策略列表:CallerRunsPolicy,常用,在调用者线程执行;
AbortPolicy,默认,抛出 RejectedExecutionException 异常;
DiscardPolicy,任务被丢弃,不做处理,会出现任务被吞的想象;
DiscardOldestPolicy,丢弃队列里最旧(最先入队)的那个任务,再尝试执行当前任务(重复此过程)。
import java.util.concurrent.*; public class ThreadPoolTaskExecutorDemo { public static class TaskA implements Runnable{ private String name; public TaskA(String name) { this.name = name; } @Override public void run() { try { System.out.println(this.name + "-运行中"); Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50)); // 设置线程池的拒绝策略为"在调用者线程执行,即不丢弃所有任务" pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 10; i++) { Runnable r = new TaskA("任务-"+i); pool.execute(r); } pool.shutdown(); } }
ThreadPoolTaskExecutor
这是 Spring Boot 2.x 整合线程池的方式,也是一种自定义线程池,配合注解 @Async 使用,也比较常用。
import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * 线程池配置 */ @Configuration public class ExecutorConfig{ /** * xxxExecutor 是自定义的名称 */ @Bean(name="xxxExecutor") public Executor xxxxxxExecutor() { ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); //核心线程数 pool.setCorePoolSize(5); //最大线程数 pool.setMaxPoolSize(5); //缓冲队列大小 pool.setQueueCapacity(1024); //线程最大空闲时间 pool.setKeepAliveSeconds(300); //线程的名称前缀 pool.setThreadNamePrefix("xxxExecutor-"); //拒绝策略 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 pool.initialize(); return pool; } } /** * 异步方法,即 run 方法的逻辑,xxxExecutor 要跟配置时的一致 */ @Async("xxxExecutor") public void xxxxxxByAsync() { // do something...... }
提交任务的方式
execute()
execute() 在 ** Executor 接口类中声明,具体实现方法在 ThreadPoolExecutor 类**,此方法时 ThreadPoolExecutor 类最核心的方法,作用是提交任务到线程池,由线程池来分配哪个线程去执行任务。不需要返回任务结果的使用这种方式提交,任务执行出错会抛异常。【虽然使用特殊的方式也能实现】
submit()
submit() 在 ExecutorService 接口中声明,具体实现在 AbstractExecutorService 抽象类中,此方法作用也是提交任务到线程池,但是与 execute() 方法有些不同,submit() 可以使用 Future 来获取任务执行结果,但是最终还是调用 execute() 提交任务(所以此方法是 ThreadPoolExecutor 最核心)。需要返回任务结果的使用这种方式提交,获取结果 get() 是阻塞方法,配合 CountDownLatch 使用可以避免阻塞
线程池的状态
ThreadPoolExecutor 类中定义了几个变量表示线程池的各个状态。
RUNNING
创建线程池后,处于 RUNNING 状态
SHUTDOWN
调用 shutdown() 后,状态被改为 SHUTDOWN 状态,此时不接受新的任务,等待所有任务执行完毕
STOP
调用 shutdownNow() 后,状态被改为 STOP 状态,此时不接受新的任务,终止正在执行的任务,并且移除等待队列的所有任务并返回
TIDYING
当线程池处于 SHUTDOWN 状态时,等待队列为空、并且线程池中执行的任务也为空时,此时状态被改为 TIDYING
TERMINATED【最终的状态】
当线程池处于 SHUTDOWN 或者 STOP 状态时,等待队列的任务都执行完毕或者被清空,并且所有线程都被销毁,此时状态被改为 TERMINATED
关闭线程池的方式
shutdown()
shutdown() 会把线程池的状态改为 SHUTDOWN,拒绝新提交的任务,继续执行已提交的任务(包含等待队列的任务)。
shutdownNow()
shutdownNow() 会将线程池状态置为 STOP,拒绝新提交的任务,同时会中断当前正在执行任务的线程,并且会移除等待队列的所有任务并返回。
执行 shutdown() 或者 shutdownNow()之后,如何拒绝新提交的任务?【提交任务的时候,根据线程池的状态判断】
调用 shutdown() 或者 shutdownNow() 方法之后会把线程池的状态改为 SHUTDOWN 或者 STOP 状态,当提交任务时先判断状态是不是 RUNNING,如果不是则抛 rejectedExecution 异常达到拒绝任务的目的。
isShutDown() 和 isTerminated() 的区别?
只要调用 shutdown() 和 shutdownNow() 中的任意一个后,调用 isShutDown() 都返回 true;当所有任务都成功关闭后,调用 isTerminated() 才返回 true。
线程池的线程初始化
动态的调整线程池容量
线程池监控
写一个定时任务定时输出,正在执行的任务数、曾经创建过的最大线程数、当前活动线程数等,再结合分析创建线程池时设置的核心线程数、最大线程数、等待队列和拒绝策略,定位问题就变得很容易了。如果想把监控做得更完善的话,可以设定一些阈值,比如:当等待队列达到某个值时进行报警等等,如此就可以提前感知问题防范未然。
以下是API
线程池调优
corePoolSize(核心线程数)【不会关闭,会一直保持,不受keeplive的影响】
根据具体应用情况而定,假如很少使用线程或者一天只使用一到两次,则可以设置为 0,假如需要经常使用线程多个执行任务,则根据具体任务数来设置一个或者多个核心线程数。
maximumPoolSize(最大线程数)
假如 N 是 CPU 的核数,那么最大线程数最少要设置为 N 才能体现多核 CPU 的能力,通常情况下,IO 密集型设置为 2N,CPU 密集型设置为 N+1。
workQueue(等待队列)
任务量少使用 SynchronousQueue、无界队列(比如:LinkedBlockingQueue);任务量大使用有界队列(比如:ArrayBlockingQueue),防止 OOM。
keepAliveTime(线程的空闲时间)
通常设置为 5 分钟、10 分钟,尽量让线程多执行些任务再被销毁。
handler(拒绝策略)
这个参数比较好理解,根据应用具体需求来定就可以了,常用 CallerRunsPolicy 策略,保证不丢弃任何一个任务。
ThreadPoolExecutor 线程池的原理★
任务提交到线程池的处理流程
任务提交到线程池的处理流程如下:
判断线程池的当前线程数是否大于核心线程数 corePoolSize,如果不是,则创建线程执行任务;
如果当前线程数大于核心线程数,在判断等待队列 workQueue 是否已满,如果队列没满,把任务放进队列;
如果等待队列已满,则判断当前线程数是否大于最大线程数 maximumPoolSize,如果不是,创建线程执行任务;
如果当前线程数大于最大线程数,则根据拒绝策略进行处理。
public void execute(Runnable command) { //判断提交过来的任务是否为 null,如果是则抛异常 if (command == null) { throw new NullPointerException(); } else { //获取线程池的当前线程数 int c = this.ctl.get(); //如果当前线程数小于核心线程数 corePoolSize,则创建新的线程执行任务 if (workerCountOf(c) < this.corePoolSize) { if (this.addWorker(command, true)) { return; } c = this.ctl.get(); } //isRunning(c)判断线程池状态是否为 RUNNING //this.workQueue.offer(command),往等待队列插入任务,如果队列满了直接返回 false,所以此处使用 offer() if (isRunning(c) && this.workQueue.offer(command)) { //再次获取线程池的当前线程数 int recheck = this.ctl.get(); //再次验证线程池状态是否为 RUNNING,如果不是 RUNNING,则移除插入的任务 if (!isRunning(recheck) && this.remove(command)) { //根据拒绝策略进行处理 this.reject(command); //如果是 RUNNING,当前线程数等于 0,则创建新的线程执行任务 } else if (workerCountOf(recheck) == 0) { this.addWorker((Runnable)null, false); } //如果创建非核心线程失败,则根据拒绝策略进行处理 } else if (!this.addWorker(command, false)) { this.reject(command); } } }
注意事项
线程池底层的存储结构一个 HashSet(存储线程)、一个阻塞队列(存储等待执行的任务);
只有等待队列满了,才创建非核心线程,非核心线程在经过配置的空闲时间后被销毁;
提供了三个钩子(beforeExecute、afterExecute、terminated),在执行任务前后、线程池彻底终止后执行我们自定义的逻辑。
总结
不能嵌套线程池,嵌套会导致任务串行执行;
尽量使用自定义的方式创建线程池,谨慎使用 Executors 工厂类;
获取任务结果使用异步方式,比如:CompletionService 或者 CompletableFuture,避免阻塞;
避免异常被吞,比如:使用 submit() 提交任务,假如任务出现异常,假如没有获取任务结果(比如:CompletionService 接口)的情况下,则不打印异常信息(即被吞掉),使用 execute() 提交任务不会出现这种问题。