文章索引
一.线程池简介
查阅过juc包或者在一开始学习javaSE的时候,大家就应该对线程池有一定的了解。比如从一开始就用Executor框架中Executors去创建四种线程池,它主要用于提供线程池相关的操作,如下所示。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
线程池 : 可以说是一个保存了多个线程的集合,它方便管理线程,也减少了线程的创建和销毁的内存消耗,更好地提高性能,在添加任务,线程执行完任务之后,会将线程归还给线程池,实现线程复用。
Executor框架 : 从JDK1.5开始,为了把工作单元与执行机制分离开,Executor框架诞生了,他是一个用于统一创建与运行的接口。Executor框架实现的就是线程池的功能。
此图来自于Java并发——Executor框架详解(Executor框架结构与框架成员)
这里只是想提醒一下所有的线程池最终都来自于一个构造方法,创建的都是ThreadPoolExecutor对象。
二.Executors创建四种线程池
Executor框架中Executors提供了四种静态API创建不同种类的线程池。
-
public static ExecutorService newFixedThreadPool()
返回一个固定线程数量的线程池
-
public static ExecutorService newSingleThreadExecutor()
返回一个只有一个现成的线程池
-
public static ExecutorService newCachedThreadPool()
返回一个可以根据实际情况调整线程数量的线程池
-
public static ScheduledExecutorService newScheduledThreadPool()
返回一个定长线程池,支持定时及周期性任务执行。
四种线程池的区别
首先说明了他们的创建方法都是静态工厂模式,里面最终都是在一个构造方法里创建的。
参数说明 :
- corePoolSize : 核心线程数
- maximumPoolSize : 最大线程数
- keepAliveTime : 线程池空闲时,线程存活的时间
- unit :时间单位
- workQueue : 阻塞队列
- threadFactory : 线程工厂
- handler : 线程拒绝策略
执行流程说明 :
这些参数都有什么用呢?首先当创建完线程池后,添加任务超过核心线程数时,会将任务添加进阻塞队列,当阻塞队列也满了之后才会开始创建新的线程,直到达到最大线程数,如果此时再进入新的任务则会采取线程拒绝策略。
当任务逐渐减少,目前执行的任务小于核心线程数时,此时多出核心线程数的线程会根据线程存活时间进行销毁线程,但是核心线程数不会被销毁。
具体也可参考java并发编程-线程池(二)ThreadPoolExecutor参数详解
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1.FixedThreadPool
首先来看它的静态方法
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数 = 最大线程数 = 传入参数nThread
线程存活时间为0L
阻塞队列使用LinkedBlockingQueue,一个共享的无界队列
2.SingleThreadExecutor
首先来看它的静态方法
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
线程池中最多同时只有一个线程活跃
同一时刻只有一个任务执行
其他所有线程放入阻塞队列中
3.CachedThreadPool
CachedThreadPool是一个根据需要创建线程的线程池。
首先来看它的静态方法
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程数-0
最大线程数-Integer.MAX_VALUE
一个线程如果在 60还没有被使用的话会被移除线程池
阻塞队列使用SynchronousQueue
参考线程池之CachedThreadPool学习
4.ScheduledThreadPool
ScheduledThreadPool是一个能实现定时、周期性任务的线程池。
首先来看它的静态方法
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//-----↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓-----
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
核心线程数 corePoolSIze
最大线程数Integer.MAX_VALUE
阻塞队列使用DelayedWorkQueue优先级队列
超过corePoolSize的线程在执行完任务后即终止
参考线程池之ScheduledThreadPool学习
四种拒绝策略
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。(默认)
- ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
阿里巴巴开发手册创建线程池
在阿里巴巴开发手册中有提到 :
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
所以线程池的创建最好直接使用ThreadPoolExecutor的构造方法来创建线程池。
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 0L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
三.线程池的线程复用
线程是通过start方法来执行的,如果连续执行两次会报出异常,那么线程池中是如何使线程复用执行多个不同的任务的?
我们直接从threadPoolExecutor的执行任务的execute方法入手来看看吧。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//当前工作线程小于corePoolSize,新建work线程并返回
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果大于等于corePoolSize,添加任务到队列。并进行二次确认(确认队列是否关闭,进行回滚)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//添加队列失败后,则尝试新建非core线程,失败则拒绝任务。
else if (!addWorker(command, false))
reject(command);
}
发现里面调用了多次addWorker方法,在点进去看看addWorker方法。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
大致上看,应该是创建新的线程并start启动,new了一个Worker类,它是ThreadPoolExecutor中的内部类,并且实现了Runnable接口。有由final修饰的thread,在上面这段代码中用t接收,并且在后面调用start方法运行起来。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
...
final Thread thread; // 工作线程
Runnable firstTask;
...
}
既然start肯定执行它的run方法,它的run方法中调用了runWorker方法,传参是自身。
public void run() {
runWorker(this);
}
再看看看runWorker方法。根据源码的注释中可知,worker通过while循环反复从队列中获取任务,并且执行。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
那总而言之,就是在初始化线程池后,添加初始任务时会通过addWorker(),在里面new Worker(task)创建线程并传入任务,添加到workers集合中,并且启动worker线程,它的run方法中是调用了runWorker方法,在该方法中会反复循环在队列中获取任务,然后执行task.run(),因为不是调用start方法,所以没有重启创建线程,而是在当前worker线程上执行任务,从而实现了线程池的线程复用。