线程池

1简介

  线程池是啥子,干啥使它呀,老子线程使得好好的,非得多次一举,哈哈,想必来这里看这篇文章的都对线程池有点了解。那么我来整理整理线程池的好处吧。

1、线程池的重用

        线程的创建和销毁的开销是巨大的,而通过线程池的重用大大减少了这些不必要的开销,当然既然少了这么多消费内存的开销,其线程执行速度也是突飞猛进的提升。

2、控制线程池的并发数

       初学新手可能对并发这个词语比较陌生,特此我也是结合百度百科和必生所学得出最优解释,万万记着并发可跟并行不一样。

并发:在某个时间段内,多个程序都处在执行和执行完毕之间;但在一个时间点上只有一个程序在运行。头脑风暴:老鹰妈妈喂小雏鹰食物,小雏鹰很多,而老鹰只有一张嘴,她需要一个个喂过去,到最后每个小雏鹰都可以吃到,但是在一个时间点里只能有一个小雏鹰可以吃到美味的食物。

并行:在某个时间段里,每个程序按照自己独立异步的速度执行,程序之间互不干扰。头脑风暴:这就好似老鹰妈妈决定这样喂食太费劲于是为每个小雏鹰请了个保姆,这样子在一个时间点里,每个小雏鹰都可以同时吃到食物,而且互相不干扰。

       回到线程池,控制线程池的并发数可以有效的避免大量的线程池争夺CPU资源而造成堵塞。头脑风暴:还是拿老鹰的例子来讲,妈妈只有一个,要这么一个个喂下去,一些饿坏的小雏鹰等不下去了就要破坏规则,抢在靠前喂食的雏鹰面前,而前面的雏鹰也不是吃软饭的,于是打起来了,场面混乱。老鹰生气了,这么不懂事,谁也别吃了,于是造成了最后谁也没食吃的局面。

3、线程池可以对线程进行管理

        线程池可以提供定时、定期、单线程、并发数控制等功能。比如通过ScheduledThreadPool线程池来执行S秒后,每隔N秒执行一次的任务。

2线程池类

在一个应用程序中,我们需要多次使用线程,也就意味着,我们需要多次创建并销毁线程。而创建并销毁线程的过程势必会消耗内存。而在Java中,内存资源是及其宝贵的,所以,我们就提出了线程池的概念。

 

线程池:Java中开辟出了一种管理线程的概念,这个概念叫做线程池,从概念以及应用场景中,我们可以看出,线程池的好处,就是可以方便的管理线程,也可以减少内存的消耗。

 

那么,我们应该如何创建一个线程池那?Java中已经提供了创建线程池的一个类:Executor

而我们创建时,一般使用它的子类:ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,  

                              int maximumPoolSize,  

                              long keepAliveTime,  

                              TimeUnit unit,  

                              BlockingQueue<Runnable> workQueue,  

                              ThreadFactory threadFactory,  

                              RejectedExecutionHandler handler)

线程池中的corePoolSize就是线程池中的核心线程数量,这几个核心线程,只是在没有用的时候,也不会被回收,maximumPoolSize就是线程池中可以容纳的最大线程的数量,而keepAliveTime,就是线程池中除了核心线程之外的其他的最长可以保留的时间,因为在线程池中,除了核心线程即使在无任务的情况下也不能被清除,其余的都是有存活时间的,意思就是非核心线程可以保留的最长的空闲时间,而util,就是计算这个时间的一个单位,workQueue,就是等待队列,任务可以储存在任务队列中等待被执行,执行的是FIFIO原则(先进先出)。threadFactory,就是创建线程的线程工厂,最后一个handler,是一种拒绝策略,我们可以在任务满了知乎,拒绝执行某些任务。

有图我们可以看出,任务进来时,首先执行判断,判断核心线程是否处于空闲状态,如果不是,核心线程就先就执行任务,如果核心线程已满,则判断任务队列是否有地方存放该任务,若果有,就将任务保存在任务队列中,等待执行,如果满了,在判断最大可容纳的线程数,如果没有超出这个数量,就开创非核心线程执行任务,如果超出了,就调用handler实现拒绝策略。

3四种常见的线程池:

CachedThreadPool:可缓存的线程池,该线程池中没有核心线程,非核心线程的数量为Integer.max_value,就是无限大,当有需要时创建线程来执行任务,没有需要时回收线程,适用于耗时少,任务量大的情况。

 

带缓冲线程池,从构造看核心线程数为0,最大线程数为Integer最大值大小,超过0个的空闲线程在60秒后销毁,SynchronousQueue这是一个直接提交的队列,意味着每个新任务都会有线程来执行,如果线程池有可用线程则执行任务,没有的话就创建一个来执行,线程池中的线程数不确定,一般建议执行速度较快较小的线程,不然这个最大线程池边界过大容易造成内存溢出。

/**
  * Creates a thread pool that creates new threads as needed, but
  * will reuse previously constructed threads when they are
  * available.  These pools will typically improve the performance
  * of programs that execute many short-lived asynchronous tasks.
  */
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

SecudleThreadPool:周期性执行任务的线程池,按照某种特定的计划执行线程中的任务,有核心线程,但也有非核心线程,非核心线程的大小也为无限大。适用于执行周期性的任务。

 

