11 ForkJoin

特点:工作窃取

图片说明
FoekJoin代码实例说明:

package com.snowdong;

import java.util.concurrent.RecursiveTask;

/**
 * 求和计算的任务!
 * 3000 6000(ForkJoin) 9000(Stream并行流)
 * // 如何使用 forkjoin
 * // 1、forkjoinPool 通过它来执行
 * // 2、计算任务 forkjoinPool.execute(ForkJoinTask task)
 * // 3. 计算类要继承 ForkJoinTask
 */
public class ForkJoinDemo extends RecursiveTask<Long> {
    private Long start;
    private Long end;
    //临界值
    private Long temp=10000L;
    public ForkJoinDemo(Long start,Long end){
        this.start=start;
        this.end=end;
    }
    //  计算方法
    @Override
    protected Long compute() {
        if((end-start)<temp){
            Long sum=0L;
            for (Long i = start; i < =end; i++) {
                sum +=i;
            }
            return sum;
        }else{//forkJoin 递归
            Long middle=start+(end-start)/2;
            ForkJoinDemo task1=new ForkJoinDemo(start,middle);
            task1.fork(); // 拆分任务,把任务压入线程队列
            ForkJoinDemo task2=new ForkJoinDemo(middle+1,end);
            task2.fork() ;// 拆分任务,把任务压入线程队列
            return task1.join()+task2.join();
        }

    }
}

测试代码说明:

package com.snowdong;
//ForkJion在JDK1.7,并行执行任务,提高效率。大数据量
//大数据:map reduce 把大人物拆分成小任务

import com.sun.xml.internal.ws.api.model.wsdl.WSDLOutput;
import org.w3c.dom.ls.LSOutput;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;


public class TestForkJoin {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //test1();//sum=500000000500000000,时间:7661
        test2();//sum=500000000500000000,时间:7486
        //test3();//sum=500000000500000000,时间:305
    }
    //一般使用方法
    public static void test1(){
        Long sum=0L;
        long startTime= System.currentTimeMillis();
        for (Long i = 1L; i <= 10_0000_0000 ; i++) {
            sum+=i;
        }
        long endTime=System.currentTimeMillis();
        System.out.println("sum="+sum+",时间:"+(endTime-startTime));
    }
    //会使用ForkJoin
    public static void test2() throws ExecutionException, InterruptedException {
        long startTime= System.currentTimeMillis();
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        ForkJoinTask<Long> task=new ForkJoinDemo(0L,10_0000_0000L);
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);
        Long sum=submit.get();
        long endTime=System.currentTimeMillis();
        System.out.println("sum="+sum+",时间:"+(endTime-startTime));
    }
    //使用Stream并行流
    public static void test3(){
        long startTime=System.currentTimeMillis();
        Long sum= LongStream.rangeClosed(0L,10_0000_0000L).parallel().reduce(0,Long::sum);
        long endTime=System.currentTimeMillis();
        System.out.println("sum="+sum+",时间:"+(endTime-startTime));
    }
}

12 异步回调

Future设计的初衷:对将来的某个事件的结果进行建模
待定!!!!!

13 JMM(Java Memory Model)

Volatile:是Java虚拟机提供的轻量级的同步机制(轻锁)

  • 防止指令重排序,保证内存可见性(保证多线程环境下变量的相互可见性和有序性,不保证原子性)
  • 不保证原子性
  • 禁止指令重排

解释:

  • 保证可见性:当某个线程修改volatile变量,JMM会强制将这个修改更新到主存中,并且让其他线程工作内存中存储的副本失效。
  • 如果volatile修饰的变量被修改,会通过总线嗅探告知其他线程工作区,其他线程工作区会把这个变量设置为无效,然后重新读取。
  • 不保证原子性:不保证复合运算的原子性,但是保证单个操作的原子性。
  • 线程的工作内存对应虚拟机栈中的栈帧的局部变量表,主存相当于堆。

JMM:Java的内存模型,不存在的东西,概念!约定!

  • 1.线程解锁前,必须把共享变量立即刷回主存
  • 2.线程加锁前,必须读取主存中的最新值到工作内存中
  • 3.加锁和加锁是同一把锁

图片说明
事实:如果线程B修改了值,但是线程A不能及时可见,导致错误

内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外

  • lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
  • unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
  • read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
  • load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
  • use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
  • assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
  • store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
  • write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中

JMM对这八种指令的使用,制定了如下规则:

  • 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
  • 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
  • 不允许一个线程将没有assign的数据从工作内存同步回主内存
  • 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是对变量实施use、store操作之前,必须经过assign和load操作
  • 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
  • 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
  • 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
  • 对一个变量进行unlock操作之前,必须把此变量同步回主内存

图片说明

14 Volatile

14.1 保证可见性

