本文关键字:
线程
,线程池
,单线程
,多线程
,线程池的好处
,线程回收
,创建方式
,核心参数
,底层机制
,拒绝策略
,参数设置
,动态监控
,线程隔离
线程和线程池相关的知识,是Java学习或者面试中一定会遇到的知识点,本篇我们会从线程和进程,并行与并发,单线程和多线程等,一直讲解到线程池,线程池的好处,创建方式,重要的核心参数,几个重要的方法,底层实现,拒绝策略,参数设置,动态调整,线程隔离等等。主要的大纲如下:
线程池的好处
线程池,使用了池化思想来管理线程,池化技术就是为了最大化效益,最小化用户风险,将资源统一放在一起管理的思想。这种思想在很多地方都有使用到,不仅仅是计算机,比如金融,企业管理,设备管理等。
为什么要线程池?如果在并发的场景,编码人员根据需求来创建线程池,可能会有以下的问题:
- 我们很难确定系统有多少线程在运行,如果使用就创建,不使用就销毁,那么创建和销毁线程的消耗也是比较大的
- 假设来了很多请求,可能是爬虫,疯狂创建线程,可能把系统资源耗尽。
实现线程池有什么好处呢?
- 降低资源消耗:池化技术可以重复利用已经创建的线程,降低线程创建和销毁的损耗。
- 提高响应速度:利用已经存在的线程进行处理,少去了创建线程的时间
- 管理线程可控:线程是稀缺资源,不能无限创建,线程池可以做到统一分配和监控
- 拓展其他功能:比如定时线程池,可以定时执行任务
其实池化技术,用在比较多地方,比如:
- 数据库连接池:数据库连接是稀缺资源,先创建好,提高响应速度,重复利用已有的连接
- 实例池:先创建好对象放到池子里面,循环利用,减少来回创建和销毁的消耗
线程池相关的类
下面是与线程池相关的类的继承关系:
Executor
Executor
是顶级接口,里面只有一个方法execute(Runnable command)
,定义的是调度线程池来执行任务,它定义了线程池的基本规范,执行任务是它的天职。
ExecutorService
ExecutorService
继承了Executor
,但是它仍然是一个接口,它多了一些方法:
void shutdown()
:关闭线程池,会等待任务执行完。List<Runnable> shutdownNow()
:立刻关闭线程池,尝试停止所有正在积极执行的任务,停止等待任务的处理,并返回一个正在等待执行的任务列表(还没有执行的)。boolean isShutdown()
:判断线程池是不是已经关闭,但是可能线程还在执行。boolean isTerminated()
:在执行shutdown/shutdownNow之后,所有的任务已经完成,这个状态就是true。boolean awaitTermination(long timeout, TimeUnit unit)
:执行shutdown之后,阻塞等到terminated状态,除非超时或者被打断。<T> Future<T> submit(Callable<T> task)
: 提交一个有返回值的任务,并且返回该任务尚未有结果的Future,调用future.get()方法,可以返回任务完成的时候的结果。<T> Future<T> submit(Runnable task, T result)
:提交一个任务,传入返回结果,这个result没有什么作用,只是指定类型和一个返回的结果。Future<?> submit(Runnable task)
: 提交任务,返回Future<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
:批量执行tasks,获取Future的list,可以批量提交任务。<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
:批量提交任务,并指定超时时间<T> T invokeAny(Collection<? extends Callable<T>> tasks)
: 阻塞,获取第一个完成任务的结果值,<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
:阻塞,获取第一个完成结果的值,指定超时时间
可能有同学对前面的<T> Future<T> submit(Runnable task, T result)
有疑问,这个reuslt有什么作用?
其实它没有什么作用,只是持有它,任务完成后,还是调用 future.get()
返回这个结果,用result
new 了一个 ftask
,其内部其实是使用了Runnable的包装类 RunnableAdapter
,没有对result做特殊的处理,调用 call()
方法的时候,直接返回这个结果。(Executors 中具体的实现)
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); // 返回传入的结果 return result; } }
还有一个方法值得一提:invokeAny()
: 在 ThreadPoolExecutor
中使用ExecutorService
中的方法 invokeAny()
取得第一个完成的任务的结果,当第一个任务执行完成后,会调用 interrupt()
方法将其他任务中断。
注意,ExecutorService
是接口,里面都是定义,并没有涉及实现,而前面的讲解都是基于它的名字(规定的规范)以及它的普遍实现来说的。
可以看到 ExecutorService
定义的是线程池的一些操作,包括关闭,判断是否关闭,是否停止,提交任务,批量提交任务等等。
AbstractExecutorService
AbstractExecutorService
是一个抽象类,实现了 ExecutorService
接口,这是大部分线程池的基本实现,定时的线程池先不关注,主要的方法如下:
不仅实现了submit
,invokeAll
,invokeAny
等方法,而且提供了一个 newTaskFor
方法用于构建 RunnableFuture
对象,那些能够获取到任务返回结果的对象都是通过 newTaskFor
来获取的。不展开里面所有的源码的介绍,仅以submit()方法为例:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 封装任务 RunnableFuture<Void> ftask = newTaskFor(task, null); // 执行任务 execute(ftask); // 返回 RunnableFuture 对象 return ftask; }
但是在 AbstractExecutorService
是没有对最最重要的方法进行实现的,也就是 execute()
方法。线程池具体是怎么执行的,这个不同的线程池可以有不同的实现,一般都是继承 AbstractExecutorService
(定时任务有其他的接口),我们最最常用的就是ThreadPoolExecutor
。
ThreadPoolExecutor
重点来了!!! ThreadPoolExecutor
一般就是我们平时常用到的线程池类,所谓创建线程池,如果不是定时线程池,就是使用它。
先看ThreadPoolExecutor
的内部结构(属性):
public class ThreadPoolExecutor extends AbstractExecutorService { // 状态控制,主要用来控制线程池的状态,是核心的遍历,使用的是原子类 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 用来表示线程数量的位数(使用的是位运算,一部分表示线程的数量,一部分表示线程池的状态) // SIZE = 32 表示32位,那么COUNT_BITS就是29位 private static final int COUNT_BITS = Integer.SIZE - 3; // 线程池的容量,也就是27位表示的最大值 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 状态量,存储在高位,32位中的前3位 // 111(第一位是符号位,1表示负数),线程池运行中 private static final int RUNNING = -1 << COUNT_BITS; // 000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 private static final int STOP = 1 << COUNT_BITS; // 010 private static final int TIDYING = 2 << COUNT_BITS; // 011 private static final int TERMINATED = 3 << COUNT_BITS; // 取出运行状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // 取出线程数量 private static int workerCountOf(int c) { return c & CAPACITY; } // 用运行状态和线程数获取ctl private static int ctlOf(int rs, int wc) { return rs | wc; } // 任务等待队列 private final BlockingQueue<Runnable> workQueue; // 可重入主锁(保证一些操作的线程安全) private final ReentrantLock mainLock = new ReentrantLock(); // 线程的集合 private final HashSet<Worker> workers = new HashSet<Worker>(); // 在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(), // 传统线程的通信方式,Condition都可以实现,Condition和传统的线程通信没什么区别,Condition的强大之处在于它可以为多个线程间建立不同的Condition private final Condition termination = mainLock.newCondition(); // 最大线程池大小 private int largestPoolSize; // 完成的任务数量 private long completedTaskCount; // 线程工厂 private volatile ThreadFactory threadFactory; // 任务拒绝处理器 private volatile RejectedExecutionHandler handler; // 非核心线程的存活时间 private volatile long keepAliveTime; // 允许核心线程的超时时间 private volatile boolean allowCoreThreadTimeOut; // 核心线程数 private volatile int corePoolSize; // 工作线程最大容量 private volatile int maximumPoolSize; // 默认的拒绝处理器(丢弃任务) private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); // 运行时关闭许可 private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); // 上下文 private final AccessControlContext acc; // 只有一个线程 private static final boolean ONLY_ONE = true; }
线程池状态
从上面的代码可以看出,用一个32位的对象保存线程池的状态以及线程池的容量,高3位是线程池的状态,而剩下的29位,则是保存线程的数量:
// 状态量,存储在高位,32位中的前3位 // 111(第一位是符号位,1表示负数),线程池运行中 private static final int RUNNING = -1 << COUNT_BITS; // 000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 001 private static final int STOP = 1 << COUNT_BITS; // 010 private static final int TIDYING = 2 << COUNT_BITS; // 011 private static final int TERMINATED = 3 << COUNT_BITS;
各种状态之间是不一样的,他们的状态之间变化如下:
- RUNNING:运行状态,可以接受任务,也可以处理任务
- SHUTDOWN:不可以接受任务,但是可以处理任务
- STOP:不可以接受任务,也不可以处理任务,中断当前任务
- TIDYING:所有线程停止
- TERMINATED:线程池的最后状态
Worker 实现
线程池,肯定得有池子,并且是放线程的地方,在 ThreadPoolExecutor
中表现为 Worker
,这是内部类:
线程池其实就是 Worker
(打工人,不断的领取任务,完成任务)的集合,这里使用的是 HashSet
:
private final HashSet<Worker> workers = new HashSet<Worker>();
Worker
怎么实现的呢?
Worker
除了继承了 AbstractQueuedSynchronizer
,也就是 AQS
, AQS
本质上就是个队列锁,一个简单的互斥锁,一般是在中断或者修改 worker
状态的时候使用。
内部引入AQS
,是为了线程安全,线程执行任务的时候,调用的是runWorker(Worker w)
,这个方法不是worker的方法,而是 ThreadPoolExecutor
的方法。从下面的代码可以看出,每次修改Worke
r的状态的时候,都是线程安全的。Worker
里面,持有了一个线程Thread
,可以理解为是对线程的封装。
至于runWorker(Worker w)
是怎么运行的?先保持这个疑问,后面详细讲解。
// 实现 Runnable,封装了线程 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 序列化id private static final long serialVersionUID = 6138294804551838833L; // worker运行的线程 final Thread thread; // 初始化任务,有可能是空的,如果任务不为空的时候,其他进来的任务,可以直接运行,不在添加到任务队列 Runnable firstTask; // 线程任务计数器 volatile long completedTasks; // 指定一个任务让工人忙碌起来,这个任务可能是空的 Worker(Runnable firstTask) { // 初始化AQS队列锁的状态 setState(-1); // 禁止中断直到 runWorker this.firstTask = firstTask; // 从线程工厂,取出一个线程初始化 this.thread = getThreadFactory().newThread(this); } // 实际上运行调用的是runWorker public void run() { // 不断循环获取任务进行执行 runWorker(this); } // 0表示没有被锁 // 1表示被锁的状态 protected boolean isHeldExclusively() { return getState() != 0; } // 独占,尝试获取锁,如果成功返回true,失败返回false protected boolean tryAcquire(int unused) { // CAS 乐观锁 if (compareAndSetState(0, 1)) { // 成功,当前线程独占锁 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 独占方式,尝试释放锁 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } // 上锁,调用的是AQS的方法 public void lock() { acquire(1); } // 尝试上锁 public boolean tryLock() { return tryAcquire(1); } // 解锁 public void unlock() { release(1); } // 是否锁住 public boolean isLocked() { return isHeldExclusively(); } // 如果开始可就中断 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
任务队列
除了放线程池的地方,要是任务很多,没有那么多线程,肯定需要一个地方放任务,充当缓冲作用,也就是任务队列,在代码中表现为:
private final BlockingQueue<Runnable> workQueue;
拒绝策略和处理器
计算机的内存总是有限的,我们不可能一直往队列里面增加内容,所以线程池为我们提供了选择,可以选择多种队列。同时当任务实在太多,占满了线程,并且把任务队列也占满的时候,我们需要做出一定的反应,那就是拒绝还是抛出错误,丢掉任务?丢掉哪些任务,这些都是可能需要定制的内容。
如何创建线程池
关于如何创建线程池,其实 ThreadPoolExecutor
提供了构造方法,主要参数如下,不传的话会使用默认的:
- 核心线程数:核心线程数,一般是指常驻的线程,没有任务的时候通常也不会销毁
- 最大线程数:线程池允许创建的最大的线程数量
- 非核心线程的存活时间:指的是没有任务的时候,非核心线程能够存活多久
- 时间的单位:存活时间的单位
- 存放任务的队列:用来存放任务
- 线程工厂
- 拒绝处理器:如果添加任务失败,将由该处理器处理
// 指定核心线程数,最大线程数,非核心线程没有任务的存活时间,时间单位,任务队列 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } // 指定核心线程数,最大线程数,非核心线程没有任务的存活时间,时间单位,任务队列,线程池工厂 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } // 指定核心线程数,最大线程数,非核心线程没有任务的存活时间,时间单位,任务队列,拒绝任务处理器 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } // 最后其实都是调用了这个方法 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... }
其实,除了显示的指定上面的参数之外,JDK也封装了一些直接创建线程池的方法给我们,那就是Executors
:
// 固定线程数量的线程池,无界的队列 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } // 单个线程的线程池,无界的队列,按照任务提交的顺序,串行执行 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } // 动态调节,没有核心线程,全部都是普通线程,每个线程存活60s,使用容量为1的阻塞队列 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } // 定时任务线程池 public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
但是一般是不推荐使用上面别人封装的线程池的哈!!!
线程池的底层参数以及核心方法
看完上面的创建参数大家可能会有点懵,但是没关系,一一为大家道来:
可以看出,当有任务进来的时候,先判断核心线程池是不是已经满了,如果还没有,将会继续创建线程。注意,如果一个任务进来,创建线程执行,执行完成,线程空闲下来,这时候再来一个任务,是会继续使用之前的线程,还是重新创建一个线程来执行呢?
答案是重新创建线程,这样线程池可以快速达到核心线程数的规模大小,以便快速响应后面的任务。
如果线程数量已经到达核心线程数,来了任务,线程池的线程又都不是空闲状态,那么就会判断队列是不是满的,倘若队列还有空间,那么就会把任务放进去队列中,等待线程领取执行。
如果任务队列已经满了,放不下任务,那么就会判断线程数是不是已经到最大线程数了,要是还没有到达,就会继续创建线程并执行任务,这个时候创建的是非核心部分线程。
如果已经到达最大线程数,那么就不能继续创建线程了,只能执行拒绝策略,默认的拒绝策略是丢弃任务,我们可以自定义拒绝策略。
值得注意的是,倘若之前任务比较多,创建出了一些非核心线程,那么任务少了之后,领取不到任务,过了一定时间,非核心线程就会销毁,只剩下核心线程池的数量的线程。这个时间就是前面说的keepAliveTime
。
提交任务
提交任务,我们看execute()
,会先获取线程池的状态和个数,要是线程个数还没达到核心线程数,会直接添加线程,否则会放到任务队列,如果任务队列放不下,会继续增加线程,但是不是增加核心线程。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 获取状态和个数 int c = ctl.get(); // 如果个数小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 直接添加 if (addWorker(command, true)) return; // 添加失败则继续获取 c = ctl.get(); } // 判断线程池状态是不是运行中,任务放到队列中 if (isRunning(c) && workQueue.offer(command)) { // 再次检查 int recheck = ctl.get(); // 判断线程池是不是还在运行 if (! isRunning(recheck) && remove(command)) // 如果不是,那么就拒绝并移除任务 reject(command); else if (workerCountOf(recheck) == 0) // 如果线程数为0,并且还在运行,那么就直接添加 addWorker(null, false); }else if (!addWorker(command, false)) // 添加任务队列失败,拒绝 reject(command); }
上面的源码中,调用了一个重要的方法:addWorker(Runnable firstTask, boolean core)
,该方法主要是为了增加工作的线程,我们来看看它是如何执行的:
private boolean addWorker(Runnable firstTask, boolean core) { // 回到当前位置重试 retry: for (;;) { // 获取状态 int c = ctl.get(); int rs = runStateOf(c); // 大于SHUTDOWN说明线程池已经停止 // ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) 表示三个条件至少有一个不满足 // 不等于SHUTDOWN说明是大于shutdown // firstTask != null 任务不是空的 // workQueue.isEmpty() 队列是空的 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 工作线程数 int wc = workerCountOf(c); // 是否符合容量 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 添加成功,跳出循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // cas失败,重新尝试 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 前面线程计数增加成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建了一个worker,包装了任务 w = new Worker(firstTask); final Thread t = w.thread; // 线程创建成功 if (t != null) { // 获取锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次确认状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 如果线程已经启动,失败 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 新增线程到集合 workers.add(w); // 获取大小 int s = workers.size(); // 判断最大线程池数量 if (s > largestPoolSize) largestPoolSize = s; // 已经添加工作线程 workerAdded = true; } } finally { // 解锁 mainLock.unlock(); } // 如果已经添加 if (workerAdded) { // 启动线程 t.start(); workerStarted = true; } } } finally { // 如果没有启动 if (! workerStarted) // 失败处理 addWorkerFailed(w); } return workerStarted; }
处理任务
前面在介绍Worker
这个类的时候,我们讲解到其实它的run()
方法调用的是外部的runWorker()
方法,那么我们来看看runWorkder()
方法:
首先,它会直接处理自己的firstTask,这个任务并没有在任务队列里面,而是它自己持有的:
final void runWorker(Worker w) { // 当前线程 Thread wt = Thread.currentThread(); // 第一个任务 Runnable task = w.firstTask; // 重置为null w.firstTask = null; // 允许打断 w.unlock(); boolean completedAbruptly = true; try { // 任务不为空,或者获取的任务不为空 while (task != null || (task = getTask()) != null) { // 加锁 w.lock(); //如果线程池停止,确保线程被中断; //如果不是,确保线程没有被中断。这 //在第二种情况下需要复查处理 // shutdown - now竞赛同时清除中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 执行之前回调方法(可以由我们自己实现) beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行之后回调方法 afterExecute(task, thrown); } } finally { // 置为null task = null; // 更新完成任务 w.completedTasks++; w.unlock(); } } // 完成 completedAbruptly = false; } finally { // 处理线程退出相关工作 processWorkerExit(w, completedAbruptly); } }
上面可以看到如果当前的任务是null,会去获取一个task,我们看看getTask()
,里面涉及到了两个参数,一个是是不是允许核心线程销毁,另外一个是线程数是不是大于核心线程数,如果满足条件,就从队列中取出任务,如果超时取不到,那就返回空,表示没有取到任务,没有取到任务,就不会执行前面的循环,就会触发线程销毁processWorkerExit()
等工作。
private Runnable getTask() { // 是否超时 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // SHUTDOWN状态继续处理队列中的任务,但是不接收新的任务 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 线程数 int wc = workerCountOf(c); // 是否允许核心线程超时或者线程数大于核心线程数 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 减少线程成功,就返回null,后面由processWorkerExit()处理 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 如果允许核心线程关闭,或者超过了核心线程,就可以在超时的时间内获取任务,或者直接取出任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 如果能取到任务,那就肯定可以执行 if (r != null) return r; // 否则就获取不到任务,超时了 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
销毁线程
前面提到,如果线程当前任务为空,又允许核心线程销毁,或者线程超过了核心线程数,等待了一定时间,超时了却没有从任务队列获取到任务的话,就会跳出循环执行到后面的线程销毁(结束)程序。那销毁线程的时候怎么做呢?
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果是突然结束的线程,那么之前的线程数是没有调整的,这里需要调整 if (completedAbruptly) decrementWorkerCount(); // 获取锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 完成的任务数 completedTaskCount += w.completedTasks; // 移除线程 workers.remove(w); } finally { // 解锁 mainLock.unlock(); } // 试图停止 tryTerminate(); // 获取状态 int c = ctl.get(); // 比stop小,至少是shutdown if (runStateLessThan(c, STOP)) { // 如果不是突然完成 if (!completedAbruptly) { // 最小值要么是0,要么是核心线程数,要是允许核心线程超时销毁,那么就是0 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果最小的是0或者队列不是空的,那么保留一个线程 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 只要大于等于最小的线程数,就结束当前线程 if (workerCountOf(c) >= min) return; // replacement not needed } // 否则的话,可能还需要新增工作线程 addWorker(null, false); } }
如何停止线程池
停止线程池可以使用shutdown()
或者shutdownNow()
,shutdown()
可以继续处理队列中的任务,而shutdownNow()
会立即清理任务,并返回未执行的任务。
public void shutdown() { // 获取锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 检查停止权限 checkShutdownAccess(); // 更新状态 advanceRunState(SHUTDOWN); // 中断所有线程 interruptIdleWorkers(); // 回调钩子 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } // 立刻停止 public List<Runnable> shutdownNow() { List<Runnable> tasks; // 获取锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 检查停止权限 checkShutdownAccess(); // 更新状态到stop advanceRunState(STOP); // 中断所有线程 interruptWorkers(); // 清理队列 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); // 返回任务列表(未完成) return tasks; }
execute()和submit()方法
execute()
方法可以提交不需要返回值的任务,无法判断任务是否被线程池执行是否成功submit()
方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个对象,我们调用get()
方法就可以阻塞,直到获取到线程执行完成的结果,同时我们也可以使用有超时时间的等待方法get(long timeout,TimeUnit unit)
,这样不管线程有没有执行完成,如果到时间,也不会阻塞,直接返回null。返回的是RunnableFuture
对象,继承了Runnable, Future<V>
两个接口:
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
线程池为什么使用阻塞队列?
阻塞队列,首先是一个队列,肯定具有先进先出的属性。
而阻塞,则是这个模型的演化,一般队列,可以用在生产消费者模型,也就是数据共享,有人往里面放任务,有人不断的往里面取出任务,这是一个理想的状态。
但是倘若不理想,产生任务和消费任务的速度不一样,要是任务放在队列里面比较多,消费比较慢,还可以慢慢消费,或者生产者得暂停一下产生任务(阻塞生产者线程)。可以使用 offer(E o, long timeout, TimeUnit unit)
设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue
,则返回失败,也可以使用put(Object)
,将对象放到阻塞队列里面,如果没有空间,那么这个方***阻塞到有空间才会放进去。
如果消费速度快,生产者来不及生产,获取任务的时候,可以使用poll(time)
,有数据则直接取出来,没数据则可以等待time
时间后,返回null
。也可以使用take()
取出第一个任务,没有任务就会一直阻塞到队列有任务为止。
上面说了阻塞队列的属性,那么为啥要用呢?
- 如果产生任务,来了就往队列里面放,资源很容易被耗尽。
- 创建线程需要获取锁,这个一个线程池的全局锁,如果各个线程不断的获取锁,解锁,线程上下文切换之类的开销也比较大,不如在队列为空的时候,然一个线程阻塞等待。
常见的阻塞队列
- ArrayBlockingQueue:基于数组实现,内部有一个定长的数组,同时保存着队列头和尾部的位置。
- LinkedBlockingQueue:基于链表的阻塞对垒,生产者和消费者使用独立的锁,并行能力强,如果不指定容量,默认是无效容量,容易系统内存耗尽。
- DelayQueue:延迟队列,没有大小限制,生产数据不会被阻塞,消费数据会,只有指定的延迟时间到了,才能从队列中获取到该元素。
- PriorityBlockingQueue:基于优先级的阻塞队列,按照优先级进行消费,内部控制同步的是公平锁。
- SynchronousQueue:没有缓冲,生产者直接把任务交给消费者,少了中间的缓存区。
线程池如何复用线程的?执行完成的线程怎么处理
前面的源码分析,其实已经讲解过这个问题了,线程池的线程调用的run()
方法,其实调用的是runWorker()
,里面是死循环,除非获取不到任务,如果没有了任务firstTask并且从任务队列中获取不到任务,超时的时候,会再判断是不是可以销毁核心线程,或者超过了核心线程数,满足条件的时候,才会让当前的线程结束。
否则,一直都在一个循环中,不会结束。
我们知道start()
方法只能调用一次,因此调用到run()
方法的时候,调用外面的runWorker()
,让其在runWorker()
的时候,不断的循环,获取任务。获取到任务,调用任务的run()
方法。
执行完成的线程会调用processWorkerExit()
,前面有分析,里面会获取锁,把线程数减少,从工作线程从集合中移除,移除掉之后,会判断线程是不是太少了,如果是,会再加回来,个人以为是一种补救。
如何配置线程池参数?
一般而言,有个公式,如果是计算(CPU)密集型的任务,那么核心线程数设置为处理器核数-1
,如果是io密集型(很多网络请求),那么就可以设置为2*处理器核数
。但是这并不是一个银弹,一切要从实际出发,最好就是在测试环境进行压测,实践出真知,并且很多时候一台机器不止一个线程池或者还会有其他的线程,因此参数不可设置得太过饱满。
一般 8 核的机器,设置 10-12 个核心线程就差不多了,这一切必须按照业务具体值进行计算。设置过多的线程数,上下文切换,竞争激烈,设置过少,没有办法充分利用计算机的资源。
计算(CPU)密集型消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
io密集型系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
为什么不推荐默认的线程池创建方式?
阿里的编程规范里面,不建议使用默认的方式来创建线程,是因为这样创建出来的线程很多时候参数都是默认的,可能创建者不太了解,很容易出问题,最好通过new ThreadPoolExecutor()
来创建,方便控制参数。默认的方式创建的问题如下:
- Executors.newFixedThreadPool():无界队列,内存可能被打爆
- Executors.newSingleThreadExecutor():单个线程,效率低,串行。
- Executors.newCachedThreadPool():没有核心线程,最大线程数可能为无限大,内存可能还会爆掉。
使用具体的参数创建线程池,开发者必须了解每个参数的作用,不会胡乱设置参数,减少内存溢出等问题。
一般体现在几个问题:
- 任务队列怎么设置?
- 核心线程多少个?
- 最大线程数多少?
- 怎么拒绝任务?
- 创建线程的时候没有名称,追溯问题不好找。
线程池的拒绝策略
线程池一般有以下四种拒绝策略,其实我们可以从它的内部类看出来:
- AbortPolicy: 不执行新的任务,直接抛出异常,提示线程池已满
- DisCardPolicy:不执行新的任务,但是也不会抛出异常,默默的
- DisCardOldSetPolicy:丢弃消息队列中最老的任务,变成新进来的任务
- CallerRunsPolicy:直接调用当前的execute来执行任务
一般而言,上面的拒绝策略都不会特别理想,一般要是任务满了,首先需要做的就是看任务是不是必要的,如果非必要,非核心,可以考虑拒绝掉,并报错提醒,如果是必须的,必须把它保存起来,不管是使用mq消息,还是其他手段,不能丢任务。在这些过程中,日志是非常必要的。既要保护线程池,也要对业务负责。
线程池监控与动态调整
线程池提供了一些API,可以动态获取线程池的状态,并且还可以设置线程池的参数,以及状态:
查看线程池的状态:
修改线程池的状态:
关于这一点,美团的线程池文章讲得很清楚,甚至做了一个实时调整线程池参数的平台,可以进行跟踪监控,线程池活跃度、任务的执行Transaction(频率、耗时)、Reject异常、线程池内部统计信息等等。这里我就不展开了,原文:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html ,这是我们可以参考的思路。
线程池隔离
线程隔离,很多同学可能知道,就是不同的任务放在不同的线程里面运行,而线程池隔离,一般是按照业务类型来隔离,比如订单的处理线程放在一个线程池,会员相关的处理放在一个线程池。
也可以通过核心和非核心来隔离,核心处理流程放在一起,非核心放在一起,两个使用不一样的参数,不一样的拒绝策略,尽量保证多个线程池之间不影响,并且最大可能保住核心线程的运行,非核心线程可以忍受失败。
Hystrix
里面运用到这个技术,Hystrix
的线程隔离技术,来防止不同的网络请求之间的雪崩,即使依赖的一个服务的线程池满了,也不会影响到应用程序的其他部分。
关于作者
秦怀,公众号【秦怀杂货店】作者,技术之路不在一时,山高水长,纵使缓慢,驰而不息。个人写作方向:Java源码解析,JDBC,Mybatis,Spring,redis,分布式,剑指Offer,LeetCode等,认真写好每一篇文章,不喜欢标题党,不喜欢花里胡哨,大多写系列文章,不能保证我写的都完全正确,但是我保证所写的均经过实践或者查找资料。遗漏或者错误之处,还望指正。