简单总结下并行流的基础用法。

private static void singelStream() {

        IntStream.range(1, 100)
                 .peek(ParallelStream::outNum)
                 .count();
}
private static void outNum(int num) {
        System.out.println("num = " + num);
}

上边这段代码是我们经常使用的单线程流模式,并没有使用到流的并行特性。

private static void parallelStream() {
        IntStream.range(1, 100)
                 .parallel()
                 .peek(ParallelStream::outNum)
                 .count();
    }

private static void outNum(int num) {
        System.out.println("num = " + num);
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

}

上边这段代码就可以看出来并行流的效果,单行流的打印是从一到一百按照顺序打印的,并行流的打印是八个日志为一组打印出来的,并且是一到一百乱序打印出来的。

那么为什么是八个为一组打印,而不是十个?原因就是并行流默认会按照电脑的逻辑处理器数量作为默认创建线程池的线程数~~

把上面代码稍微修改一下,我们看下默认创建的线程池。

ForkJoinPool.commonPool-worker-4-----num = 81
main-----num = 65
ForkJoinPool.commonPool-worker-2-----num = 90
ForkJoinPool.commonPool-worker-7-----num = 28
ForkJoinPool.commonPool-worker-3-----num = 16
ForkJoinPool.commonPool-worker-6-----num = 7
ForkJoinPool.commonPool-worker-5-----num = 43
ForkJoinPool.commonPool-worker-1-----num = 31

我们可以看到并行流默认使用的是jdk自带的ForkJoinPool.

默认的是当前机器的cpu数,我们可以修改这个参数么?当然可以。

private static void parallelStream() {
        //修改默认线程数
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","10");
        
        IntStream.range(1, 100)
                 .parallel()
                 .peek(ParallelStream::outNum)
                 .count();
}

还有一个问题,使用线程池自然就要遵守线程池中线程的调度规则,我们如果全部使用默认线程池的前提下在很多地方使用并行流,就像是全国只有一个铁路售票处一样,岂不是效率很慢?怎么解决?全国各地开设售票点。放到代码里那就是指定我们自己创建的线程池。

/**
 * 使用靠自己的线程池,不使用默认线程池,防止重要业务被阻塞
 */
private static void parallelStreamUseMyThreadPool() {

        ForkJoinPool myPool = new ForkJoinPool(20);

        myPool.submit(() ->
                IntStream.range(1, 100)
                         .parallel()
                         .peek(ParallelStream::outNum)
                         .count()
        );
        myPool.shutdown();

        //防止主线程退出
        synchronized (myPool){
            try {
                myPool.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
}