本文将会介绍一下java线程池,线程池内容不少,具体漏掉的地方自行网上搜索。

本文内容篇幅较长,主要参考文章:
https://blog.csdn.net/aa1215018028/article/details/82814192(比较全面,拒绝策略,队列,等等很多介绍到的,比较重点)
https://www.cnblogs.com/sachen/p/7401959.html(主要引用4个实现池的简介。以及线程池执行流程)
https://www.cnblogs.com/CarpenterLee/p/9558026.html(主要参考构造函数的参数的解析,提交任务、Future处理异常、装修公司的例子)
https://www.cnblogs.com/zhujiabin/p/5404771.html(4个线程池实现的例子,本文没引用)

【大纲】:
线程是什么?
JDK提供的线程池API
可以自己实现的几个线程池类
ThreadPoolExecutor(英译:线程池执行者)类、参数等
线程池比喻(现实中的例子)
线程池任务执行流程【重点】
任务缓存队列
线程池工厂
拒绝策略
三种提交任务的方式
关于Runnable、Callable和Future、FutureTask
线程池的关闭
线程的状态
线程池实例(已默认帮我们实现好的线程池)
如何选择线程池数量
线程池的正确使用
手动创建线程池有几个注意点

线程池是什么?

线程池可以看做是线程的***。在没有任务时线程处于空闲状态,当请求到来:线程池给这个请求分配一个空闲的线程,任务完成后回到线程池中等待下次任务(而不是销毁)。这样就实现了线程的重用。
平常我们学线程一个一个线程创建时有缺点:线程生命周期的开销非常高(线程创建销毁花费的资源和时间很大)程序的稳定性和健壮性会下降(频繁创建线程可能受到攻击(待了解),而且太多线程对内存消耗过大)所以一般都是用线程池来管理线程的。

JDK提供的线程池API

JDK给我们提供了Excutor(英译:开释者)框架来使用线程池,它是线程池的基础。
Executor提供了一种将“任务提交”与“任务执行”分离开来的机制(解耦)。

下面先看一下各种接口与实现类的关系:(网上参考图,具体看参考链接)
还有一个是JDK1.7新增的线程池:ForkJoinPool线程池

JDK1.7中新增的一个线程池,与ThreadPoolExecutor一样,同样继承了AbstractExecutorService。ForkJoinPool是Fork/Join框架的两大核心类之一。与其它类型的ExecutorService相比,其主要的不同在于采用了工作窃取算法(work-stealing):所有池中线程会尝试找到并执行已被提交到池中的或由其他线程创建的任务。这样很少有线程会处于空闲状态,非常高效。这使得能够有效地处理以下情景:大多数由任务产生大量子任务的情况;从外部客户端大量提交小任务到池中的情况。

(这是工厂类直接提供的现成的线程池,不需要我们自己填参数去new,下面会介绍)

还有一张联系全面点的图:

其中几个实现类都是可以由我们自己创建出(new出)线程池的:


其中我们平时最常用到的是ThreadPoolExecutor,ExecutorService的默认实现。
ScheduledExecutorService接口主要是提供了“延迟“和“定期执行(循环)”的ExecutorService,提供了一些方法安排任务在给定的延时执行或者周期性执行
ScheduledThreadPoolExecutor相当于提供了“延迟”和“周期执行”功能的ThreadPoolExecutor。是继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。


关于上面各别接口和类的区别总结:

Executor接口中定义了execute()方法,用来接收一个Runnable接口的对象,而ExecutorService接口中定义的submit()方法可以接收Runnable和Callable接口对象。
Executor和ExecutorService除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。



ThreadPoolExecutor(英译:线程池执行者)

是默认创建线程池的,我们先要了解他的各项参数,才能进行下面的介绍:

// Java线程池的完整构造函数