如果volatile修饰的变量被修改,会通过总线嗅探告知其他线程工作区,其他线程工作区会把这个变量设置为无效,然后重新读取。

代码测试如下:

package com.snowdong;

import java.util.concurrent.TimeUnit;

public class TestVolatile {
    //不加volatile,程序就会死循环
    //加volatile 可以保证可见性
    private volatile static int num=0;
    public static void main(String[] args) throws InterruptedException {//main主线程
        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+" num="+num);
            while(num==0){

            }
            System.out.println(Thread.currentThread().getName()+" num="+num);
        }).start();

        TimeUnit.SECONDS.sleep(1);
        num=1;
        System.out.println(Thread.currentThread().getName()+" num="+num);
    }
}

14.2 不保证原子性

原子性:不可分割,

具体代码测试:

package com.snowdong;

public class TestVolatile01 {
    //定义一个线程共享资源
    /**
     * 如果在num字段上加上volatile关键字,还是出现问题
     * 所以得出volatile是不保证原子性的
     */
    private volatile static int num=0;
    public static void add(){
        num++;//++不是原子操作,有三步
    }

    public static void main(String[] args) {
        //理论上num的结果应该为20000
        //但是实际上不一定是
        /**
         * 解决方案:
         * 1、可以在add()方法上加上synchronized同步机制
         */
        for (int i = 1; i <= 20; i++) {
            new Thread(()->{
                for (int j = 0; j < 1000; j++) {
                    add();
                }
            }).start();
        }
        while(Thread.activeCount()>2){//肯定有两个线程在执行,main和gc
            Thread.yield();
            //Thread.yield()是在主线程执行的,意思是只要还有出了GC和main线程之外的线程在跑,...
            // 主线程就让出cpu不往下执行
        }
        System.out.println(Thread.currentThread().getName()+" "+num);
    }
}

解释:

  • java中对基本类型的变量赋值和读取是原子操作,但是想j=i,i++这样操作都不是原子操作的,因为他们进行了多次原子操作。
    上述方法编译后字节码如下:
    图片说明
    如果不加lock和synchronized,怎么样保证原子性?

  • 使用原子类,解决 原子性问题

  • Java提供java.util.concurrent.atomic包关于一些原子性操作的工具类
    具体代码如下:

    package com.snowdong;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class TestVolatile01 {
     //定义一个线程共享资源
     /**
      * 如果在num字段上加上volatile关键字,还是出现问题
      * 所以得出volatile是不保证原子性的
      */
     private volatile static AtomicInteger num=new AtomicInteger();
     public static void add(){
         //num++;//++不是原子操作,有三步
         num.getAndIncrement();//AtomicInteger + 1 方法, CAS(自旋锁)
     }
    
     public static void main(String[] args) {
         //理论上num的结果应该为20000
         //但是实际上不一定是
         /**
          * 解决方案:
          * 1、可以在add()方法上加上synchronized同步机制,但是其是一个重量级同步机制
          * 2、使用java.util.concurrent.atomic包提供的原子性操作的工具类
          */
         for (int i = 1; i <= 20; i++) {
             new Thread(()->{
                 for (int j = 0; j < 1000; j++) {
                     add();
                 }
             }).start();
         }
         while(Thread.activeCount()>2){//肯定有两个线程在执行,main和gc
             Thread.yield();
             //Thread.yield()是在主线程执行的,意思是只要还有出了GC和main线程之外的线程在跑,...
             // 主线程就让出cpu不往下执行
         }
         System.out.println(Thread.currentThread().getName()+" "+num);
     }
    }

    这些类的底层都直接和操作系统挂钩!在内存中修改值!Unsafe类是一个很特殊的存在!

14.3 指令重排

指令重排:你写的程序,计算机并不是按照你写的那样去执行的。

  • 源代码-->编译器优化的重排--> 指令并行也可能会重排--> 内存系统也会重排---> 执行
  • 处理器在进行指令重排的时候,考虑:数据之间的依赖性!
int x = 1; // 1
int y = 2; // 2
x = x + 5; // 3
y = x * x; // 4
我们所期望的:1234 但是可能执行的时候回变成 2134 1324
可不可能是 4123!不可能

可能造成影响的结果: a b x y 这四个值默认都是 0;

线程A 线程B
x=a y=b
b=1 a=2

正常的结果: x = 0;y = 0;但是可能由于指令重排

线程A 线程B
b=1 a=2
x=a y=b

指令重排导致的诡异结果: x = 2;y = 1;

volatile可以避免指令重排:内存屏障,PCU指令。
作用:

  • 保证特定的操作的执行顺序
  • 可以保证某些变量的内存可见性
    volatile写前加SS屏障,写后加SL屏障;读前加LL屏障,读后加LS屏障

总结:Volatile 是可以保持 可见性。不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生!
图片说明

15 单例模式