11 ForkJoin
特点:工作窃取
FoekJoin代码实例说明:
package com.snowdong; import java.util.concurrent.RecursiveTask; /** * 求和计算的任务! * 3000 6000(ForkJoin) 9000(Stream并行流) * // 如何使用 forkjoin * // 1、forkjoinPool 通过它来执行 * // 2、计算任务 forkjoinPool.execute(ForkJoinTask task) * // 3. 计算类要继承 ForkJoinTask */ public class ForkJoinDemo extends RecursiveTask<Long> { private Long start; private Long end; //临界值 private Long temp=10000L; public ForkJoinDemo(Long start,Long end){ this.start=start; this.end=end; } // 计算方法 @Override protected Long compute() { if((end-start)<temp){ Long sum=0L; for (Long i = start; i < =end; i++) { sum +=i; } return sum; }else{//forkJoin 递归 Long middle=start+(end-start)/2; ForkJoinDemo task1=new ForkJoinDemo(start,middle); task1.fork(); // 拆分任务,把任务压入线程队列 ForkJoinDemo task2=new ForkJoinDemo(middle+1,end); task2.fork() ;// 拆分任务,把任务压入线程队列 return task1.join()+task2.join(); } } }
测试代码说明:
package com.snowdong; //ForkJion在JDK1.7,并行执行任务,提高效率。大数据量 //大数据:map reduce 把大人物拆分成小任务 import com.sun.xml.internal.ws.api.model.wsdl.WSDLOutput; import org.w3c.dom.ls.LSOutput; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; public class TestForkJoin { public static void main(String[] args) throws ExecutionException, InterruptedException { //test1();//sum=500000000500000000,时间:7661 test2();//sum=500000000500000000,时间:7486 //test3();//sum=500000000500000000,时间:305 } //一般使用方法 public static void test1(){ Long sum=0L; long startTime= System.currentTimeMillis(); for (Long i = 1L; i <= 10_0000_0000 ; i++) { sum+=i; } long endTime=System.currentTimeMillis(); System.out.println("sum="+sum+",时间:"+(endTime-startTime)); } //会使用ForkJoin public static void test2() throws ExecutionException, InterruptedException { long startTime= System.currentTimeMillis(); ForkJoinPool forkJoinPool=new ForkJoinPool(); ForkJoinTask<Long> task=new ForkJoinDemo(0L,10_0000_0000L); ForkJoinTask<Long> submit = forkJoinPool.submit(task); Long sum=submit.get(); long endTime=System.currentTimeMillis(); System.out.println("sum="+sum+",时间:"+(endTime-startTime)); } //使用Stream并行流 public static void test3(){ long startTime=System.currentTimeMillis(); Long sum= LongStream.rangeClosed(0L,10_0000_0000L).parallel().reduce(0,Long::sum); long endTime=System.currentTimeMillis(); System.out.println("sum="+sum+",时间:"+(endTime-startTime)); } }
12 异步回调
Future设计的初衷:对将来的某个事件的结果进行建模
待定!!!!!
13 JMM(Java Memory Model)
Volatile:是Java虚拟机提供的轻量级的同步机制(轻锁)
- 防止指令重排序,保证内存可见性(保证多线程环境下变量的相互可见性和有序性,不保证原子性)
- 不保证原子性
- 禁止指令重排
解释:
- 保证可见性:当某个线程修改volatile变量,JMM会强制将这个修改更新到主存中,并且让其他线程工作内存中存储的副本失效。
- 如果volatile修饰的变量被修改,会通过总线嗅探告知其他线程工作区,其他线程工作区会把这个变量设置为无效,然后重新读取。
- 不保证原子性:不保证复合运算的原子性,但是保证单个操作的原子性。
- 线程的工作内存对应虚拟机栈中的栈帧的局部变量表,主存相当于堆。
JMM:Java的内存模型,不存在的东西,概念!约定!
- 1.线程解锁前,必须把共享变量立即刷回主存
- 2.线程加锁前,必须读取主存中的最新值到工作内存中
- 3.加锁和加锁是同一把锁
事实:如果线程B修改了值,但是线程A不能及时可见,导致错误
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外
- lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
- unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
- read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
- load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
- use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
- assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
- store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
- write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
JMM对这八种指令的使用,制定了如下规则:
- 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
- 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
- 不允许一个线程将没有assign的数据从工作内存同步回主内存
- 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是对变量实施use、store操作之前,必须经过assign和load操作
- 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
- 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
- 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
- 对一个变量进行unlock操作之前,必须把此变量同步回主内存
14 Volatile
14.1 保证可见性
如果volatile修饰的变量被修改,会通过总线嗅探告知其他线程工作区,其他线程工作区会把这个变量设置为无效,然后重新读取。
代码测试如下:
package com.snowdong; import java.util.concurrent.TimeUnit; public class TestVolatile { //不加volatile,程序就会死循环 //加volatile 可以保证可见性 private volatile static int num=0; public static void main(String[] args) throws InterruptedException {//main主线程 new Thread(()->{ System.out.println(Thread.currentThread().getName()+" num="+num); while(num==0){ } System.out.println(Thread.currentThread().getName()+" num="+num); }).start(); TimeUnit.SECONDS.sleep(1); num=1; System.out.println(Thread.currentThread().getName()+" num="+num); } }
14.2 不保证原子性
原子性:不可分割,
具体代码测试:
package com.snowdong; public class TestVolatile01 { //定义一个线程共享资源 /** * 如果在num字段上加上volatile关键字,还是出现问题 * 所以得出volatile是不保证原子性的 */ private volatile static int num=0; public static void add(){ num++;//++不是原子操作,有三步 } public static void main(String[] args) { //理论上num的结果应该为20000 //但是实际上不一定是 /** * 解决方案: * 1、可以在add()方法上加上synchronized同步机制 */ for (int i = 1; i <= 20; i++) { new Thread(()->{ for (int j = 0; j < 1000; j++) { add(); } }).start(); } while(Thread.activeCount()>2){//肯定有两个线程在执行,main和gc Thread.yield(); //Thread.yield()是在主线程执行的,意思是只要还有出了GC和main线程之外的线程在跑,... // 主线程就让出cpu不往下执行 } System.out.println(Thread.currentThread().getName()+" "+num); } }
解释:
java中对基本类型的变量赋值和读取是原子操作,但是想j=i,i++这样操作都不是原子操作的,因为他们进行了多次原子操作。
上述方法编译后字节码如下:
如果不加lock和synchronized,怎么样保证原子性?使用原子类,解决 原子性问题
Java提供java.util.concurrent.atomic包关于一些原子性操作的工具类
具体代码如下:package com.snowdong; import java.util.concurrent.atomic.AtomicInteger; public class TestVolatile01 { //定义一个线程共享资源 /** * 如果在num字段上加上volatile关键字,还是出现问题 * 所以得出volatile是不保证原子性的 */ private volatile static AtomicInteger num=new AtomicInteger(); public static void add(){ //num++;//++不是原子操作,有三步 num.getAndIncrement();//AtomicInteger + 1 方法, CAS(自旋锁) } public static void main(String[] args) { //理论上num的结果应该为20000 //但是实际上不一定是 /** * 解决方案: * 1、可以在add()方法上加上synchronized同步机制,但是其是一个重量级同步机制 * 2、使用java.util.concurrent.atomic包提供的原子性操作的工具类 */ for (int i = 1; i <= 20; i++) { new Thread(()->{ for (int j = 0; j < 1000; j++) { add(); } }).start(); } while(Thread.activeCount()>2){//肯定有两个线程在执行,main和gc Thread.yield(); //Thread.yield()是在主线程执行的,意思是只要还有出了GC和main线程之外的线程在跑,... // 主线程就让出cpu不往下执行 } System.out.println(Thread.currentThread().getName()+" "+num); } }
这些类的底层都直接和操作系统挂钩!在内存中修改值!Unsafe类是一个很特殊的存在!
14.3 指令重排
指令重排:你写的程序,计算机并不是按照你写的那样去执行的。
- 源代码-->编译器优化的重排--> 指令并行也可能会重排--> 内存系统也会重排---> 执行
- 处理器在进行指令重排的时候,考虑:数据之间的依赖性!
int x = 1; // 1 int y = 2; // 2 x = x + 5; // 3 y = x * x; // 4 我们所期望的:1234 但是可能执行的时候回变成 2134 1324 可不可能是 4123!不可能
可能造成影响的结果: a b x y 这四个值默认都是 0;
线程A | 线程B |
---|---|
x=a | y=b |
b=1 | a=2 |
正常的结果: x = 0;y = 0;但是可能由于指令重排
线程A | 线程B |
---|---|
b=1 | a=2 |
x=a | y=b |
指令重排导致的诡异结果: x = 2;y = 1;
volatile可以避免指令重排:内存屏障,PCU指令。
作用:
- 保证特定的操作的执行顺序
- 可以保证某些变量的内存可见性
volatile写前加SS屏障,写后加SL屏障;读前加LL屏障,读后加LS屏障
总结:Volatile 是可以保持 可见性。不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生!
![]()