public ThreadPoolExecutor(
int corePoolSize, // 线程池长期维持的线程数,即使线程处于空闲状态,也不会回收。
int maximumPoolSize, // 线程总数的上限
long keepAliveTime, TimeUnit unit, // 超过corePoolSize的线程的idle时长, 超过这个时间,多余的线程会被回收。 unit是时间单位
BlockingQueue<Runnable> workQueue, // 任务的排队队列
ThreadFactory threadFactory, // 新线程的产生方式
RejectedExecutionHandler handler // 拒绝策略

这些参数中,比较容易引起问题的有corePoolSize, maximumPoolSize, workQueue以及handler:

  • corePoolSize和maximumPoolSize设置不当会影响效率,甚至耗尽线程;
  • workQueue设置不当容易导致OOM;
  • handler设置不当会导致提交任务时抛出异常。


线程池比喻(现实中的例子)

直接看介绍可能比较朦胧,但是后面将讲的基本都是依靠在这些参数的基础上的,所以先引用一个例子来讲解这些参数:

线程池和装修公司

以运营一家装修公司做个比喻。公司在办公地点等待客户来提交装修请求;公司有固定数量的正式工以维持运转;旺季业务较多时,新来的客户请求会被排期,比如接单后告诉用户一个月后才能开始装修;当排期太多时,为避免用户等太久,公司会通过某些渠道(比如人才市场、熟人介绍等)雇佣一些临时工(注意,招聘临时工是在排期排满之后);如果临时工也忙不过来,公司将决定不再接收新的客户,直接拒单。

线程池就是程序中的“装修公司”,代劳各种脏活累活。上面的过程对应到线程池上:

// Java线程池的完整构造函数 public ThreadPoolExecutor( int corePoolSize, // 正式工数量 int maximumPoolSize, // 工人数量上限,包括正式工和临时工 long keepAliveTime, TimeUnit unit,
 // 临时工游手好闲的最长时间,超过这个时间将被解雇
  BlockingQueue<Runnable> workQueue, // 排期队列
  ThreadFactory threadFactory, // 招人渠道
  RejectedExecutionHandler handler // 拒单方式 ) 


线程池任务执行流程:【重点】

  1. 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
  2. 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
  3. workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
  4. 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
  5. 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
  6. 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

任务缓存队列

即workQueue,它用来存放等待执行的任务。
workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:

1)有界任务队列ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;比如假定线程池的最小线程数为4,最大为8所用的ArrayBlockingQueue最大为10。随着任务到达并被放到队列中,线程池中最多运行4个线程(即最小线程数)。即使队列完全填满,也就是说有10个处于等待状态的任务,ThreadPoolExecutor也只会利用4个线程。如果队列已满,而又有新任务进来,此时才会启动一个新线程,这里不会因为队列已满而拒接该任务,相反会启动一个新线程。新线程会运行队列中的第一个任务,为新来的任务腾出空间。这个算法背后的理念是:该池大部分时间仅使用核心线程(4个),即使有适量的任务在队列中等待运行。这时线程池就可以用作节流阀。如果挤压的请求变得非常多,这时该池就会尝试运行更多的线程来清理;这时第二个节流阀—最大线程数就起作用了。

2)无界任务队列LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;一般如果线程池任务队列采用LinkedBlockingQueue队列的话,那么不会拒绝任何任务(因为队列大小没有限制),这种情况下,ThreadPoolExecutor最多仅会按照最小线程数来创建线程,也就是说线程池大小被忽略了。会有消耗很大的内存

3)直接提交队列synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。(可以理解为队列大小为0,与无界任务的无限大相反)


线程池工厂

Executors的线程池如果不指定线程工厂会使用Executors中的DefaultThreadFactory,默认线程池工厂创建的线程都是非守护线程。

使用自定义的线程工厂可以做很多事情,比如可以跟踪线程池在何时创建了多少线程,也可以自定义线程名称和优先级。如果将

新建的线程都设置成守护线程,当主线程退出后,将会强制销毁线程池。

Daemon(守护线程)例子:https://blog.csdn.net/aa1215018028/article/details/82814192

