Fork/Join是什么?

Fork意思是分叉,Join为合并。Fork/Join是一个将任务分割并行运行,然后将最终结果合并成为大任务的结果的框架,父任务可以分割成若干个子任务,子任务可以继续分割,提供我们一种方便的并行任务功能,满足实际场景的业务需求,思想类似于MapReduce。任务的分割必须保证子任务独立,不会相互依赖结果。

 

从哪里开始?

Fork/Join框架主要有如下接口和类:

  • ForkJoinPool:一个线程池,用于执行调度分割的任务,实现了ExecutorService接口。提供三种执行任务的方式: 
1、execute:最原生的执行方式,以异步执行,并且无返回结果。
2、submit:异步执行,有返回结果,返回结果是封装后的Future对象。
3、invoke和invokeAll:异步执行,有返回结果,会等待所有任务执行执行完成,返回的结果为无封装的泛型T。
  • ForkJoinTask:抽象的分割任务,提供以分叉的方式执行,以及合并执行结果。
  • RecursiveAction:异步任务,无返回结果。通常自定义的任务要继承,并重写compute方法,任务执行的就是compute方法。
  • RecursiveTask:异步任务,有返回结果。通常自定义的任务要继承,并重写compute方法,任务执行的就是compute方法。

核心类图

从核心类图看出,要想开始一个分割的并行任务,可以创建一个ForkJoinPool线程池,同时创建无返回结果的任务RecursiveAction或有返回结果的任务RecursiveTask,最后调用线程池ForkJoinPool的execute或submit或invoke方法执行任务,完成后合并结果。

实例

我们以一个有返回结果的并行任务实例进行测试。计算从起始值到结束值得连续数的累加结果,利用Fork/Join框架。并对比普通计算和并行计算的耗时差异。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class TestForkJoinPool {
    public static void main(String[] args) {
        long end = 10000000L;
        long t1 = System.currentTimeMillis();
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, end);
        long sum = pool.invoke(task);
        System.out.println(sum);
        long t2 = System.currentTimeMillis();
        long res = 0;
        for (long i = 0; i <= end; i++) {
            res += i;
        }
        long t3 = System.currentTimeMillis();
        System.out.println(res);
        System.out.println(t2 - t1);
        System.out.println(t3 - t2);
    }
    
    static class ForkJoinSumCalculate extends RecursiveTask<Long> {

        private long start;
        private long end;
        private static final long THURSHOLD = 10000L;  // 拆分临界值
        // 有参构造器
        public ForkJoinSumCalculate(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            long length = end - start;
            if (length <= THURSHOLD) {
                long sum = 0;
                for (long i = start; i <= end; i++) {
                    sum += i;

                }
                return sum;
            } else {

                long middle = (start + end) / 2;
                ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
                left.fork();
                ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle + 1, end);
                right.fork();
                return left.join() + right.join();


            }

        }
    }

}

 输出结果如下

当将end 赋值 1000000000L的时候,计算结果如下

 由结果可知,当计算一个很耗时的功能的时候,用ForkJoin确实可以提升计算速度。

work-stealing规则

在Java的API说明中提到,ForkJoinPool线程池与ThreadPoolExecutor线程池不同的地方在于,ForkJoinPool善于利用窃取工作执行加快任务的总体执行速度。实际上,在ForkJoinPool线程池中,若一个工作线程的任务队列为空没有任务执行时,便从其他工作线程中获取任务主动执行。为了实现工作窃取,在工作线程中维护了双端队列,窃取任务线程从队尾获取任务,被窃取任务线程从队头获取任务。这种机制充分利用线程进行并行计算,减少了线程竞争。但是当队列中只存在一个任务了时,两个线程去取反而会造成资源浪费。



原文地址