- 线程池
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
简介
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。
任务调度以执行线程的常见方法是使用同步队列,称作任务队列。池中的线程等待队列中的任务,并把执行完的任务放入完成队列中。
线程池模式一般分为两种:HS/HA半同步/半异步模式、L/F领导者与跟随者模式。
半同步/半异步模式又称为生产者消费者模式,是比较常见的实现方式,比较简单。分为同步层、队列层、异步层三层。同步层的主线程处理工作任务并存入工作队列,工作线程从工作队列取出任务进行处理,如果工作队列为空,则取不到任务的工作线程进入挂起状态。由于线程间有数据通信,因此不适于大数据量交换的场合。
领导者跟随者模式,在线程池中的线程可处在3种状态之一:领导者leader、追随者follower或工作者processor。任何时刻线程池只有一个领导者线程。事件到达时,领导者线程负责消息分离,并从处于追随者线程中选出一个来当继任领导者,然后将自身设置为工作者状态去处置该事件。处理完毕后工作者线程将自身的状态置为追随者。这一模式实现复杂,但避免了线程间交换任务数据,提高了CPU cache相似性。在ACE(Adaptive Communication Environment)中,提供了领导者跟随者模式实现。
线程池的伸缩性对性能有较大的影响。
创建太多线程,将会浪费一定的资源,有些线程未被充分使用。
销毁太多线程,将导致之后浪费时间再次创建它们。
创建线程太慢,将会导致长时间的等待,性能变差。
销毁线程太慢,导致其它线程资源饥饿。组成部分
服务器程序利用线程技术响应客户请求已经司空见惯,可能您认为这样做效率已经很高,但您有没有想过优化一下使用线程的方法。该文章将向您介绍服务器程序如何利用线程池来优化性能并提供一个简单的线程池实现。
1、线程池管理器(ThreadPoolManager):用于创建并管理线程池
2、工作线程(WorkThread): 线程池中线程
3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
4、任务队列:用于存放没有处理的任务。提供一种缓冲机制。功能
应用程序可以有多个线程,这些线程在休眠状态中需要耗费大量时间来等待事件发生。其他线程可能进入睡眠状态,并且仅定期被唤醒以轮循更改或更新状态信息,然后再次进入休眠状态。为了简化对这些线程的管理,.NET框架为每个进程提供了一个线程池,一个线程池有若干个等待操作状态,当一个等待操作完成时,线程池中的辅助线程会执行回调函数。线程池中的线程由系统管理,程序员不需要费力于线程管理,可以集中精力处理应用程序任务。应用范围
1、需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
2、对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
3、接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,并出现"OutOfMemory"的错误。
1、线程池的优势
(1)、降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
(2)、提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
(3)方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换(cpu切换线程是有时间成本的(需要保持当前执行线程的现场,并恢复要执行线程的现场))。
(4)提供更强大的功能,延时定时线程池。2、线程池的主要参数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
1、corePoolSize(线程池基本大小):当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize时,(除了利用提交新任务来创建和启动线程(按需构造),也可以通过 prestartCoreThread() 或 prestartAllCoreThreads() 方法来提前启动线程池中的基本线程。)
2、maximumPoolSize(线程池最大大小):线程池所允许的最大线程个数。当队列满了,且已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务。另外,对于无界队列,可忽略该参数。
3、keepAliveTime(线程存活保持时间)当线程池中线程数大于核心线程数时,线程的空闲时间如果超过线程存活时间,那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数。
4、workQueue(任务队列):用于传输和保存等待执行任务的阻塞队列。
5、threadFactory(线程工厂):用于创建新线程。threadFactory创建的线程也是采用new Thread()方式,threadFactory创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号)。
6、handler(线程饱和策略):当线程池和队列都满了,再加入线程会执行此策略。3、线程池流程
1、判断核心线程池是否已满,没满则创建一个新的工作线程来执行任务。已满则。
2、判断任务队列是否已满,没满则将新提交的任务添加在工作队列,已满则。
3、判断整个线程池是否已满,没满则创建一个新的工作线程来执行任务,已满则执行饱和策略。
(1、判断线程池中当前线程数是否大于核心线程数,如果小于,在创建一个新的线程来执行任务,如果大于则
2、判断任务队列是否已满,没满则将新提交的任务添加在工作队列,已满则。
3、判断线程池中当前线程数是否大于最大线程数,如果小于,则创建一个新的线程来执行任务,如果大于,则执行饱和策略。)4、线程池为什么需要使用(阻塞)队列?
1、因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换。
另外回到了非线程池缺点中的第1点:
2、创建线程池的消耗较高。
3、线程池创建线程需要获取mainlock这个全局锁,影响并发效率,阻塞队列可以很好的缓冲。5、线程池为什么要使用阻塞队列而不使用非阻塞队列?
阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源。
当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。
使得在线程不至于一直占用cpu资源。
(线程执行完任务后通过循环再次从任务队列中取出任务进行执行,代码片段如下:)while (task != null || (task = getTask()) != null) {}
不用阻塞队列也是可以的,不过实现起来比较麻烦而已,有好用的为啥不用呢?
6、如何配置线程池
- CPU密集型任务
尽量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换。 - IO密集型任务
可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间。 - 混合型任务
可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。
因为如果划分之后两个任务执行时间有数据级的差距,那么拆分没有意义。
因为先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。
7、java中提供的线程池
Executors类提供了4种不同的线程池:newCachedThreadPool
newFixedThreadPool
newScheduledThreadPool
newSingleThreadExecutor1 newCachedThreadPool 用来创建一个可以无限扩大的线程池,适用于负载较轻的场景,执行短期异步任务。(可以使得任务快速得到执行,因为任务时间执行短,可以很快结束,也不会造成cpu过度切换) 2 newFixedThreadPool 创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于负载较重的场景,对当前线程数量进行限制。(保证线程数可控,不会造成线程过多,导致系统负载更为严重) 3 newSingleThreadExecutor 创建一个单线程的线程池,适用于需要保证顺序执行各个任务。 4 newScheduledThreadPool 适用于执行延时或者周期性任务。 8、execute()和submit()方法
- execute(),执行一个任务,没有返回值。
- submit(),提交一个线程任务,有返回值。
submit(Callabletask)能获取到它的返回值,通过future.get()获取(阻塞直到任务执行完)。一般使用FutureTask+Callable配合使用(IntentService中有体现)。
submit(Runnable task, T result)能通过传入的载体result间接获得线程的返回值。
submit(Runnable task)则是没有返回值的,就算获取它的返回值也是null。
示例
//线程池 //线程池示例 using System; using System.Threading; public classTest { //存放要计算的数值的字段 static double number1 = -1; static double number2 = -1; public static void Main() { //获取线程池的最大线程数和维护的最小空闲线程数 int maxThreadNum, minThreadNum; int portThreadNum; ThreadPool.GetMaxThreads(out maxThreadNum, out portThreadNum); ThreadPool.GetMinThreads(out minThreadNum, out portThreadNum); Console.WriteLine("最大线程数:{0}", maxThreadNum); Console.WriteLine("最小线程数:{0}", minThreadNum); //函数变量值 int x=15600; //启动第一个任务:计算x的8次方 Console.WriteLine("启动第一个任务:计算{0}的8次方。", x); ThreadPool.QueueUserWorkItem(new WaitCallback(TaskProc1), x); //启动第二个任务:计算x的8次方根 Console.WriteLine("启动第二个任务:计算{0}的8次方根。", x); ThreadPool.QueueUserWorkItem(new WaitCallback(TaskProc2), x); //等待,直到两个数值都完成计算 while(number1 == -1 || number2 == -1); //打印计算结果 Console.WriteLine("y({0}) = {1}", x, number1 + number2); Console.Read(); } //启动第一个任务:计算x的8次方 static void TaskProc1(object o) { number1 = Math.Pow(Convert.ToDouble(o), 8); } //启动第二个任务:计算x的8次方根 static void TaskProc2(object o) { number2 = Math.Pow(Convert.ToDouble(o), 1.0/8.0); } }
//池结构 [HostProtection(SecurityAction.LinkDemand,Synchronization=true,ExternalThreading=true)] public static class ThreadPool { [Obsolete("ThreadPool.BindHandle(IntPtr)hasbeendeprecated.PleaseuseThreadPool.BindHandle(SafeHandle)instead.",false),SecurityPermission(SecurityAction.Demand,Flags=SecurityPermissionFlag.UnmanagedCode)] public static bool BindHandle(IntPtr osHandle) { if(osHandle == null){throw new ArgumentNullException("osHandle");} bool flag = false; bool success = false; RuntimeHelpers.PrepareConstrainedRegions(); try { osHandle.DangerousAddRef(ref success); flag = BindIOCompletionCallbackNative(osHandle.DangerousGetHandle()); } finally { if(success) osHandle.DangerousRelease(); } return flag; }