线程池:
简单的理解就是拥有很多线程的容器,为什么需要线程池?因为线程是一个轻量级的工具,其创建和关闭依然需要花费时间,当需要创建线程的时候,就可以从线程池获得空闲线程,关闭线程变成了向线程池归还线程。
在Java.util.concurrent包中,ThreadPoolExecutor表示一个线程池,Excutors类则扮演线程池工厂的角色,通过Executors可以取得一个拥有特定功能的线程池。ThreadPoolExecutor类实现了Executor接口,可通过这个接口,任何Runnable的对象都可以被ThreadPoolExecutor线程池调度。
线程接口和实现类之间的关系如下图:(用的是ppt画的UML图,因为我的UML软件过期了。。。)
下面主要介绍几个Executors类的主要方法:
⑴newFixedThreadPool(int numberThreads):创建一个指定线程数量的线程池对象,线程池的数量固定不变,当请求线程时,如果有空闲的线程,则分配线程,如果没有,则进入任务队列中,直到有线程归还,有空闲线程:
package com.dong.TestThreadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* FixedThreadPool创建特定数量的线程池,如果有空闲线程,则分配,没有,线程则进行等待;
*
* 结果分析:
* pool-1-thread-1: is running
pool-1-thread-5: is running
pool-1-thread-2: is running
pool-1-thread-4: is running
pool-1-thread-3: is running
pool-1-thread-4: has running end ,spent time : 0
pool-1-thread-2: has running end ,spent time : 0
pool-1-thread-5: has running end ,spent time : 0
pool-1-thread-5: is running
pool-1-thread-5: has running end ,spent time : 0
pool-1-thread-5: is running
pool-1-thread-5: has running end ,spent time : 0
pool-1-thread-5: is running
pool-1-thread-5: has running end ,spent time : 0
pool-1-thread-1: has running end ,spent time : 0
pool-1-thread-4: is running
pool-1-thread-4: has running end ,spent time : 15
pool-1-thread-2: is running
pool-1-thread-2: has running end ,spent time : 15
pool-1-thread-3: has running end ,spent time : 0
* -- 线程数
* 线程号 1 2 3 4
* 1 OF
* 2 OF OF
* 3 OF
* 4 OF OF
* 5 OF OF OF OF
* @author liuD
*
*/
public class testFixedThreadPool {
public static void main(String[] args) {
ThreadObj to = new ThreadObj();
//创建有5个线程的线程池;
ExecutorService threadpool = Executors.newFixedThreadPool(5);
for(int i = 0 ; i < 10 ; i++) {
//创建10个线程;
threadpool.submit(to);
}
}
}
class ThreadObj implements Runnable{
public void run() {
long start = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ": " +" is running ");
long end = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName()+": has running end ,spent time : " + (end - start) );
}
}
⑵newSingleThreadExecutor():返回一个只有一个线程的线程池,如果有多个线程访问,则同一时刻只能有一个线程获得资源,其他线程等待,进入等待队列
代码和上面的代码相同,只需改动创建
//创建有1个线程的线程池;
ExecutorService threadpool = Executors.newSingleThreadExecutor();
结果分析:
线程都是同步进行,有序的等待上一个线程将资源使用完毕
pool-1-thread-1: is running
pool-1-thread-1: has running end ,spent time : 0
pool-1-thread-1: is running
pool-1-thread-1: has running end ,spent time : 0
pool-1-thread-1: is running
pool-1-thread-1: has running end ,spent time : 0
pool-1-thread-1: is running
pool-1-thread-1: has running end ,spent time : 0
pool-1-thread-1: is running
pool-1-thread-1: has running end ,spent time : 0
pool-1-thread-1: is running
pool-1-thread-1: has running end ,spent time : 0
pool-1-thread-1: is running
pool-1-thread-1: has running end ,spent time : 0
pool-1-thread-1: is running
pool-1-thread-1: has running end ,spent time : 0
pool-1-thread-1: is running
pool-1-thread-1: has running end ,spent time : 0
pool-1-thread-1: is running
pool-1-thread-1: has running end ,spent time : 0
⑶newCachedThreadPool():返回一个根据实际需求大小的线程池,线程池的数量不确定,若有空闲,则优先使用空闲线程,如果没有,则会创建新的线程。
代码和上面的代码相同,只需改动创建
ExecutorService threadpool = Executors.newCachedThreadPool();
运行结果:
pool-1-thread-2: is running
pool-1-thread-2: has running end ,spent time : 0
pool-1-thread-1: is running
pool-1-thread-1: has running end ,spent time : 0
pool-1-thread-3: is running
pool-1-thread-3: has running end ,spent time : 0
pool-1-thread-6: is running
pool-1-thread-6: has running end ,spent time : 0
pool-1-thread-8: is running
pool-1-thread-8: has running end ,spent time : 0
pool-1-thread-9: is running
pool-1-thread-9: has running end ,spent time : 0
pool-1-thread-10: is running
pool-1-thread-10: has running end ,spent time : 0
pool-1-thread-4: is running
pool-1-thread-4: has running end ,spent time : 0
pool-1-thread-5: is running
pool-1-thread-5: has running end ,spent time : 0
pool-1-thread-7: is running
pool-1-thread-7: has running end ,spent time : 0
⑷newScheduledThreadPool():返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度,ScheduledExecutorService并不一定会立即安排执行任务,而是在指定的时间,对任务进行调度,
ScheduledExecutorService的主要方法:
public ScheduledFuture<?> scheduleAtFixedRate(Runnablecommand,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
scheduleAtFixedRate()方法:创建一个周期性任务,任务开始于给定的初始延时,后续的任务按照给定的周期进行,第一个任务:initialDelay+period时执行,第二个是initialDelay+2*period进行;
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
package com.dong.TestThreadPool;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* scheduleWithFixedDelay 周期性的执行任务调用;
* 结果分析:
* 1540699969340 i 'am running
1540699971355 i 'am running
1540699973357 i 'am running
1540699975359 i 'am running
1540699977361 i 'am running
1540699979362 i 'am running
1540699981365 i 'am running
1540699983351 i 'am running
......
*
*
* @author liuD
*
*/
public class testScheduleExecutorService {
public static void main(String[] args) {
ScheduledExecutorService ser = Executors.newScheduledThreadPool(10);
ser.scheduleAtFixedRate(
new Runnable() {
public void run() {
System.out.println(System.currentTimeMillis() + " i \'am running ");
}
}
, 0, 2, TimeUnit.SECONDS);
}
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
创建并执行一个周期性的任务,后续任务会按照给定的延时进行,即上一个任务的结束时间到下一个任务的开始实践的时间差。
package com.dong.TestThreadPool;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* scheduleWithFixedDelay 周期性的执行任务调用;
* 3秒一循环;
* 结果分析:
*1540701081155 i 'am running
1540701084163 i 'am running
1540701087184 i 'am running
1540701090190 i 'am running
1540701093209 i 'am running
1540701096214 i 'am running
1540701099234 i 'am running
1540701102239 i 'am running
1540701105258 i 'am running
......
* @author liuD
*
*/
public class testScheduleExecutorService {
public static void main(String[] args) {
ScheduledExecutorService ser = Executors.newScheduledThreadPool(10);
ser.scheduleWithFixedDelay(
new Runnable() {
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + " i \'am running ");
}
}
, 0, 2, TimeUnit.SECONDS);
}
}
如果周期过段,那么任务就会在上一个任务结束后,立即被调用,
Fork/Join框架;
将大的任务分解成小的任务然后分阶段处理小的任务,然后将结果进行合并,采用分治的策略处理问题,Jdk提供了一个ForkJoinPool线程池,对于多个线程进行分治处理。注意,线程池的优化,提交的任务与线程数的对比不是一对一的关系,一个线程可能对应多个任务,每个线程拥有一个任务队列,在实际中,线程之间可能互相帮助,当一个线程空闲时,它会从别的线程的任务队列的底部拿出任务帮助执行,而线程执行自己的任务的时候,是从顶部开始拿;
ForkJoinPool的submit方法:
private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task)
接受ForkJoinTask任务,即支持fork()和join()的任务, fork()即用来创建子任务,join()即等待其他任务执行完毕在进行执行,
ForkJoinTask类有两个子类, 一个是没有返回值的RecursiveAction和有返回值的RecursiveTask。
package com.dong.TestThreadPool;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* 使用ForkJoinPool线程池来计算很多数的和
* @author 代码来自《Java高并发程序设计》
*
*/
public class CountTask extends RecursiveTask<Long> {
//线程的最大处理能力
private static final int THRESHOLD = 10000;
//任务开始和结束序号
private long start;
private long end;
public CountTask(long start,long end) {
this.start = start;
this.end = end;
}
public Long compute() {
long sum = 0;
//判断总的任务量是否能够被线程一次性处理,如果可以,就不用创建子任务,否则,创建子线程
boolean canCompute = (end - start)< THRESHOLD;
if(canCompute) {
for(long i = start;i <= end; i++) {
sum += i;
}
}else {
//将任务分解成100个小任务
long step = (start+end )/100;
ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
long pos = start;
for(int i = 0;i<100;i++) {
long lastOne = pos + step;
if(lastOne > end )
lastOne = end;
CountTask subTask = new CountTask(pos,lastOne);
pos += step +1;
subTasks.add(subTask);
//提交子任务
subTask.fork();
}
for(CountTask t : subTasks) {
//等待所有任务,将结果合并
sum += t.join();
}
}
return sum;
}
public static void main(String [] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(0,200000L);
//将任务提交给线程池,线程池返回一个携带结果的任务
ForkJoinTask<Long> result = forkJoinPool.submit(task);
try {
//通过get方法可以得到最终结果;
long res = result.get();
System.out.println("sum = " + res);
}catch(InterruptedException e) {
e.printStackTrace();
}catch(ExecutionException e) {
e.printStackTrace();
}
}
}
本来是写一个从一个包含几十万个数组中找到最大数的一个ForkJoinPool的例子,但是,结果一直有问题,所以就直接引用书中的例子,
注意:这个任务数如果过多,或造成栈异常,内存不足等问题,可通过设置JVM的堆栈大小来解决问题。
最后:内容来自《Java高并发程序设计》 作者葛一鸣 郭超 由衷感谢作者为我们提供书籍内容;