守护线程是程序运行时在后台提供服务的线程,不属于程序中不可或缺的部分。

当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程在线程启动之前使用 setDaemon() 方法可以将一个线程设置为守护线程。


新建线程池工厂的线程池的例子:
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MICROSECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
System.out.println("创建线程"+t);
return  t;
}
});


拒绝策略

AbortPolicy:丢弃任务并抛出RejectedExecutionException

CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。

DiscardOldestPolicy:丢弃队列中最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

DiscardPolicy:丢弃任务,不做任何处理。



创建线程池代码例子:

ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(512), new ThreadPoolExecutor.DiscardPolicy());



三种提交任务的方式:

创建了线程池,那还得像线程池中提交任务,然后线程池才会创建线程去运行这些任务:
提交方式 是否关心返回结果
Future<T> submit(Callable<T> task)
void execute(Runnable command)
Future<?> submit(Runnable task) 否,虽然返回Future,但是其get()方法总是返回null
excute()方法中添加任务的方式是使用addWorker()方法,具体可参考:https://blog.csdn.net/aa1215018028/article/details/82814192

关于Runnable、Callable和Future、FutureTask

可以向线程池提交的任务有两种:Runnable和Callable(都是接口),二者的区别如下:

  1. 方法签名不同,void Runnable.run(), V Callable.call() throws Exception【继承Runnable要实现run方法,继承Callable要实现call方法】Callable就是Runnable的扩展
  2. 是否允许有返回值,Callable允许有返回值
  3. 是否允许抛出异常,Callable允许抛出异常,即通过throws抛出,Runnable还是可以直接在Run方法里处理异常的,但是不能抛出外面
Callable是JDK1.5时加入的接口,作为Runnable的一种补充,允许有返回值,允许抛出异常。

下面看看Callable接口的设计

可以看出其中用到的泛型主要是用来限制返回值类型的,Callable一般配合线程池的submit使用,该方法有返回值。


Future 是一个接口一般我们认为是Callable的返回值,但他其实代表的是任务的生命周期(当然了,它是能获取得到Callable的返回值的)【没抛出异常便是返回值,有异常则是异常】
线程池的处理结果、以及处理过程中的异常都被包装到Future中,并在调用Future.get()方法时获取,执行过程中的异常会被包装成ExecutionException。
submit()方法回产生Future对象,它用Callable返回结果的特定类型进行了参数化。可以用isDone()方法来查询Future是否已经完成,当任务完成时,它具有一个结果,可以调用get()方法获取该结果。也可以不用isDone()进行检查就直接调用get(),在这种情况下,get()将阻塞,直至结果准备就绪。还可以在试图调用get()来获取结果之前,先调用具有超时的get(),或者调用isDone()来查看任务是否完成。其他方法可参考:https://www.cnblogs.com/dolphin0520/p/3949310.html
单个任务超时设置V Future.get(long timeout, TimeUnit unit)方法可以指定等待的超时时间,超时未完成会抛出TimeoutException。

关于获取多个结果的,可以使用CompletionService封装线程池,通过CompletionService.take()方法一个个获取出来,具体用法自己查。
多个任务的超时设置,则应该采用CountDownLatch(倒计时)来控制了。

FutureTask实现了RunnableFuture接口RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

FutureTask提供了2个构造器:

1
2
3
4
public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}
Future和FutureTask的使用代码(简单实例,代码不完全)

Future:
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();                                                //task是自定义的callable的实现类,具体自己定义
Future<Integer> result = executor.submit(task);            //Future接收结果
executor.shutdown();
System.out.println("task运行结果"+result.get());            //get取出
FutureTask:
ExecutorService executor = Executors.newCachedThreadPool();  //也可以不用线程池,直接用Thread也行,futureTask相当于线程任务和结果的封装的结果
Task task = new Task(); //task是自定义的callable的实现类,具体自己定义
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);   //FutureTask封装起来整个任务
executor.submit(futureTask);
executor.shutdown();
System.out.println("task运行结果"+result.get()); //get取出




线程池的关闭

ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务

shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务列表

调用shutdown()后,线程池状态立刻变为SHUTDOWN,而调用shutdownNow(),线程池状态立刻变为STOP


线程的状态:

  • RUNNING:线程池能够接受新任务,以及对新添加的任务进行处理。

  • SHUTDOWN:线程池不可以接受新任务,但是可以对已添加的任务进行处理。

  • STOP:线程池不接收新任务,不处理已添加的任务,并且会中断正在处理的任务

  • TIDYING:当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。

  • TERMINATED:线程池彻底终止的状态


状态转化:




扩展ThreadPoolExecutor

ThreadPoolExecutor是可以拓展的,它提供了几个可以在子类中改写的方法:beforeExecute,afterExecute和terimated。

在执行任务的线程中将调用beforeExecute和afterExecute,这些方法中还可以添加日志,计时,监视或统计收集的功能,例子:https://blog.csdn.net/aa1215018028/article/details/82814192



线程池实例(已默认帮我们实现好的线程池)

上面我们提到过一个Executors工厂类,他其中包含了java提供给我们的几个实现好的线程池,我们可以直接用,下面分别介绍一下他们的特点:
用法:
ExecutorService ... = Executors.new...();

newCachedThreadPool

  • 底层:返回ThreadPoolExecutor实例corePoolSize0maximumPoolSizeInteger.MAX_VALUEkeepAliveTime60LunitTimeUnit.SECONDSworkQueueSynchronousQueue(同步队列)
  • 通俗:当有新任务到来,直接插入到SynchronousQueue中,由于SynchronousQueue是同步队列,因此会在池中寻找可用线程来执行,若有可以线程则执行,若没有可用线程则创建一个线程来执行该任务;若池中线程空闲时间超过指定大小,则该线程会被销毁。
  • 适用:执行很多短期异步的小程序或者负载较轻的服务器

newFixedThreadPool:(要指定长度)

  • 底层:返回ThreadPoolExecutor实例,接收参数为所设定线程数量nThreadcorePoolSizenThreadmaximumPoolSizenThreadkeepAliveTime0L(不限时)unit为:TimeUnit.MILLISECONDSWorkQueue为:new LinkedBlockingQueue<Runnable>() 无界阻塞队列
  • 通俗:创建可容纳固定数量线程的池子,每个线程的存活时间是无限的,当池子满了就不在添加线程了;如果池中的所有线程均在繁忙状态,对于新任务会进入阻塞队列中(无界的阻塞队列)
  • 适用:执行长期的任务,性能好很多

newSingleThreadExecutor:

  • 底层:FinalizableDelegatedExecutorService包装的ThreadPoolExecutor实例corePoolSize1maximumPoolSize1keepAliveTime0L(不限时)unit为:TimeUnit.MILLISECONDSworkQueue为:new LinkedBlockingQueue<Runnable>() 无界阻塞队列
  • 通俗:创建只有一个线程的线程池,且线程的存活时间是无限的;当该线程正繁忙时,对于新任务会进入阻塞队列中(无界的阻塞队列)
  • 适用:一个任务一个任务执行的场景

NewScheduledThreadPool:(要指定长度)

  • 底层:创建ScheduledThreadPoolExecutor实例corePoolSize为传递来的参数maximumPoolSizeInteger.MAX_VALUEkeepAliveTime0L(不限时)unit为:TimeUnit.NANOSECONDSworkQueue为:new DelayedWorkQueue() 一个按超时时间升序排序的队列
  • 通俗:创建一个固定大小的线程池,线程池内线程存活时间无限制,线程池可以支持定时及周期性任务执行,如果所有线程均处于繁忙状态,对于新任务会进入DelayedWorkQueue队列中,这是一种按照超时时间排序的队列结构
  • 适用:周期性执行任务的场景,可以设置某个任务延迟几秒后每几秒执行一次。

    ScheduledExecutorService比Timer更安全,功能更强大


