Future的缺点

虽然Future可以异步的执行任务,但是还是有很多缺点:

  • 没有办法回调,需要手动的调用
  • 执行一组任务需要等待所有的任务执行完

CompletionService简介

CompletionService的实现目标是任务先完成可优先获取到,即结果按照完成先后顺序排序。

ExecutorCompletionService类,该类只有三个成员变量:

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
    ....
}
  • 可以看到ExecutorCompletionService主要是增强executor线程池的。
  • Task包装后被塞入completionQueue,当Task结束,其Future就可以从completionQueue中获取到。

执行流程为

基本使用

public class CompletionServiceTest {

    public static void main(String[] args)  {
        Long start = System.currentTimeMillis();
        //开启3个线程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        try {
            int taskCount = 10;
            // 结果集
            List<Integer> list = new ArrayList<Integer>();

            // 1.定义CompletionService
            CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(exs);

            // 2.添加任务
            for(int i=0;i<taskCount;i++){
                completionService.submit(new Task(i+1));
            }

            // 3.获取结果
            for(int i=0;i<taskCount;i++){
                Integer result = completionService.take().get();
                list.add(result);
            }

            System.out.println("list="+list);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭线程池
            exs.shutdown();
        }

    }

    static class Task implements Callable<Integer> {
        Integer i;

        public Task(Integer i) {
            super();
            this.i=i;
        }

        @Override
        public Integer call() throws Exception {
            if(i==5) {
                Thread.sleep(5000);
            }else{
                Thread.sleep(1000);
            }
            System.out.println("线程:"+Thread.currentThread().getName()+"任务i="+i+",执行完成!");
            return i;
        }

    }
}

结果

线程:pool-1-thread-1任务i=1,执行完成!
线程:pool-1-thread-2任务i=2,执行完成!
线程:pool-1-thread-3任务i=3,执行完成!
线程:pool-1-thread-4任务i=4,执行完成!
线程:pool-1-thread-1任务i=7,执行完成!
线程:pool-1-thread-3任务i=8,执行完成!
线程:pool-1-thread-4任务i=9,执行完成!
线程:pool-1-thread-2任务i=6,执行完成!
线程:pool-1-thread-1任务i=10,执行完成!
线程:pool-1-thread-5任务i=5,执行完成!
list=[1, 2, 3, 4, 7, 8, 9, 6, 10, 5]
  • 可以发现结果顺序就是任务完成的顺序

阻塞和非阻塞获取

public Future<V> take()throws InterruptedException

public Future<V> poll()
public Future<V> poll(long timeout,TimeUnit unit) throws InterruptedException

阻塞获取

take方法回使调用者阻塞,可以保证一定会有Future取出

非阻塞获取

poll方***去查看是否有任务完成,有则取出;没有,就会返回一个null