一、parallelStream是什么
Java8中提供了能够更方便处理集合数据的Stream类,其中parallelStream()方法能够充分利用多核CPU的优势,使用多线程加快对集合数据的处理速度。不熟悉Stream类的同学,可以先参考我的另外一篇文章Java8中Stream的常用方法
parallelStream()方法的源码如下:
/** * @return a possibly parallel {@code Stream} over the elements in this * collection * @since 1.8 */ default Stream<E> parallelStream() { return StreamSupport.stream(spliterator(), true); }
从注释的@return a possibly parallel可以看得出来,parallelStream()并不是一定返回一个并行流,有可能parallelStream()全是由主线程顺序执行的。
二、parallelStream内部使用了哪些线程
以一个简单的例子,来看看parallelStream内部到底使用了哪些线程
Integer[] array = new Integer[]{1, 2, 3, 4, 5}; Arrays.asList(array).parallelStream().forEach(i -> { System.out.println(Thread.currentThread().getName() + " num:" + i); });
输出结果如下:
可以看得出来,结果是乱序输出的,且参与并行处理的线程有主线程以及ForkJoinPool中的worker线程
三、Fork/Join框架
注:本文不会深入研究Fork/Join框架的源码以及与线程池的异同点,只是仅仅解开parallelStream的面纱,后续会有更深入的文章去讲解Fork/Join框架的原理。
parallelStream的底层是基于ForkJoinPool的,ForkJoinPool实现了ExecutorService接口,因此和线程池的关系微妙。(对线程池不熟悉的同学,可以参考我的另外一篇文章说说线程池)
ForkJoinPool和ExecutorService的继承关系如图所示:
Fork/Join框架主要采用分而治之的理念来处理问题,对于一个比较大的任务,首先将它拆分(fork)为两个小任务task1与task2。
使用新的线程thread1去处理task1,thread2去处理task2。
如果thread1认为task1还是太大,则继续往下拆分成新的子任务task3与task4。
thread2认为task2任务量不大,则立即进行处理,形成结果result2。
之后将task3和task4的处理结果合并(join)成result1,最后将result1与result2合并成最后的结果。
用图来描述可能更加直观:
下面用一个示例代码,计算出1到10000的和(实际上应该取到一个很大的数字,这里为了演示方便就到10000就结束)来演示ForkJoinPool的简单使用。
package com.qcy.testStream; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; import java.util.stream.IntStream; /** * @author qcy * @create 2020/08/13 21:31:45 */ public class Task extends RecursiveTask<Integer> { //起始数 private Integer startNum; //结束数 private Integer endNum; //最大间隔数 private Integer maxInterval; public Task(Integer startNum, Integer endNum, Integer maxInterval) { this.startNum = startNum; this.endNum = endNum; this.maxInterval = maxInterval; } @Override protected Integer compute() { if (endNum - startNum < maxInterval) { //任务足够小,不需要拆分 return IntStream.rangeClosed(startNum, endNum).sum(); } //需要拆分任务 int middleNum = (startNum + endNum) % 2 == 0 ? (startNum + endNum) / 2 : (startNum + endNum - 1) / 2; Task t1 = new Task(startNum, middleNum, maxInterval); Task t2 = new Task(middleNum + 1, endNum, maxInterval); //使用invokeAll,能让这两个任务被并行处理 invokeAll(t1, t2); //使用t1.fork()、t2.fork()则让这两个任务串行处理 return t1.join() + t2.join(); } public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool pool = new ForkJoinPool(); Task task = new Task(1, 10000, 100); ForkJoinTask<Integer> future = pool.submit(task); System.out.println(future.get()); } }
当我们使用默认的不带参数的方法构造ForkJoinPool时,默认最大的线程并行数量是当前CPU的核数。在一定程度上,这样做能够减少线程上下文切换的次数。
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); }
当然,我们可以使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=x,其中x为ForkJoinPool中的线程数量,当设定为2时,则上述计算1到10000之和的任务,总共只有两个线程来处理任务。
注意此参数是全局的,会影响其他parallelStream中的线程总数。
但是对于第一个例子,一共会有3个线程来进行处理,多出来那个线程是主线程。如图所示:
四、使用parallelStream的一些注意点
(1)parallelStream并行流一定要使用线程安全的对象,比如有这样的一个场景
List<Integer> list = new ArrayList<>(); IntStream.rangeClosed(1, 10000).parallel().forEach(i -> list.add(i));
执行就立即报错了:
ArrayList本身就是一个线程不安全的容器,在多线程的操作下,扩容操作可能会导致产生数组越界的异常。
此时,要么使用线程安全的容器,比如Vector,要么使用collect完成串行收集。
List<Integer> collect = IntStream.rangeClosed(1, 10000) .parallel() .boxed() .collect(Collectors.toList());
(2)线程关联的ThreadLocal将会失效
(不熟悉ThreadLocal的同学,可以参考我的另外一篇文章浅谈ThreadLocal)
这一点从第二小节就可以看出,主线程参与到parallelStream中的任务处理的过程中。如果我们处理的任务方法中包含对ThreadLocal的处理,可能除主线程之外的所有线程都获取不到自己的线程局部变量,加之ForkJoinPool中的线程是反复使用的,线程关联的ThreadLocal会发生共用的情况。
所以我的建议是,parallelStream中就不要使用ThreadLocal了,要么在任务处理方法中,第一行先进行ThreadLocal.set(),之后再由ThreadLocal.get()获取到自己的线程局部变量。
非要用ThreadLocal的话,为了规避使用不当而带来内存泄漏的风险,可以参考我的这篇文章ThreadLocal使用不好,小心造成内存泄露!
(3)使用parallelStream也不一定会提升性能
在CPU资源紧张的时候,使用并行流可能会带来频繁的线程上下文切换,导致并行流执行的效率还没有串行执行的效率高。