缺点:
1)newFixedThreadPool和newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
什么是OOM OOM,全称“Out Of Memory”,翻译成中文就是“内存用完了”






杂谈【参考:https://blog.csdn.net/aa1215018028/article/details/82814192CSDN博主「aa1215018028」


如何选择线程池数量

线程池的大小决定着系统的性能,过大或者过小的线程池数量都无法发挥最优的系统性能。

当然线程池的大小也不需要做的太过于精确,只需要避免过大和过小的情况。一般来说,确定线程池的大小需要考虑CPU的数量,内存大小,任务是计算密集型还是IO密集型等因素

NCPU = CPU的数量

UCPU = 期望对CPU的使用率 0 ≤ UCPU ≤ 1

W/C = 等待时间与计算时间的比率

如果希望处理器达到理想的使用率,那么线程池的最优大小为:

线程池大小=NCPU *UCPU(1+W/C)

在Java中使用

int ncpus = Runtime.getRuntime().availableProcessors();
获取CPU的数量。

多线程的使用和线程数的设置

原文链接:https://blog.csdn.net/s1amduncan/article/details/95788892
      开发中我们经常会使用到线程池来处理一些业务,而在不新增设备的情况下,我们所能使用的线程资源又不是无线的,那么高并发、任务执行时间短的业务怎样使用线程池?还有并发不高、任务执行时间长的业务怎样使用线程池?并发高、业务执行时间长的业务怎样使用线程池?

接下来我们进行一一分析:
1:高并发、任务执行时间短的业务,CPU密集型的,线程池线程数可以设置为CPU核数+1,减少线程上下文的切换
2:并发不高、任务执行时间长的业务这就需要区分开看了:
    a)假如是业务时间长集中在IO操作上,也就是IO密集型的任务,因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以适当加大线程池中的线程数目,让CPU处理更多的业务
    b)假如是业务时间长集中在计算操作上,也就是计算密集型任务,这个就没办法了,和(1)一样吧,线程池中的线程数设置得少一些,减少线程上下文的切换
    (其实从一二可以看出无论并发高不高,对于业务中是否是cou密集还是I/O密集的判断都是需要的当前前提是你需要优化性能的前提下)
3:并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,可以引入三方中间件进行异步操作。




线程池的正确使用


以下阿里编码规范里面说的一段话:

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors各个方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。



手动创建线程池有几个注意点

1.任务(即Runnable和Calable)独立。如何任务依赖于其他任务,那么可能产生死锁。例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁。

2.合理配置阻塞时间过长的任务。如果任务阻塞时间过长,那么即使不出现死锁,线程池的性能也会变得很糟糕。在Java并发包里可阻塞方法都同时定义了限时方式和不限时方式。例如

Thread.join,BlockingQueue.put,CountDownLatch.await等,如果任务超时,则标识任务失败,然后中止任务或者将任务放回队列以便随后执行,这样,无论任务的最终结果是否成功,这种办法都能够保证任务总能继续执行下去。

3.设置合理的线程池大小。只需要避免过大或者过小的情况即可,上文的公式线程池大小=NCPU *UCPU(1+W/C)。

4.选择合适的阻塞队列。newFixedThreadPool和newSingleThreadExecutor都使用了无界的阻塞队列,无界阻塞队列会有消耗很大的内存,如果使用了有界阻塞队列,它会规避内存占用过大的问题,但是当任务填满有界阻塞队列,新的任务该怎么办?在使用有界队列是,需要选择合适的拒绝策略,队列的大小和线程池的大小必须一起调节。对于非常大的或者无界的线程池,可以使用SynchronousQueue来避免任务排队,以直接将任务从生产者提交到工作者线程。





下面是Thrift框架处理socket任务所使用的一个线程池,可以看一下FaceBook的工程师是如何自定义线程池的。

private static ExecutorService createDefaultExecutorService(Args args) {
SynchronousQueue executorQueue = new SynchronousQueue();

return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS,
executorQueue);
}