前言
应用服务器,都需要处理从客户端发起的任务请求,这些任务往往具有高密度,短时间的特性,无论通过什么方式在服务器得到client 请求后,服务器就需要独立的处理这个客户请求。针对这个的问题,线程池提供了处理系统性能和大用户量请求之间的矛盾的方法,通过对多个任务重用已经存在的线程对象,降低了对线程对象创建和销毁的开销,由于当客户请求到了时,线程对象已经存在,可以提高请求的响应时间从而整体的提高了系统服务的表现。线程使应用能够更加充分合理的协调利用cpu 、内存、网络、i/o等系统资源。
线程的创建需要开辟虚拟机栈,本地方法栈、程序计数器等线程私有的内存空间。在线程的销毁时需要回收这些系统资源。频繁的创建和销毁线程会浪费大量的系统资源,增加并发编程的风险。另外,在服务器负载过大的时候,如何让新的线程等待或者友好的拒绝服务?这些丢失线程自身无法解决的。所以需要通过线程池协调多个线程,并实现类似主次线程隔离、定时执行、周期执行等任务。
线程池的作用
- 利用线程池管理并复用线程、控制最大并发数等。
- 实现任务线程队列缓存策略和拒绝机制。
- 实现某些与时间相关的功能,如定时执行、周期执行等。
- 隔离线程环境。比如,交易服务和搜索服务在同一台服务器上,分别开启两个线程池,交易线程的资源消耗明显要大;因此,通过配置独立的线程池,将较慢的交易服务与搜索服务隔开,避免个服务线程互相影响。
线程池的执行流程
JDK1.8中的线程池
Java中可以通过Executors工厂类来创建6种线程池,分别为:newFixedThreadPool,newSingleThreadExecutor,newCachedThreadPool,newScheduledThreadPool,newSingleThreadScheduledExecutor,newWorkStealingPool
newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:
public class TestThreadPool { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 20; i++) { executorService.execute(new Task()); } executorService.shutdown(); } } class Task implements Runnable { @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ": running!"); } }
在线程池中保持二个线程可以同时执行,但是注意,并不是说线程池中永远都是这二个线程,只是说可以同时存在的线程数,当某个线程执行结束后,会有新的线程进来打印结果为:
其源码是调用了ThreadPoolExecutor来创建的线程池,其核心线程数和最大线程数都为设定的值。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public class TestThreadPool { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 20; i++) { executorService.execute(new Task()); } executorService.shutdown(); } } class Task implements Runnable { @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ": running!"); } }
打印结果为:
其源码是调用了ThreadPoolExecutor来创建的线程池,其核心线程数和最大线程数都为1。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newCachedThreadPool
创建一个可缓存线程池,应用中存在的线程数可以无限大。如下:
public class TestThreadPool { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 20; i++) { executorService.execute(new Task()); } } } class Task implements Runnable { @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ": running!"); } }
打印结果为:
我们跟进到newCachedThreadPool的源码中,发现newCachedThreadPool的创建本质上还是通过ThreadPoolExecutor来创建的,只不过将核心线程数设置为0,最大线程数设置为Integer的最大值。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newScheduledThreadPool
newScheduledThreadPool是一个可以执行周期性任务的线程池,我们通过Executor工厂类创建该线程池,并且可以指定其核心线程数以及创建线程的工厂类。通过newScheduledThreadPool可以执行延时任务和定时任务,延时任务如下:
public class TestThreadPool02 { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); System.out.println(System.currentTimeMillis()); scheduledExecutorService.schedule(()->{ System.out.println("延迟三秒执行"); System.out.println(System.currentTimeMillis()); }, 3, TimeUnit.SECONDS); scheduledExecutorService.shutdown(); } }
打印结果为:
执行定时任务:
public class TestThreadPool02 { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); System.out.println(System.currentTimeMillis()); scheduledExecutorService.scheduleAtFixedRate(()->{ System.out.println("1----延时1s执行,每3s执行一次"); System.out.println(System.currentTimeMillis()); }, 1, 3, TimeUnit.SECONDS); } }
打印结果如下:
ScheduledExecutorService有两个方法可以完成执行定时任务,分别是scheduledAtFixedRate和scheduleWithFixedDelay,两个方法的区别为:
- scheduleAtFixedRate:不管上次任务执行是否结束,都会严格按照定时时间执行。所以有可能存在同一个任务同时多个执行的问题。
- scheduleWithFixedDelay:只会等上次任务执行结束之后,才会开始按照定时时间继续执行。所以同一个任务只会有一个在执行。
newSingleThreadScheduledExecutor
newSingleThreadScheduledExecutor与newScheduledThreadPool相似,只不过其核心线程数为1;
newWorkStealingPool
newWorkStealingPool适合使用在很耗时的操作,但是newWorkStealingPool不是ThreadPoolExecutor的扩展,它是新的线程池类ForkJoinPool的扩展,但是都是在统一的一个Executors类中实现,由于能够合理的使用CPU进行对任务操作(并行操作),所以适合使用在很耗时的任务中。
ThreadPoolExecutor参数
通过上面的线程池,我们发现所有特性的线程池都是通过ThreadPoolExecutor来创建的。并且在阿里巴巴开发手册里面明确指出不建议使用Executors来创建线程池。
所以,我们了解ThreadPoolExecutor中的相关从参数是很有必要的。在ThreadPoolExecutor的源码中,有许多的ThreadPoolExecutor的构造函数,但是所有的构造函数最终都是调用的下面这个构造函数,下面对其中的7个参数进行相关的解释。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize
线程池中核心线程数:核心线程会一直存活,即使没有任务需要处理。当线程数小于核心线程数时,即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理。核心线程在allowCoreThreadTimeout被设置为true时会超时退出,默认情况下不会退出。
maximumPoolSize
线程池最大线程数:当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。
keepAliveTime
线程存活时间:该存活时间指的是超过核心线程数的线程的存活时间,当这些线程空闲时,线程池将会对其进行销毁回收。
unit
线程存活时间单位:该类是一个枚举类,只要是指定线程存活时间的具体单位,可以为秒,分,小时等等。
workQueue
保存任务的阻塞队列,阻塞队列往往用于生产者-消费者问题中。
threadFactory
线程工厂:创建线程的工厂,一般使用默认的工程,不做改变。
handler
拒绝策略:当线程池中的线程数量已经达到了maximumPoolSize,并且保存任务的阻塞队列也满了,则执行拒绝策略。
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:丢弃任务但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
- ThreadPoolExecutor.CallerRunsPolicy:有调用线程处理该任务。
阻塞队列
JDK提供了7种阻塞队列,ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,DelayQueue,SynchronousQueue,LinkedTransferQueue和LinkedBlockingDeque;
当阻塞队列为空时,从队列中获取元素的操作将会被阻塞。
当阻塞队列为满时,从队列里添加元素的操作将会被阻塞。
ArrayBlockingQueue
基于数组的阻塞队列实现,在ArrayBlockingQueue内部, 维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。
ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。 而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
LinkedBlockingQueue
基于链表的阻塞队列,同ArrayListBlockingQueue类似, 其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
DelayQueue
DelayQueue中的元素只有当其指定的延迟时间到了, 才能够从队列中获取到该元素。DelayQueue是一 一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用场景:DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一 个超时未响应的连接队列。
PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果-方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品, 因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高-些(可以批量买卖) ;但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:如果采用公平模式: SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
但如果是非公平模式(SynchronousQueue默认) : SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。
LinkedTransferQueue
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
LinkedTransferQueue采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为“匹配”方式。
LinkedBlockingDeque
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
相比于其他阻塞队列,LinkedBlockingDeque多了addFirst、addLast、peekFirst、peekLast等方法,以first结尾的方法,表示插入、获取获移除双端队列的第一个元素。以last结尾的方法,表示插入、获取获移除双端队列的最后一个元素。
LinkedBlockingDeque是可选容量的,在初始化时可以设置容量防止其过度膨胀,如果不设置,默认容量大小为Integer.MAX_VALUE。