一、parallelStream是什么

Java8中提供了能够更方便处理集合数据的Stream类,其中parallelStream()方法能够充分利用多核CPU的优势,使用多线程加快对集合数据的处理速度。不熟悉Stream类的同学,可以先参考我的另外一篇文章Java8中Stream的常用方法

parallelStream()方法的源码如下:

/**
     * @return a possibly parallel {@code Stream} over the elements in this
     * collection
     * @since 1.8
     */ default Stream<E> parallelStream() { return StreamSupport.stream(spliterator(), true);
    }

从注释的@return a possibly parallel可以看得出来,parallelStream()并不是一定返回一个并行流,有可能parallelStream()全是由主线程顺序执行的。


二、parallelStream内部使用了哪些线程

以一个简单的例子,来看看parallelStream内部到底使用了哪些线程

Integer[] array = new Integer[]{1, 2, 3, 4, 5};
        Arrays.asList(array).parallelStream().forEach(i -> {
            System.out.println(Thread.currentThread().getName() + " num:" + i);
        });

输出结果如下:

可以看得出来,结果是乱序输出的,且参与并行处理的线程有主线程以及ForkJoinPool中的worker线程


三、Fork/Join框架

注:本文不会深入研究Fork/Join框架的源码以及与线程池的异同点,只是仅仅解开parallelStream的面纱,后续会有更深入的文章去讲解Fork/Join框架的原理。

parallelStream的底层是基于ForkJoinPool的,ForkJoinPool实现了ExecutorService接口,因此和线程池的关系微妙。(对线程池不熟悉的同学,可以参考我的另外一篇文章说说线程池

ForkJoinPool和ExecutorService的继承关系如图所示:

Fork/Join框架主要采用分而治之的理念来处理问题,对于一个比较大的任务,首先将它拆分(fork)为两个小任务task1与task2。

使用新的线程thread1去处理task1,thread2去处理task2。

如果thread1认为task1还是太大,则继续往下拆分成新的子任务task3与task4。

thread2认为task2任务量不大,则立即进行处理,形成结果result2。

之后将task3和task4的处理结果合并(join)成result1,最后将result1与result2合并成最后的结果。

用图来描述可能更加直观:

下面用一个示例代码,计算出1到10000的和(实际上应该取到一个很大的数字,这里为了演示方便就到10000就结束)来演示ForkJoinPool的简单使用。

package com.qcy.testStream; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; import java.util.stream.IntStream; /**
 * @author qcy
 * @create 2020/08/13 21:31:45
 */ public class Task extends RecursiveTask<Integer> { //起始数 private Integer startNum; //结束数 private Integer endNum; //最大间隔数 private Integer maxInterval; public Task(Integer startNum, Integer endNum, Integer maxInterval) { this.startNum = startNum; this.endNum = endNum; this.maxInterval = maxInterval;
    } @Override protected Integer compute() { if (endNum - startNum < maxInterval) { //任务足够小,不需要拆分 return IntStream.rangeClosed(startNum, endNum).sum();
        } //需要拆分任务 int middleNum = (startNum + endNum) % 2 == 0 ? (startNum + endNum) / 2 : (startNum + endNum - 1) / 2;

        Task t1 = new Task(startNum, middleNum, maxInterval);
        Task t2 = new Task(middleNum + 1, endNum, maxInterval); //使用invokeAll,能让这两个任务被并行处理 invokeAll(t1, t2); //使用t1.fork()、t2.fork()则让这两个任务串行处理 return t1.join() + t2.join();
    } public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        Task task = new Task(1, 10000, 100);
        ForkJoinTask<Integer> future = pool.submit(task);
        System.out.println(future.get());
    }
}

当我们使用默认的不带参数的方法构造ForkJoinPool时,默认最大的线程并行数量是当前CPU的核数。在一定程度上,这样做能够减少线程上下文切换的次数。

public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }

当然,我们可以使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=x,其中x为ForkJoinPool中的线程数量,当设定为2时,则上述计算1到10000之和的任务,总共只有两个线程来处理任务。

注意此参数是全局的,会影响其他parallelStream中的线程总数。

但是对于第一个例子,一共会有3个线程来进行处理,多出来那个线程是主线程。如图所示:


四、使用parallelStream的一些注意点

(1)parallelStream并行流一定要使用线程安全的对象,比如有这样的一个场景

List<Integer> list = new ArrayList<>();
        IntStream.rangeClosed(1, 10000).parallel().forEach(i -> list.add(i));

执行就立即报错了:

ArrayList本身就是一个线程不安全的容器,在多线程的操作下,扩容操作可能会导致产生数组越界的异常。

此时,要么使用线程安全的容器,比如Vector,要么使用collect完成串行收集。

List<Integer> collect = IntStream.rangeClosed(1, 10000)
                .parallel()
                .boxed()
                .collect(Collectors.toList());

(2)线程关联的ThreadLocal将会失效

(不熟悉ThreadLocal的同学,可以参考我的另外一篇文章浅谈ThreadLocal

这一点从第二小节就可以看出,主线程参与到parallelStream中的任务处理的过程中。如果我们处理的任务方法中包含对ThreadLocal的处理,可能除主线程之外的所有线程都获取不到自己的线程局部变量,加之ForkJoinPool中的线程是反复使用的,线程关联的ThreadLocal会发生共用的情况。

所以我的建议是,parallelStream中就不要使用ThreadLocal了,要么在任务处理方法中,第一行先进行ThreadLocal.set(),之后再由ThreadLocal.get()获取到自己的线程局部变量。

非要用ThreadLocal的话,为了规避使用不当而带来内存泄漏的风险,可以参考我的这篇文章ThreadLocal使用不好,小心造成内存泄露!

(3)使用parallelStream也不一定会提升性能

在CPU资源紧张的时候,使用并行流可能会带来频繁的线程上下文切换,导致并行流执行的效率还没有串行执行的效率高。