调度线程池,即按一定的周期执行任务,即定时任务,对ThreadPoolExecutor进行了包装而已。

/**
 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

SingleThreadPool:只有一条线程来执行任务,适用于有顺序的任务的应用场景。

 

单线程线程池,核心线程数和最大线程数均为1,空闲线程存活0毫秒同样无意思,意味着每次只执行一个线程,多余的先存储到工作队列,一个一个执行,保证了线程的顺序执行。

/**
 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue. (Note however that if this single
 * thread terminates due to a failure during execution prior to
 * shutdown, a new one will take its place if needed to execute
 * subsequent tasks.)  Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

FixedThreadPool:定长的线程池,有核心线程,核心线程的即为最大的线程数量,没有非核心线程

 

固定线程池,核心线程数和最大线程数固定相等,而空闲存活时间为0毫秒,说明此参数也无意义,工作队列为最大为Integer.MAX_VALUE大小的阻塞队列。当执行任务时,如果线程都很忙,就会丢到工作队列等有空闲线程时再执行,队列满就执行默认的拒绝策略。

/**
  * Creates a thread pool that reuses a fixed number of threads
  * operating off a shared unbounded queue, using the provided
  * ThreadFactory to create new threads when needed. 
  */
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
}

线程池的七个参数

在Java中,我们一般通过集成Thread类和实现Runnnable接口,调用线程的start()方法实现线程的启动。但如果并发的数量很多,而且每个线程都是执行很短的时间便结束了,那样频繁的创建线程和销毁进程会大大的降低系统运行的效率。线程池正是为了解决多线程效率低的问题而产生的,他使得线程可以被复用,就是线程执行结束后不被销毁,而是可以继续执行其他任务。(这里可以用tomcat做例子进行思考)

java.uitl.concurrent.ThreadPoolExecutor类是线程池中核心的一个类,在ThreadPoolExecutor中提供了四个构造方法。通过源码可以发现,前面三个的构造器最后都是调用了第四个构造器进行初始化。

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}
参数理解:

corePollSize:核心线程数。在创建了线程池后,线程中没有任何线程,等到有任务到来时才创建线程去执行任务。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。

maximumPoolSize:最大线程数。表明线程中最多能够创建的线程数量。

keepAliveTime:空闲的线程保留的时间。

TimeUnit:空闲线程的保留时间单位。

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒
BlockingQueue<Runnable>:阻塞队列,存储等待执行的任务。参数有ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue可选。

ThreadFactory:线程工厂,用来创建线程

RejectedExecutionHandler:队列已满,而且任务量大于最大线程的异常处理策略。有以下取值

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务!

线程池参数设置技巧

一、ThreadPoolExecutor的重要参数

  • corePoolSize:核心线程数
    • 核心线程会一直存活,即使没有任务需要执行
    • 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
    • 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
  • queueCapacity:任务队列容量(阻塞队列)
    • 当核心线程数达到最大时,新任务会放在队列中排队等待执行
  • maxPoolSize:最大线程数
    • 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
    • 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
  • keepAliveTime:线程空闲时间
    • 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
    • 如果allowCoreThreadTimeout=true,则会直到线程数量=0
  • allowCoreThreadTimeout:允许核心线程超时
  • rejectedExecutionHandler:任务拒绝处理器
    • 两种情况会拒绝处理任务:
      • 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
      • 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
    • 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
    • ThreadPoolExecutor类有几个内部实现类来处理这类情况:
      • AbortPolicy 丢弃任务,抛运行时异常
      • CallerRunsPolicy 执行任务
      • DiscardPolicy 忽视,什么都不会发生
      • DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
    • 实现RejectedExecutionHandler接口,可自定义处理器

二、ThreadPoolExecutor执行顺序:

     线程池按以下行为执行任务

  1. 当线程数小于核心线程数时,创建线程。
  2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
  3. 当线程数大于等于核心线程数,且任务队列已满
    1. 若线程数小于最大线程数,创建线程
    2. 若线程数等于最大线程数,抛出异常,拒绝任务

三、如何设置参数

  • 默认值
    • corePoolSize=1
    • queueCapacity=Integer.MAX_VALUE
    • maxPoolSize=Integer.MAX_VALUE
    • keepAliveTime=60s
    • allowCoreThreadTimeout=false
    • rejectedExecutionHandler=AbortPolicy()
  • 如何来设置
    • 需要根据几个值来决定
      • tasks :每秒的任务数,假设为500~1000
      • taskcost:每个任务花费时间,假设为0.1s
      • responsetime:系统允许容忍的最大响应时间,假设为1s
    • 做几个计算
      • corePoolSize = 每秒需要多少个线程处理? 
        • threadcount = tasks/(1/taskcost) =tasks*taskcout =  (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50
        • 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
      • queueCapacity = (coreSizePool/taskcost)*responsetime
        • 计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
        • 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
      • maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
        • 计算可得 maxPoolSize = (1000-80)/10 = 92
        • (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
      • rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
      • keepAliveTime和allowCoreThreadTimeout采用默认通常能满足
    • 以上都是理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器cpu load已经满了,则需要通过升级硬件(呵呵)和优化代码,降低taskcost来处理。