线程池系列一:线程池作用及Executors方法讲解
线程池的作用:
线程池作用就是限制系统中执行线程的数量。
根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程 排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程 池中有等待的工作线程,就可以开始运行了;否则进入等待队列。
为什么要用线程池:
- 减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
- 可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)
ThreadGroup与ThreadPoolExecutor的区别
我自己的理解也是一直以为ThreadGroup就是ThreadPoolExecutor(线程池),这是一个非常大的误会,最近把两者仔细分析了下。 线程组表示一个线程的集合。此外,线程组也可以包含其他线程组。线程组构成一棵树,在树中,除了初始线程组外,每个线程组都有一个父线程组。允许线程访问 有关自己的线程组的信息,但是不允许它访问有关其线程组的父线程组或其他任何线程组的信息;线程消耗包括内存和其它系统资源在内的大量资源。除了 Thread 对象所需的内存之外,每个线程都需要两个可能很大的执行调用堆栈。除此以外,JVM 可能会为每个 Java 线程创建一个本机线程,这些本机线程将消耗额外的系统资源。最后,虽然线程之间切换的调度开销很小,但如果有很多线程,环境切换也可能严重地影响程序的性 能。线程池是因为线程的生成关闭很浪费资源 所以不要频繁的操作 线程次 就是管理线程的地方 不用了它可以让它休眠也就是他替你管理线程 而且比你管理的要好的多。线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。其 好处是,因为在请求到达时线程已经存在,所以无意中也消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使应用程序响应更快。而且,通过适当地调 整线程池中的线程数目,也就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。
Executor详解:
Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。ThreadPoolExecutor是Executors类的底层实现。我们先介绍下Executors。
Sun在Java5中,对Java线程的类库做了大量的扩展,其中线程池就是Java5的新特征之一,除了线程池之外,还有很多多线程相关的内容,为多线程的编程带来了极大便利。为了编写高效稳定可靠的多线程程序,线程部分的新增内容显得尤为重要。
有关Java5线程新特征的内容全部在java.util.concurrent下面,里面包含数目众多的接口和类,熟悉这部分API特征是一项艰难的学习过程。目前有关这方面的资料和书籍都少之又少,大所属介绍线程方面书籍还停留在java5之前的知识层面上。
当然新特征对做多线程程序没有必须的关系,在java5之前通用可以写出很优秀的多线程程序。只是代价不一样而已。
线程池的基本思想还是一种对象池的思想,开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。
在Java5之前,要实现一个线程池是相当有难度的,现在Java5为我们做好了一切,我们只需要按照提供的API来使用,即可享受线程池带来的极大便利。
Java5的线程池分好多种:固定尺寸的线程池、可变尺寸连接池。
在使用线程池之前,必须知道如何去创建一个线程池,在Java5中,需要了解的是java.util.concurrent.Executors类的API,这个类提供大量创建连接池的静态方法,是必须掌握的。
实例:
一、固定大小的线程池
- import java.util.concurrent.Executors;
- import java.util.concurrent.ExecutorService;
- /**
- * Java线程:线程池-
- *
- * @author Administrator 2009-11-4 23:30:44
- */
- public class Test {
- public static void main(String[] args) {
- //创建一个可重用固定线程数的线程池
- ExecutorService pool = Executors.newFixedThreadPool(2);
- //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
- Thread t1 = new MyThread();
- Thread t2 = new MyThread();
- Thread t3 = new MyThread();
- Thread t4 = new MyThread();
- Thread t5 = new MyThread();
- //将线程放入池中进行执行
- pool.execute(t1);
- pool.execute(t2);
- pool.execute(t3);
- pool.execute(t4);
- pool.execute(t5);
- //关闭线程池
- pool.shutdown();
- }
- }
- class MyThread extends Thread{
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName()+"正在执行。。。");
- }
- }
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-2正在执行。。。
Process finished with exit code 0
二、单任务线程池
在上例的基础上改一行创建pool对象的代码为:
//创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
- ExecutorService pool = Executors.newSingleThreadExecutor();
输出结果为:
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
Process finished with exit code 0
对于以上两种连接池,大小都是固定的,当要加入的池的线程(或者任务)超过池最大尺寸时候,则入此线程池需要排队等待。
一旦池中有线程完毕,则排队等待的某个线程会入池执行。
三、可变尺寸的线程池
与上面的类似,只是改动下pool的创建方式:
//创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
- ExecutorService pool = Executors.newCachedThreadPool();
pool-1-thread-5正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-4正在执行。。。
pool-1-thread-3正在执行。。。
pool-1-thread-2正在执行。。。
Process finished with exit code 0
四、延迟连接池
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
- /**
- * Java线程:线程池-
- *
- * @author Administrator 2009-11-4 23:30:44
- */
- public class Test {
- public static void main(String[] args) {
- //创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
- ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
- //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
- Thread t1 = new MyThread();
- Thread t2 = new MyThread();
- Thread t3 = new MyThread();
- Thread t4 = new MyThread();
- Thread t5 = new MyThread();
- //将线程放入池中进行执行
- pool.execute(t1);
- pool.execute(t2);
- pool.execute(t3);
- //使用延迟执行风格的方法
- pool.schedule(t4, 10, TimeUnit.MILLISECONDS);
- pool.schedule(t5, 10, TimeUnit.MILLISECONDS);
- //关闭线程池
- pool.shutdown();
- }
- }
- class MyThread extends Thread {
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName() + "正在执行。。。");
- }
- }
pool-1-thread-1正在执行。。。
pool-1-thread-2正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-2正在执行。。。
Process finished with exit code 0
五、单任务延迟连接池
在四代码基础上,做改动
//创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
- ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-1正在执行。。。
Process finished with exit code 0
六、自定义线程池
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- /**
- * Java线程:线程池-自定义线程池
- *
- * @author Administrator 2009-11-4 23:30:44
- */
- public class Test {
- public static void main(String[] args) {
- //创建等待队列
- BlockingQueue bqueue = new ArrayBlockingQueue(20);
- //创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
- ThreadPoolExecutor pool = new ThreadPoolExecutor(2,3,2,TimeUnit.MILLISECONDS,bqueue);
- //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
- Thread t1 = new MyThread();
- Thread t2 = new MyThread();
- Thread t3 = new MyThread();
- Thread t4 = new MyThread();
- Thread t5 = new MyThread();
- Thread t6 = new MyThread();
- Thread t7 = new MyThread();
- //将线程放入池中进行执行
- pool.execute(t1);
- pool.execute(t2);
- pool.execute(t3);
- pool.execute(t4);
- pool.execute(t5);
- pool.execute(t6);
- pool.execute(t7);
- //关闭线程池
- pool.shutdown();
- }
- }
- class MyThread extends Thread {
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName() + "正在执行。。。");
- try {
- Thread.sleep(100L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
pool-1-thread-1正在执行。。。
pool-1-thread-2正在执行。。。
pool-1-thread-2正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-2正在执行。。。
pool-1-thread-1正在执行。。。
pool-1-thread-2正在执行。。。
Process finished with exit code 0
创建自定义线程池的构造方法很多,本例中参数的含义如下:
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue)
用给定的初始参数和默认的线程工厂及处理程序创建新的 ThreadPoolExecutor。使用 Executors 工厂方法之一比使用此通用构造方法方便得多。
参数:
corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize - 池中允许的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 参数的时间单位。
workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
抛出:
IllegalArgumentException - 如果 corePoolSize 或 keepAliveTime 小于零,或者 maximumPoolSize 小于或等于零,或者 corePoolSize 大于 maximumPoolSize。
NullPointerException - 如果 workQueue 为 null
自定义连接池稍微麻烦些,不过通过创建的ThreadPoolExecutor线程池对象,可以获取到当前线程池的尺寸、正在执行任务的线程数、工作队列等等。
线程池系列二:ThreadPoolExecutor讲解
一、简介
1)线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
参数讲解:
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四个选择:
ThreadPoolExecutor.AbortPolicy()
直接抛出java.util.concurrent.RejectedExecutionException异常
ThreadPoolExecutor.CallerRunsPolicy()
重试添加当前的任务,他会自动重复调用execute()方法,交由调用者线程来执行此Runnable任务
ThreadPoolExecutor.DiscardOldestPolicy()
抛弃旧的任务
ThreadPoolExecutor.DiscardPolicy()
抛弃当前的任务
2)一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时 :
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
二、多线程例子
- package demo;
- import java.io.Serializable;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- public class TestThreadPool2
- {
- private static int produceTaskSleepTime = 2;
- private static int produceTaskMaxNumber = 10;
- public static void main(String[] args)
- {
- // 构造一个线程池
- ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
- new ThreadPoolExecutor.DiscardOldestPolicy());
- for (int i = 1; i <= produceTaskMaxNumber; i++)
- {
- try
- {
- // 产生一个任务,并将其加入到线程池
- String task = "task@ " + i;
- System.out.println("put " + task);
- threadPool.execute(new ThreadPoolTask(task));
- // 便于观察,等待一段时间
- Thread.sleep(produceTaskSleepTime);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- }
- }
- /**
- * 线程池执行的任务
- */
- class ThreadPoolTask implements Runnable, Serializable
- {
- private static final long serialVersionUID = 0;
- private static int consumeTaskSleepTime = 2000;
- // 保存任务所需要的数据
- private Object threadPoolTaskData;
- ThreadPoolTask(Object tasks)
- {
- this.threadPoolTaskData = tasks;
- }
- public void run()
- {
- // 处理一个任务,这里的处理方式太简单了,仅仅是一个打印语句
- System.out.println(Thread.currentThread().getName());
- System.out.println("start .." + threadPoolTaskData);
- try
- {
- // //便于观察,等待一段时间
- Thread.sleep(consumeTaskSleepTime);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- threadPoolTaskData = null;
- }
- public Object getTask()
- {
- return this.threadPoolTaskData;
- }
- }
说明:
1、在这段程序中,一个任务就是一个Runnable类型的对象,也就是一个ThreadPoolTask类型的对象。
2、一般来说任务除了处理方式外,还需要处理的数据,处理的数据通过构造方法传给任务。
3、在这段程序中,main()方法相当于一个残忍的领导,他派发出许多任务,丢给一个叫 threadPool的任劳任怨的小组来做。
这个小组里面队员至少有两个,如果他们两个忙不过来,任务就被放到任务列表里面。
如果积压的任务过多,多到任务列表都装不下(超过3个)的时候,就雇佣新的队员来帮忙。但是基于成本的考虑,不能雇佣太多的队员,至多只能雇佣 4个。
如果四个队员都在忙时,再有新的任务,这个小组就处理不了了,任务就会被通过一种策略来处理,我们的处理方式是不停的派发,直到接受这个任务为止(更残忍!呵呵)。
因为队员工作是需要成本的,如果工作很闲,闲到 3SECONDS都没有新的任务了,那么有的队员就会被解雇了,但是,为了小组的正常运转,即使工作再闲,小组的队员也不能少于两个。
4、通过调整 produceTaskSleepTime和 consumeTaskSleepTime的大小来实现对派发任务和处理任务的速度的控制,改变这两个值就可以观察不同速率下程序的工作情况。
5、通过调整4中所指的数据,再加上调整任务丢弃策略,换上其他三种策略,就可以看出不同策略下的不同处理方式。
三、一个用队列处理线程池例子
- package demo;
- import java.util.Queue;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- public class ThreadPoolExecutorTest
- {
- private static int queueDeep = 4;
- public void createThreadPool()
- {
- /*
- * 创建线程池,最小线程数为2,最大线程数为4,线程池维护线程的空闲时间为3秒,
- * 使用队列深度为4的有界队列,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,
- * 然后重试执行程序(如果再次失败,则重复此过程),里面已经根据队列深度对任务加载进行了控制。
- */
- ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueDeep),
- new ThreadPoolExecutor.DiscardOldestPolicy()); //这里采取的是抛弃旧的任务
- // 向线程池中添加 10 个任务
- for (int i = 0; i < 10; i++)
- {
- try
- {
- Thread.sleep(1);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- while (getQueueSize(tpe.getQueue()) >= queueDeep)
- {
- System.out.println("队列已满,等3秒再添加任务");
- try
- {
- Thread.sleep(3000);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- TaskThreadPool ttp = new TaskThreadPool(i);
- System.out.println("put i:" + i);
- tpe.execute(ttp);
- }
- tpe.shutdown();
- }
- private synchronized int getQueueSize(Queue queue)
- {
- return queue.size();
- }
- public static void main(String[] args)
- {
- ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
- test.createThreadPool();
- }
- class TaskThreadPool implements Runnable
- {
- private int index;
- public TaskThreadPool(int index)
- {
- this.index = index;
- }
- public void run()
- {
- System.out.println(Thread.currentThread() + " index:" + index);
- try
- {
- Thread.sleep(3000);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- }
- }
说明:
这里执行的结果为:
put i:0
Thread[pool-1-thread-1,5,main] index:0
put i:1
Thread[pool-1-thread-2,5,main] index:1
put i:2
put i:3
put i:4
put i:5
队列已满,等3秒再添加任务
Thread[pool-1-thread-1,5,main] index:2
Thread[pool-1-thread-2,5,main] index:3
put i:6
put i:7
队列已满,等3秒再添加任务
Thread[pool-1-thread-1,5,main] index:4
Thread[pool-1-thread-2,5,main] index:5
put i:8
put i:9
Thread[pool-1-thread-1,5,main] index:6
Thread[pool-1-thread-2,5,main] index:7
Thread[pool-1-thread-1,5,main] index:8
Thread[pool-1-thread-2,5,main] index:9
ps:这里是当队列已满时线程就一直等待了,不会再新创建线程,所以一直就只有1和2两个线程来执行。
如果把
- while (getQueueSize(tpe.getQueue()) >= queueDeep){}
这一段去掉,那么执行结果为:
put i:0
Thread[pool-1-thread-1,5,main] index:0
put i:1
Thread[pool-1-thread-2,5,main] index:1
put i:2
put i:3
put i:4
put i:5
put i:6
Thread[pool-1-thread-3,5,main] index:6
put i:7
Thread[pool-1-thread-4,5,main] index:7
put i:8
put i:9
Thread[pool-1-thread-1,5,main] index:4
Thread[pool-1-thread-2,5,main] index:5
Thread[pool-1-thread-3,5,main] index:8
Thread[pool-1-thread-4,5,main] index:9
ps:这个执行顺序是0,1两个任务先进来,分别由线程1,2来执行,然后2,-5进来,队列满,6任务进来,因为队列已满,且1,2线程还未执行完,没有可用的线程,所以创建新的线程来运行6。7任务同理。然后8任务进来,队列已满,且1,2,3,4线程未执行完,线程数又等于了最多4个线程的限制,这时看线程池的执行策略为DiscardOldestPolicy,就是抛弃旧的任务,故开始进队列的2任务被抛弃,3任务同理,8,9任务进入队列,然后这时1-4线程已经执行完自己的任务,开始执行队列中的4,5,8,9
如果更改执行策略,那么相应的结果也会不一样,如果不希望有任务被抛弃,那么可以采用CallerRunsPolicy()策略。
线程池系列三:结合线程池实现Socket
Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了 Callable, Future等接口,并使用了Java 5的另外一个新特性泛型。
一、简介
本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一个新线程为该连接服务,服务内容为往客户端输送一些字符信息。一个典型的网络服务器模型如下:
1. 建立监听端口。
2. 发现有新连接,接受连接,启动线程,执行服务线程。 3. 服务完毕,关闭线程。
这个模型在大部分情况下运行良好,但是需要频繁的处理用户请求而每次请求需要的服务又是简短的时候,系统会将大量的时间花费在线程的创建销 毁。Java 5的线程池克服了这些缺点。通过对重用线程来执行多个任务,避免了频繁线程的创建与销毁开销,使得服务器的性能方面得到很大提高。因此,本文的网络服务器 模型将如下:
1. 建立监听端口,创建线程池。
2. 发现有新连接,使用线程池来执行服务任务。
3. 服务完毕,释放线程到线程池。
下面详细介绍如何使用Java 5的concurrent包提供的API来实现该服务器。
初始化
初始化包括创建线程池以及初始化监听端口。创建线程池可以通过调用java.util.concurrent.Executors类里的静态 方法newChahedThreadPool或是newFixedThreadPool来创建,也可以通过新建一个 java.util.concurrent.ThreadPoolExecutor实例来执行任务。这里我们采用newFixedThreadPool方 法来建立线程池。
ExecutorService pool = Executors.newFixedThreadPool(10);
表示新建了一个线程池,线程池里面有10个线程为任务队列服务。
使用ServerSocket对象来初始化监听端口。
private static final int PORT = 19527;
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
serverListenSocket.setReuseAddress(true);
服务新连接
当有新连接建立时,accept返回时,将服务任务提交给线程池执行。
while(true){
Socket socket = serverListenSocket.accept();
pool.execute(new ServiceThread(socket));
}
这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。
服务任务
服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此 ServiceThread提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个 线程存在竞争,同时访问count,因此需要加锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用ReentrantLock保证代码线程安全。下面是具体代码:
private static ReentrantLock lock = new ReentrantLock ();
private static int count = 0;
private int getCount(){
int ret = 0;
try{
lock.lock();
ret = count;
}finally{
lock.unlock();
}
return ret;
}
private void increaseCount(){
try{
lock.lock();
++count;
}finally{
lock.unlock();
}
}
二、服务器端的完整实现
服务器端的完整实现代码如下:
- package demo;
- import java.io.DataOutputStream;
- import java.io.IOException;
- import java.io.Serializable;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- import java.util.concurrent.RejectedExecutionHandler;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.ReentrantLock;
- public class Server
- {
- private static int produceTaskSleepTime = 100;
- private static int consumeTaskSleepTime = 1200;
- private static int produceTaskMaxNumber = 100;
- private static final int CORE_POOL_SIZE = 2;
- private static final int MAX_POOL_SIZE = 100;
- private static final int KEEPALIVE_TIME = 3;
- private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
- private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
- private static final String HOST = "127.0.0.1";
- private static final int PORT = 19527;
- private BlockingQueue workQueue = new ArrayBlockingQueue(QUEUE_CAPACITY);
- // private ThreadPoolExecutor serverThreadPool = null;
- private ExecutorService pool = null;
- private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
- private ServerSocket serverListenSocket = null;
- private int times = 5;
- public void start()
- {
- // You can also init thread pool in this way.
- /*
- * serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue, rejectedExecutionHandler);
- */
- pool = Executors.newFixedThreadPool(10);
- try
- {
- serverListenSocket = new ServerSocket(PORT);
- serverListenSocket.setReuseAddress(true);
- System.out.println("I'm listening");
- while (times-- > 0)
- {
- Socket socket = serverListenSocket.accept();
- String welcomeString = "hello";
- // serverThreadPool.execute(new ServiceThread(socket, welcomeString));
- pool.execute(new ServiceThread(socket));
- }
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- cleanup();
- }
- public void cleanup()
- {
- if (null != serverListenSocket)
- {
- try
- {
- serverListenSocket.close();
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- // serverThreadPool.shutdown();
- pool.shutdown();
- //调用 shutdown() 方法之后,主线程就马上结束了,而线程池会继续运行直到所有任务执行完才会停止。如果不调用 shutdown() 方法,那么线程池会一直保持下去,以便随时添加新的任务。interrupt():只有阻塞(sleep,wait,join的线程调用他们的interrupt()才起作用,正在运行的线程不起作用也不抛异常)
- }
- public static void main(String args[])
- {
- Server server = new Server();
- server.start();
- }
- }
- class ServiceThread implements Runnable, Serializable
- {
- private static final long serialVersionUID = 0;
- private Socket connectedSocket = null;
- private String helloString = null;
- private static int count = 0;
- private static ReentrantLock lock = new ReentrantLock();
- ServiceThread(Socket socket)
- {
- connectedSocket = socket;
- }
- public void run()
- {
- increaseCount();
- int curCount = getCount();
- helloString = "hello, id = " + curCount + "\r\n";
- ExecutorService executor = Executors.newSingleThreadExecutor();
- Future<String> future = executor.submit(new TimeConsumingTask());
- DataOutputStream dos = null;
- try
- {
- dos = new DataOutputStream(connectedSocket.getOutputStream());
- dos.write(helloString.getBytes());
- try
- {
- dos.write("let's do soemthing other.\r\n".getBytes());
- String result = future.get();
- dos.write(result.getBytes());
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- catch (ExecutionException e)
- {
- e.printStackTrace();
- }
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- finally
- {
- if (null != connectedSocket)
- {
- try
- {
- connectedSocket.close();
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- if (null != dos)
- {
- try
- {
- dos.close();
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- executor.shutdown();
- }
- }
- private int getCount()
- {
- int ret = 0;
- try
- {
- lock.lock();
- ret = count;
- }
- finally
- {
- lock.unlock();
- }
- return ret;
- }
- private void increaseCount()
- {
- try
- {
- lock.lock();
- ++count;
- }
- finally
- {
- lock.unlock();
- }
- }
- }
- class TimeConsumingTask implements Callable<String>
- {
- public String call() throws Exception
- {
- System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
- return "ok, here's the result: It takes me lots of time to produce this result";
- }
- }
ps:
这里重点介绍下Future 和 Callable。Callable接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是Runnable不会返回结果,并且无法抛出经过检查的异常。Callable可以和Future配合使用,用Future的get方法可以取得Callable中返回的值。
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new TimeConsumingTask());
dos.write("let's do soemthing other.\r\n".getBytes());
String result = future.get();
dos.write(result.getBytes());
使用ExecutorService的submit方法提交一个Callable的任务,返回一个Future接口的引用。这种做法对费时的任务非常有效,submit任务之后可以继续执行下面的代码,然后在适当的位置可以使用Future的get方法来获取结果,如果这时候该方法已经执行完毕,则无需等待即可获得结果,如果还在执行,get会阻塞直到它完成。
如上所示,在new TimeConsumingTask()后线程不会阻塞,而是在submit任务后继续执行dos.write().... 等操作,
然后再想取得结果的时候用future.get()方法取得。
其中TimeConsumingTask实现了Callable接口
- class TimeConsumingTask implements Callable {
- public String call() throws Exception {
- System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
- return "ok, here's the result: It takes me lots of time to produce this result";
- }
- }
这里使用了Java 5的另外一个新特性泛型,声明TimeConsumingTask的时候使用了String做为类型参数。必须实现Callable接口的call函数,其作用类似于Runnable中的run函数,在call函数里写入要执行的代码,其返回值类型等同于在类声明中传入的类型值。
Fature实现了3个功能:1.获取任务的结果2.取消任务3.获得任务进行状态(完成还是被取消)。
方法介绍:
boolean cancel(boolean mayInterruptIfRunning) 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束
boolean isCancelled() 任务是否已经取消,任务正常完成前将其取消,则返回 true
boolean isDone() 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
V get() throws InterruptedException, ExecutionException 等待任务执行结束,然后获得V类型的结果。
InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,
还会抛出CancellationException V
get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同
上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException