本文将会介绍一下java线程池,线程池内容不少,具体漏掉的地方自行网上搜索。
任务缓存队列
线程池工厂
拒绝策略
三种提交任务的方式
关于Runnable、Callable和Future、FutureTask
线程池的关闭
线程的状态
线程池实例(已默认帮我们实现好的线程池)
如何选择线程池数量
线程池的正确使用
手动创建线程池有几个注意点
线程池是什么?
JDK提供的线程池API
Executor提供了一种将“任务提交”与“任务执行”分离开来的机制(解耦)。
JDK1.7中新增的一个线程池,与ThreadPoolExecutor一样,同样继承了AbstractExecutorService。ForkJoinPool是Fork/Join框架的两大核心类之一。与其它类型的ExecutorService相比,其主要的不同在于采用了工作窃取算法(work-stealing):所有池中线程会尝试找到并执行已被提交到池中的或由其他线程创建的任务。这样很少有线程会处于空闲状态,非常高效。这使得能够有效地处理以下情景:大多数由任务产生大量子任务的情况;从外部客户端大量提交小任务到池中的情况。
其中几个实现类都是可以由我们自己创建出(new出)线程池的:
关于上面各别接口和类的区别总结:
ThreadPoolExecutor(英译:线程池执行者)
// Java线程池的完整构造函数
这些参数中,比较容易引起问题的有corePoolSize, maximumPoolSize, workQueue以及handler:
- corePoolSize和maximumPoolSize设置不当会影响效率,甚至耗尽线程;
- workQueue设置不当容易导致OOM;
- handler设置不当会导致提交任务时抛出异常。
线程池比喻(现实中的例子)
线程池和装修公司
以运营一家装修公司做个比喻。公司在办公地点等待客户来提交装修请求;公司有固定数量的正式工以维持运转;旺季业务较多时,新来的客户请求会被排期,比如接单后告诉用户一个月后才能开始装修;当排期太多时,为避免用户等太久,公司会通过某些渠道(比如人才市场、熟人介绍等)雇佣一些临时工(注意,招聘临时工是在排期排满之后);如果临时工也忙不过来,公司将决定不再接收新的客户,直接拒单。
线程池就是程序中的“装修公司”,代劳各种脏活累活。上面的过程对应到线程池上:
// Java线程池的完整构造函数 public ThreadPoolExecutor( int corePoolSize, // 正式工数量 int maximumPoolSize, // 工人数量上限,包括正式工和临时工 long keepAliveTime, TimeUnit unit, // 临时工游手好闲的最长时间,超过这个时间将被解雇 BlockingQueue<Runnable> workQueue, // 排期队列 ThreadFactory threadFactory, // 招人渠道 RejectedExecutionHandler handler // 拒单方式 )
线程池任务执行流程:【重点】
- 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
- 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
- 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
- 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
- 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
- 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
任务缓存队列
即workQueue,它用来存放等待执行的任务。workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
1)有界任务队列ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;比如假定线程池的最小线程数为4,最大为8所用的ArrayBlockingQueue最大为10。随着任务到达并被放到队列中,线程池中最多运行4个线程(即最小线程数)。即使队列完全填满,也就是说有10个处于等待状态的任务,ThreadPoolExecutor也只会利用4个线程。如果队列已满,而又有新任务进来,此时才会启动一个新线程,这里不会因为队列已满而拒接该任务,相反会启动一个新线程。新线程会运行队列中的第一个任务,为新来的任务腾出空间。这个算法背后的理念是:该池大部分时间仅使用核心线程(4个),即使有适量的任务在队列中等待运行。这时线程池就可以用作节流阀。如果挤压的请求变得非常多,这时该池就会尝试运行更多的线程来清理;这时第二个节流阀—最大线程数就起作用了。
2)无界任务队列LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;一般如果线程池任务队列采用LinkedBlockingQueue队列的话,那么不会拒绝任何任务(因为队列大小没有限制),这种情况下,ThreadPoolExecutor最多仅会按照最小线程数来创建线程,也就是说线程池大小被忽略了。会有消耗很大的内存
3)直接提交队列synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。(可以理解为队列大小为0,与无界任务的无限大相反)
线程池工厂
Executors的线程池如果不指定线程工厂会使用Executors中的DefaultThreadFactory,默认线程池工厂创建的线程都是非守护线程。使用自定义的线程工厂可以做很多事情,比如可以跟踪线程池在何时创建了多少线程,也可以自定义线程名称和优先级。如果将
新建的线程都设置成守护线程,当主线程退出后,将会强制销毁线程池。
Daemon(守护线程)例子:https://blog.csdn.net/aa1215018028/article/details/82814192
守护线程是程序运行时在后台提供服务的线程,不属于程序中不可或缺的部分。
当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程。在线程启动之前使用 setDaemon() 方法可以将一个线程设置为守护线程。
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MICROSECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {public Thread newThread(Runnable r) {Thread t = new Thread(r);System.out.println("创建线程"+t);return t;}});
拒绝策略
AbortPolicy:丢弃任务并抛出RejectedExecutionExceptionCallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
DiscardOldestPolicy:丢弃队列中最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
DiscardPolicy:丢弃任务,不做任何处理。
创建线程池代码例子:
三种提交任务的方式:
提交方式 | 是否关心返回结果 |
---|---|
Future<T> submit(Callable<T> task) | 是 |
void execute(Runnable command) | 否 |
Future<?> submit(Runnable task) | 否,虽然返回Future,但是其get()方法总是返回null |
关于Runnable、Callable和Future、FutureTask
可以向线程池提交的任务有两种:Runnable和Callable(都是接口),二者的区别如下:
- 方法签名不同,void Runnable.run(), V Callable.call() throws Exception【继承Runnable要实现run方法,继承Callable要实现call方法】Callable就是Runnable的扩展
- 是否允许有返回值,Callable允许有返回值
- 是否允许抛出异常,Callable允许抛出异常,即通过throws抛出,Runnable还是可以直接在Run方法里处理异常的,但是不能抛出外面。
FutureTask提供了2个构造器:
1 2 3 4 | public FutureTask(Callable<V> callable) { } public FutureTask(Runnable runnable, V result) { } |
线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务列表
线程的状态:
-
RUNNING:线程池能够接受新任务,以及对新添加的任务进行处理。
-
SHUTDOWN:线程池不可以接受新任务,但是可以对已添加的任务进行处理。
-
STOP:线程池不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
-
TIDYING:当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
-
TERMINATED:线程池彻底终止的状态。
扩展ThreadPoolExecutor
在执行任务的线程中将调用beforeExecute和afterExecute,这些方法中还可以添加日志,计时,监视或统计收集的功能,例子:https://blog.csdn.net/aa1215018028/article/details/82814192
线程池实例(已默认帮我们实现好的线程池)
ExecutorService ... = Executors.new...();
newCachedThreadPool:
- 底层:返回ThreadPoolExecutor实例,corePoolSize为0;maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为60L;unit为TimeUnit.SECONDS;workQueue为SynchronousQueue(同步队列)
- 通俗:当有新任务到来,直接插入到SynchronousQueue中,由于SynchronousQueue是同步队列,因此会在池中寻找可用线程来执行,若有可以线程则执行,若没有可用线程则创建一个线程来执行该任务;若池中线程空闲时间超过指定大小,则该线程会被销毁。
- 适用:执行很多短期异步的小程序或者负载较轻的服务器
newFixedThreadPool:(要指定长度)
- 底层:返回ThreadPoolExecutor实例,接收参数为所设定线程数量nThread,corePoolSize为nThread,maximumPoolSize为nThread;keepAliveTime为0L(不限时);unit为:TimeUnit.MILLISECONDS;WorkQueue为:new LinkedBlockingQueue<Runnable>() 无界阻塞队列
- 通俗:创建可容纳固定数量线程的池子,每个线程的存活时间是无限的,当池子满了就不在添加线程了;如果池中的所有线程均在繁忙状态,对于新任务会进入阻塞队列中(无界的阻塞队列)
- 适用:执行长期的任务,性能好很多
newSingleThreadExecutor:
- 底层:FinalizableDelegatedExecutorService包装的ThreadPoolExecutor实例,corePoolSize为1;maximumPoolSize为1;keepAliveTime为0L(不限时);unit为:TimeUnit.MILLISECONDS;workQueue为:new LinkedBlockingQueue<Runnable>() 无界阻塞队列
- 通俗:创建只有一个线程的线程池,且线程的存活时间是无限的;当该线程正繁忙时,对于新任务会进入阻塞队列中(无界的阻塞队列)
- 适用:一个任务一个任务执行的场景
NewScheduledThreadPool:(要指定长度)
- 底层:创建ScheduledThreadPoolExecutor实例,corePoolSize为传递来的参数,maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为0L(不限时);unit为:TimeUnit.NANOSECONDS;workQueue为:new DelayedWorkQueue() 一个按超时时间升序排序的队列
- 通俗:创建一个固定大小的线程池,线程池内线程存活时间无限制,线程池可以支持定时及周期性任务执行,如果所有线程均处于繁忙状态,对于新任务会进入DelayedWorkQueue队列中,这是一种按照超时时间排序的队列结构
- 适用:周期性执行任务的场景,可以设置某个任务延迟几秒后每几秒执行一次。
ScheduledExecutorService比Timer更安全,功能更强大
1)newFixedThreadPool和newSingleThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。2)newCachedThreadPool和newScheduledThreadPool:主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
杂谈【参考:https://blog.csdn.net/aa1215018028/article/details/82814192,CSDN博主「aa1215018028」】
如何选择线程池数量
当然线程池的大小也不需要做的太过于精确,只需要避免过大和过小的情况。一般来说,确定线程池的大小需要考虑CPU的数量,内存大小,任务是计算密集型还是IO密集型等因素
NCPU = CPU的数量
UCPU = 期望对CPU的使用率 0 ≤ UCPU ≤ 1
W/C = 等待时间与计算时间的比率
如果希望处理器达到理想的使用率,那么线程池的最优大小为:
线程池大小=NCPU *UCPU(1+W/C)
在Java中使用
int ncpus = Runtime.getRuntime().availableProcessors();
获取CPU的数量。
多线程的使用和线程数的设置
开发中我们经常会使用到线程池来处理一些业务,而在不新增设备的情况下,我们所能使用的线程资源又不是无线的,那么高并发、任务执行时间短的业务怎样使用线程池?还有并发不高、任务执行时间长的业务怎样使用线程池?并发高、业务执行时间长的业务怎样使用线程池?
接下来我们进行一一分析:
1:高并发、任务执行时间短的业务,CPU密集型的,线程池线程数可以设置为CPU核数+1,减少线程上下文的切换
2:并发不高、任务执行时间长的业务这就需要区分开看了:
a)假如是业务时间长集中在IO操作上,也就是IO密集型的任务,因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以适当加大线程池中的线程数目,让CPU处理更多的业务
b)假如是业务时间长集中在计算操作上,也就是计算密集型任务,这个就没办法了,和(1)一样吧,线程池中的线程数设置得少一些,减少线程上下文的切换
(其实从一二可以看出无论并发高不高,对于业务中是否是cou密集还是I/O密集的判断都是需要的当前前提是你需要优化性能的前提下)
3:并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,可以引入三方中间件进行异步操作。
线程池的正确使用
以下阿里编码规范里面说的一段话:
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors各个方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
手动创建线程池有几个注意点
2.合理配置阻塞时间过长的任务。如果任务阻塞时间过长,那么即使不出现死锁,线程池的性能也会变得很糟糕。在Java并发包里可阻塞方法都同时定义了限时方式和不限时方式。例如
Thread.join,BlockingQueue.put,CountDownLatch.await等,如果任务超时,则标识任务失败,然后中止任务或者将任务放回队列以便随后执行,这样,无论任务的最终结果是否成功,这种办法都能够保证任务总能继续执行下去。
3.设置合理的线程池大小。只需要避免过大或者过小的情况即可,上文的公式线程池大小=NCPU *UCPU(1+W/C)。
4.选择合适的阻塞队列。newFixedThreadPool和newSingleThreadExecutor都使用了无界的阻塞队列,无界阻塞队列会有消耗很大的内存,如果使用了有界阻塞队列,它会规避内存占用过大的问题,但是当任务填满有界阻塞队列,新的任务该怎么办?在使用有界队列是,需要选择合适的拒绝策略,队列的大小和线程池的大小必须一起调节。对于非常大的或者无界的线程池,可以使用SynchronousQueue来避免任务排队,以直接将任务从生产者提交到工作者线程。
private static ExecutorService createDefaultExecutorService(Args args) {
SynchronousQueue executorQueue = new SynchronousQueue();
return new ThreadPoolExecutor(args.minWorkerThreads, args.maxWorkerThreads, 60L, TimeUnit.SECONDS,
executorQueue);
}