11 ForkJoin
特点:工作窃取
FoekJoin代码实例说明:
package com.snowdong;
import java.util.concurrent.RecursiveTask;
/**
* 求和计算的任务!
* 3000 6000(ForkJoin) 9000(Stream并行流)
* // 如何使用 forkjoin
* // 1、forkjoinPool 通过它来执行
* // 2、计算任务 forkjoinPool.execute(ForkJoinTask task)
* // 3. 计算类要继承 ForkJoinTask
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
//临界值
private Long temp=10000L;
public ForkJoinDemo(Long start,Long end){
this.start=start;
this.end=end;
}
// 计算方法
@Override
protected Long compute() {
if((end-start)<temp){
Long sum=0L;
for (Long i = start; i < =end; i++) {
sum +=i;
}
return sum;
}else{//forkJoin 递归
Long middle=start+(end-start)/2;
ForkJoinDemo task1=new ForkJoinDemo(start,middle);
task1.fork(); // 拆分任务,把任务压入线程队列
ForkJoinDemo task2=new ForkJoinDemo(middle+1,end);
task2.fork() ;// 拆分任务,把任务压入线程队列
return task1.join()+task2.join();
}
}
} 测试代码说明:
package com.snowdong;
//ForkJion在JDK1.7,并行执行任务,提高效率。大数据量
//大数据:map reduce 把大人物拆分成小任务
import com.sun.xml.internal.ws.api.model.wsdl.WSDLOutput;
import org.w3c.dom.ls.LSOutput;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class TestForkJoin {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//test1();//sum=500000000500000000,时间:7661
test2();//sum=500000000500000000,时间:7486
//test3();//sum=500000000500000000,时间:305
}
//一般使用方法
public static void test1(){
Long sum=0L;
long startTime= System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000 ; i++) {
sum+=i;
}
long endTime=System.currentTimeMillis();
System.out.println("sum="+sum+",时间:"+(endTime-startTime));
}
//会使用ForkJoin
public static void test2() throws ExecutionException, InterruptedException {
long startTime= System.currentTimeMillis();
ForkJoinPool forkJoinPool=new ForkJoinPool();
ForkJoinTask<Long> task=new ForkJoinDemo(0L,10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long sum=submit.get();
long endTime=System.currentTimeMillis();
System.out.println("sum="+sum+",时间:"+(endTime-startTime));
}
//使用Stream并行流
public static void test3(){
long startTime=System.currentTimeMillis();
Long sum= LongStream.rangeClosed(0L,10_0000_0000L).parallel().reduce(0,Long::sum);
long endTime=System.currentTimeMillis();
System.out.println("sum="+sum+",时间:"+(endTime-startTime));
}
} 12 异步回调
Future设计的初衷:对将来的某个事件的结果进行建模
待定!!!!!
13 JMM(Java Memory Model)
Volatile:是Java虚拟机提供的轻量级的同步机制(轻锁)
- 防止指令重排序,保证内存可见性(保证多线程环境下变量的相互可见性和有序性,不保证原子性)
- 不保证原子性
- 禁止指令重排
解释:
- 保证可见性:当某个线程修改volatile变量,JMM会强制将这个修改更新到主存中,并且让其他线程工作内存中存储的副本失效。
- 如果volatile修饰的变量被修改,会通过总线嗅探告知其他线程工作区,其他线程工作区会把这个变量设置为无效,然后重新读取。
- 不保证原子性:不保证复合运算的原子性,但是保证单个操作的原子性。
- 线程的工作内存对应虚拟机栈中的栈帧的局部变量表,主存相当于堆。
JMM:Java的内存模型,不存在的东西,概念!约定!
- 1.线程解锁前,必须把共享变量立即刷回主存
- 2.线程加锁前,必须读取主存中的最新值到工作内存中
- 3.加锁和加锁是同一把锁
事实:如果线程B修改了值,但是线程A不能及时可见,导致错误
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外
- lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
- unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
- read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
- load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
- use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
- assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
- store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
- write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
JMM对这八种指令的使用,制定了如下规则:
- 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
- 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
- 不允许一个线程将没有assign的数据从工作内存同步回主内存
- 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是对变量实施use、store操作之前,必须经过assign和load操作
- 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
- 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
- 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
- 对一个变量进行unlock操作之前,必须把此变量同步回主内存
14 Volatile
14.1 保证可见性
如果volatile修饰的变量被修改,会通过总线嗅探告知其他线程工作区,其他线程工作区会把这个变量设置为无效,然后重新读取。
代码测试如下:
package com.snowdong;
import java.util.concurrent.TimeUnit;
public class TestVolatile {
//不加volatile,程序就会死循环
//加volatile 可以保证可见性
private volatile static int num=0;
public static void main(String[] args) throws InterruptedException {//main主线程
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" num="+num);
while(num==0){
}
System.out.println(Thread.currentThread().getName()+" num="+num);
}).start();
TimeUnit.SECONDS.sleep(1);
num=1;
System.out.println(Thread.currentThread().getName()+" num="+num);
}
} 14.2 不保证原子性
原子性:不可分割,
具体代码测试:
package com.snowdong;
public class TestVolatile01 {
//定义一个线程共享资源
/**
* 如果在num字段上加上volatile关键字,还是出现问题
* 所以得出volatile是不保证原子性的
*/
private volatile static int num=0;
public static void add(){
num++;//++不是原子操作,有三步
}
public static void main(String[] args) {
//理论上num的结果应该为20000
//但是实际上不一定是
/**
* 解决方案:
* 1、可以在add()方法上加上synchronized同步机制
*/
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while(Thread.activeCount()>2){//肯定有两个线程在执行,main和gc
Thread.yield();
//Thread.yield()是在主线程执行的,意思是只要还有出了GC和main线程之外的线程在跑,...
// 主线程就让出cpu不往下执行
}
System.out.println(Thread.currentThread().getName()+" "+num);
}
} 解释:
java中对基本类型的变量赋值和读取是原子操作,但是想j=i,i++这样操作都不是原子操作的,因为他们进行了多次原子操作。
上述方法编译后字节码如下:
如果不加lock和synchronized,怎么样保证原子性?使用原子类,解决 原子性问题
Java提供java.util.concurrent.atomic包关于一些原子性操作的工具类
具体代码如下:package com.snowdong; import java.util.concurrent.atomic.AtomicInteger; public class TestVolatile01 { //定义一个线程共享资源 /** * 如果在num字段上加上volatile关键字,还是出现问题 * 所以得出volatile是不保证原子性的 */ private volatile static AtomicInteger num=new AtomicInteger(); public static void add(){ //num++;//++不是原子操作,有三步 num.getAndIncrement();//AtomicInteger + 1 方法, CAS(自旋锁) } public static void main(String[] args) { //理论上num的结果应该为20000 //但是实际上不一定是 /** * 解决方案: * 1、可以在add()方法上加上synchronized同步机制,但是其是一个重量级同步机制 * 2、使用java.util.concurrent.atomic包提供的原子性操作的工具类 */ for (int i = 1; i <= 20; i++) { new Thread(()->{ for (int j = 0; j < 1000; j++) { add(); } }).start(); } while(Thread.activeCount()>2){//肯定有两个线程在执行,main和gc Thread.yield(); //Thread.yield()是在主线程执行的,意思是只要还有出了GC和main线程之外的线程在跑,... // 主线程就让出cpu不往下执行 } System.out.println(Thread.currentThread().getName()+" "+num); } }这些类的底层都直接和操作系统挂钩!在内存中修改值!Unsafe类是一个很特殊的存在!
14.3 指令重排
指令重排:你写的程序,计算机并不是按照你写的那样去执行的。
- 源代码-->编译器优化的重排--> 指令并行也可能会重排--> 内存系统也会重排---> 执行
- 处理器在进行指令重排的时候,考虑:数据之间的依赖性!
int x = 1; // 1 int y = 2; // 2 x = x + 5; // 3 y = x * x; // 4 我们所期望的:1234 但是可能执行的时候回变成 2134 1324 可不可能是 4123!不可能
可能造成影响的结果: a b x y 这四个值默认都是 0;
| 线程A | 线程B |
|---|---|
| x=a | y=b |
| b=1 | a=2 |
正常的结果: x = 0;y = 0;但是可能由于指令重排
| 线程A | 线程B |
|---|---|
| b=1 | a=2 |
| x=a | y=b |
指令重排导致的诡异结果: x = 2;y = 1;
volatile可以避免指令重排:内存屏障,PCU指令。
作用:
- 保证特定的操作的执行顺序
- 可以保证某些变量的内存可见性
volatile写前加SS屏障,写后加SL屏障;读前加LL屏障,读后加LS屏障
总结:Volatile 是可以保持 可见性。不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生!
![]()



京公网安备 11010502036488号