JUC并发编程

环境配置

alt

alt

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.16</version>
</dependency>

1. 什么是JUC

源码+官方文档

alt

https://docs.oracle.com/javase/8/docs/api/?xd_co_f=47c934d9-e663-4eba-819c-b726fc2d0847

java.util工具包、包、分类

业务:普通的线程代码 Thread

Runnable:没有返回值,效率相比与Callable相对较低!

2. 线程和进程

进程:一个程序,QQ、music程序的集合;

一个进程往往包含多个线程,至少包含一个!

java默认有两个线程:main 、GC,开线程方式有Thread、Runnable、Callable

并发、并行

并发:多个线程操作同一个资源

  • CPU一核,模拟出来多条线程,快速交替

并行:

  • CPU多核,多个线程同时执行

    //获取CPU的核数
    //CPU密集型   IO密集型
    System.out.println(Runtime.getRuntime().availableProcessors());
    

并发编程的本质:充分利用CPU的资源

线程状态

public enum State {
    //创建、新生
    NEW,
    //运行
    RUNNABLE,
    //阻塞
    BLOCKED,
    //等待
    WAITING,
    //超时等待
    TIMED_WAITING,
    //终止
    TERMINATED;
}

wait和sleep区别

  1. 来自不同的类

    • wait:Object
    • sleep:Thread
  2. 关于锁的释放

    wait会释放锁,sleep不会释放锁

线程就是一个单独的资源类,没有任何附属的操作!!!!!!!!!!!

3. Lock锁

Lock l = ...;
 l.lock();
 try {
   // 业务代码
 } finally {
   l.unlock();
 }
#所有已知的实现类:
1.ReentrantLock:可重入锁, 
2.ReentrantReadWriteLock.ReadLock:读锁, 
3.ReentrantReadwriteLock.WriteLock:写锁
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
//NonfairSync:非公平锁,可以插队(默认)
//FairSync:公平锁,一定要排队

synchronized和Lock的区别

  1. synchronized是内置的java关键字;Lock是一个Java类
  2. synchronized无法判断获取锁的状态;Lock可以判断是否获取到了锁
  3. synchronized会自动释放锁;Lock锁必须手动释放锁,如果不释放,产生死锁
  4. synchronized会产生阻塞,没有获取到锁的线程会等待;Lock锁就不一定会等待下去
  5. synchronized可重入锁,不可以中断,非公平;Lock可重入锁,可以判断锁,可以选择是否公平锁
  6. synchronized适合锁少量的代码同步;Lock适合锁大量的同步代码

4. 生产者和消费者问题

synchronized版本

package com.kaka.demo;

/**
 * 线程之间的通信问题:生产这和消费者问题
 * 线程交替执行 A B操作同一个变量 num=0
 * A num+1
 * B num-1
 */
public class Test02 {
    public static void main(String[] args) {
        Data data = new Data();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
    }
}


class Data{
    private int number;
    //+1
    public synchronized void increment() throws InterruptedException {
        if(number!=0){
            //等待
            this.wait();
        }
        number++;
        System.out.println(Thread.currentThread().getName()+":线程:"+number);
        //通知其他线程,+1完毕
        this.notifyAll();
    }
    //-1
    public synchronized void decrement() throws InterruptedException {
        if(number==0){
            //等待
            this.wait();
        }
        number--;
        System.out.println(Thread.currentThread().getName()+":线程:"+number);
        //通知其他线程,-1完毕
        this.notifyAll();
    }
}

上述逻辑,如果超过两个线程,那么就会出现问题:虚假唤醒

alt

优化方法:if变为while

public synchronized void increment() throws InterruptedException {
    while(number!=0){//条件判断修改为while循环判断
        //等待
        this.wait();
    }
    number++;
    System.out.println(Thread.currentThread().getName()+":线程:"+number);
    //通知其他线程,+1完毕
    this.notifyAll();
}

JUC版本

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
 }

alt

package com.kaka.demo;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 线程之间的通信问题:生产这和消费者问题
 * 线程交替执行 A B操作同一个变量 num=0
 * A num+1
 * B num-1
 */
public class test03 {
    public static void main(String[] args) {
        Data2 data = new Data2();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"C").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"D").start();
    }
}

class Data2{
    private int number;
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    //+1
    public void increment() throws InterruptedException {
        lock.lock();
        try {
            while(number!=0){
                //等待
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName()+":线程:"+number);
            //通知其他线程,+1完毕
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    //-1
    public  void decrement() throws InterruptedException {
        lock.lock();
        try {
            while(number==0){
                //等待
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName()+":线程:"+number);
            //通知其他线程,-1完毕
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}
有序执行

上述版本是无序的,如何精准唤醒,然后顺序执行业务,这是Lock相比于synchronized的提升之处

package com.kaka.demo;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 线程之间的通信问题:生产这和消费者问题
 * A->B->C->A 顺序循环执行
 */
public class test0 {
    public static void main(String[] args) {
        Data0 data = new Data0();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data.printA();
            }
        },"A").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data.printB();
            }
        },"B").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data.printC();
            }
        },"C").start();
    }
}
class Data0{
    Lock lock = new ReentrantLock();
    Condition condition1 = lock.newCondition();
    Condition condition2 = lock.newCondition();
    Condition condition3 = lock.newCondition();
    private int num = 1; //1A 2B 3C
    public void printA(){
        try {
            lock.lock();
            while(num!=1){
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName()+"线程:数字="+num);
            num = 2;
            condition2.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void printB(){
        try {
            lock.lock();
            while(num!=2){
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName()+"线程:数字="+num);
            num = 3;
            condition3.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void printC(){
        try {
            lock.lock();
            while(num!=3){
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName()+"线程:数字="+num);
            num = 1;
            condition1.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

5.8锁现象

package com.kaka.lock8;
import java.util.concurrent.TimeUnit;
public class Test1 {
    /**
     * 关于锁的八个问题:
     * 1、标准情况下,两个线程谁先打印?  发短信-打电话
     * 2、send方法延迟4秒,哪个先打印?  发短信-打电话
     * 3、新增普通方法hello,哪个先打印?  打招呼-发短信-打电话
     * 4、两个对象,两个同步方法,哪个先打印?  打电话-打招呼-发短信
     * 5、增加两个静态方法.哪个先打印?    打招呼-发短信-打电话
     * 6、两个对象,两个静态方法,哪个先打印?  打招呼-发短信-打电话
     * 7、增加普通同步方法game,哪个先打印?  玩游戏-打招呼-发短信
     * 8、两个对象,一个静态同步方法send,一个普通同步方法,哪个先打印? 玩游戏-打招呼-发短信
     */
    public static void main(String[] args) {
        Phone phone = new Phone();
        Phone phone2 = new Phone();
        new Thread(()->{
            phone.send();
        },"线程A").start();
        try {
            TimeUnit.SECONDS.sleep(1);//休眠用这个
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(()->{
            phone2.game();
        },"线程B").start();
        new Thread(()->{
            phone.hello();
        },"线程C").start();
    }
}
class Phone{
    //synchronized 锁的对象是方法的调用者
    //两个方法用的是同一个锁,谁先拿到谁执行
    //static 静态方法
    //类一加载就有了!锁的是Class对象
    public static synchronized void send(){
        try {
            TimeUnit.SECONDS.sleep(4);//休眠用这个
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信:"+Thread.currentThread().getName());
    }
    public static synchronized void call(){
        System.out.println("打电话:"+Thread.currentThread().getName());
    }
    public void hello(){
        System.out.println("打招呼:"+Thread.currentThread().getName());
    }
    public synchronized void game(){
        System.out.println("玩游戏:"+Thread.currentThread().getName());
    }
}

6. 集合类不安全

List

多线程操作集合:java.util.ConcurrentModificationException

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListUnsafe {
    public static void main(String[] args) {
        List<String> list = new CopyOnWriteArrayList<>();
        /**
         * 解决方案:
         * 1. List<String> list = new Vector<>();
         * 2. List<String> list = Collections.synchronizedList(new ArrayList<>());
         * 3. List<String> list = new CopyOnWriteArrayList<>();
         *      底层:private transient volatile Object[] array;
         *  CopyOnWrite:写入时复制,COW 计算机程序设计领域的一种优化策略
         * 多个线程调用的时候,list读取的时候固定,写入会覆盖
         * 在写入的时候避免覆盖造成数据问题
         * 
         * CopyOnWriteArrayList和 Vector相比优势在哪里?
         * synchronized效率略低,而CopyOnWrite用的是Lock锁
         */
        for (int i = 1; i < 20; i++) {
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0,5));
                System.out.println(list);
            },String.valueOf(i).concat("线程")).start();
        }
    }
}

Set

  1. Set set = Collections.synchronizedSet(new HashSet<>());
  2. Set set = new CopyOnWriteArraySet<>();

HashSet的底层本质就是HashMap:

private static final Object PRESENT = new Object();

public HashSet() {
    map = new HashMap<>();
}

public boolean add(E e) {
    return map.put(e, PRESENT)==null;
}

HashMap

Map<String,Object> map = new HashMap<>();
/**
 * 上述定义等价于  
 * Map<String,Object> map = new HashMap<String,Object>(16,0.75f);
 */

Map<String,Object> map = Collections.synchronizedMap(new HashMap<>());
Map<String,Object> map = new Hashtable<>();
Map<String,Object> map = new ConcurrentHashMap<>();

7. Callable

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
/**
*FutureTask 是 Runnable子类
*/
public class CallableTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());//适配类
        new Thread(futureTask,"A").start();
        new Thread(futureTask,"B").start();
        Integer integer = futureTask.get();//get方法会产生阻塞,需要放到最后或者使用异步通信处理
        System.out.println(integer);
    }
}
class MyThread implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("call".concat(Thread.currentThread().getName()));
        return 1024;
    }
}
//=============================----
callA  
1024
//=================================
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
//第二次执行时,由于state不是new,所以直接返回,没有执行callable对象的call方法

8. 常用辅助类

8.1 CountDownLatch-减法计数器

import java.util.concurrent.CountDownLatch;

public class FuZhu {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 0; i < 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"执行完毕"+countDownLatch.getCount());
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();//等待计数器归零,然后向下执行,还可以设置超时时间await(long timeout, TimeUnit unit)
        System.out.println("执行完毕");
    }
}

8.2. CyclicBarrier-加法计数器

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class FuZhu {
    public static void main(String[] args) throws InterruptedException {
        String[] sArr = {"卡卡罗特","贝吉塔","布儿玛","库林","比克","布罗利","弗利沙"};
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("出来吧,神龙");
        });
        for (int i = 1; i < 8; i++) {
            int temp = i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"收集了"+temp+"颗龙珠");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },sArr[i-1]).start();
        }
    }
}
//=====================================
贝吉塔收集了2颗龙珠
卡卡罗特收集了1颗龙珠
布儿玛收集了3颗龙珠
库林收集了4颗龙珠
比克收集了5颗龙珠
布罗利收集了6颗龙珠
弗利沙收集了7颗龙珠
出来吧,神龙

Process finished with exit code 0

8.3 Semaphore-信号量

import java.util.concurrent.*;

public class FuZhu {
    public static void main(String[] args) throws InterruptedException {
        String[] sArr = {"卡卡罗特","贝吉塔","布儿玛","库林","比克","布罗利","弗利沙"};

        //线程数量:停车位,可以看做是限流量,每次只限3个线程
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i < 8; i++) {
            int temp = i;
            new Thread(()->{
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"抢到龙珠");
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+"扔了龙珠");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            },sArr[i-1]).start();
        }
    }
}
//==========================================
卡卡罗特抢到龙珠
贝吉塔抢到龙珠
布儿玛抢到龙珠
贝吉塔扔了龙珠
库林抢到龙珠
卡卡罗特扔了龙珠
比克抢到龙珠
布儿玛扔了龙珠
布罗利抢到龙珠
库林扔了龙珠
布罗利扔了龙珠
比克扔了龙珠
弗利沙抢到龙珠
弗利沙扔了龙珠

9. 读写锁

ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//写锁(独占锁):一次只能被一个线程占有
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
//读锁(共享锁):多个线程可以同时占有
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
writeLock.lock();
writeLock.unlock();
readLock.lock();
readLock.unlock();

10. 阻塞队列

alt

使用场景:多线程并发处理,线程池

四组API(新增、移除、队首):

  • 抛出异常:add、remove() 、element ---->Queue full + NoSuchElementException
  • 不抛异常:offer、poll()、peek ---->false、null
  • 阻塞等待:put 、take()
  • 超时等待:offer(a,12,TimeUnit.xxx),poll(12,TimeUnit.xxx)

SynchronousQueue同步队列

alt

SynchronousQueue:不能存储元素,put了一个元素,就必须从里面take出来,否则不能再put进去。

11. 线程池

池化技术

  • 程序的运行,本质就是占用系统的资源!优化资源使用技术!
  • 线程池、连接池、内存池、对象池
  • 事先准备好资源,若有人使用,直接来拿,用完还回来。

线程池好处

  1. 降低资源的消耗
  2. 提高响应的速度
  3. 方便管理

线程复用、可以控制最大并发数、管理线程

==不推荐使用Executors创建,而是通过底层的方法ThreadPoolExecutor==

//ExecutorService threadPool = Executors.newSingleThreadExecutor(); //单个线程
//ExecutorService threadPool = Executors.newFixedThreadPool(4);//固定数量线程
ExecutorService threadPool = Executors.newCachedThreadPool();//可伸缩的
//上面三个方法底层都是调用了ThreadPoolExecutor方法,只不过参数有区别
try {
    for (int i = 0; i < 100; i++) {
        threadPool.execute(()->{
            System.out.println(Thread.currentThread().getName());
        });
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    TimeUnit.SECONDS.sleep(1);
    threadPool.shutdownNow();
}
Executors弊端
  • FixedThreadPool和SingleThreadExecutor:允许请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,造成oom
  • CachedThreadPool和ScheduledThreadPool:允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量的线程,造成oom

七大参数

public ThreadPoolExecutor(int corePoolSize,//核心线程数
                              int maximumPoolSize,//最大核心线程数
                              long keepAliveTime,//非核心线程超时多长时间没有人使用就会释放
                              TimeUnit unit,//超时单位
                              BlockingQueue<Runnable> workQueue,//阻塞队列
                              ThreadFactory threadFactory,//线程工厂,创建线程了,一般不用动
                              RejectedExecutionHandler handler) {//拒绝策略
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

拒绝策略

AbortPolicy:抛出异常

throw new RejectedExecutionException("Task " + r.toString() +
                                     " rejected from " +
                                     e.toString());

CallerRunsPolicy:由调用线程执行,main线程

if (!e.isShutdown()) {
    r.run();
}

DiscardOldestPolicy:队首poll出去,执行新线程

if (!e.isShutdown()) {
    e.getQueue().poll();
    e.execute(r);
}

DiscardPolicy:不抛出异常,任务也不会执行,直接丢掉

{}没有任何其他操作

最大线程数

  • CPU密集型,几核就是几,可以保持CPU的效率最高!

    //获取CPU核数
    {
       Runtime.getRuntime().availableProcessors() 
    }
    
  • IO密集型,判断程序中有几个IO消耗大的任务数,大于这个数即可,一般2倍

12. 四大函数式接口

新时代的程序员:Lambda表达式、链式编程、函数式接口、Stream流式计算

函数式接口:只有一个方法的接口

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
//超级多的FunctionalInterface
//简化编程模型、在新版本的框架底层大量应用
//forEach(Consumer<? super E> action)  (消费者类的函数式接口)

!alt

Function-函数型接口

@FunctionalInterface
public interface Function<T, R> {
    R apply(T t);
//输入参数T,输出结果R
    
//应用
Function function = new Function<String,String>() {
    @Override
    public String apply(String str) {
        return str;
    }
};
//Lambda表达式改写
Function function = (str)->{return str;};

Predicate-断定型接口

@FunctionalInterface
public interface Predicate<T> {
    boolean test(T t);
//输入参数T,输出结果是boolean
Predicate<String> predicate = new Predicate<String>() {
    @Override
    public boolean test(String str) {
        return str.isEmpty();
    }
};

Predicate<String> predicate = (str)->{return str.isEmpty();};

Consumer-消费型接口

@FunctionalInterface
public interface Consumer<T> {
    void accept(T t);
}
//只有入参,不需要返回

Supplier-提供型接口

@FunctionalInterface
public interface Supplier<T> {
    T get();
}
//无需入参,返回参数T

13. Stream流式计算

public class testStream {
    public static void main(String[] args) throws InterruptedException {
        /**
         * 要求:
         *  有5个用户,筛选:
         *      1.ID为偶数
         *      2.年龄大于23
         *      3.用户名转成大写字母
         *      4.用户名字母倒排
         *      5.只输出一个用户
         */
        User u1 = new User(1,"a",21);
        User u2 = new User(2,"b",22);
        User u3 = new User(3,"c",23);
        User u4 = new User(4,"d",24);
        User u5 = new User(6,"e",25);

        Arrays.asList(u1,u2,u3,u4,u5).stream()
                .filter((u)->{return u.getId()%2==0;})
                .filter((u)->{return u.getAge()>23;})
                .map((u)->{
                    u.setName(u.getName().toUpperCase());
                    return u;
                })
                .sorted((x,y)->{ return y.getName().compareTo(x.getName());})
                .limit(1)
                .forEach(System.out::println);
    }
}

@Data
@AllArgsConstructor
@ToString
class User{
    private int id;
    private String name;
    private int age;
}

14. ForkJoin

ForkJoin在JDK1.7,并行执行任务!提高效率,大数据量时~

大数据:Map Reduce(把大任务分成小任务)

特点:工作窃取

任务保存在双端队列中,如果中间有线程完成任务,则会从其他线程取出来未完成任务继续执行,提高效率

JDK1.8的并行流效率更高

System.out.println(LongStream.rangeClosed(1,10000000L).parallel().reduce(0,Long::sum));

15. 异步回调

CompletableFuture

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
    System.out.println("ok");
    int i = 1/0;
    return 200;
});

System.out.println(future.whenComplete((t,u)->{
    System.out.println("t:"+t);
    System.out.println("u:"+u);
}).exceptionally((e)->{
    return 500;
}).get());
//=====================================
ok
t:null
u:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
500

16. JMM

java内存模型,一系列的概念和约定

关于JMM同步的约定

  1. 线程解锁前,必须把共享变量立即刷回主存
  2. 线程加锁前,必须读取主存中的最新值到工作内存中
  3. 加锁和解锁是同一把锁

线程:工作内存 主存

8个操作:

read+load、use+assign、store+write、lock+unlock

alt

JMM的八种交互操作(每个操作都为原子操作)

  • lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
  • unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
  • read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
  • load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
  • use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
  • assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
  • store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
  • write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中

三、对八种操作的规则

  • ​ 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
  • ​ 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
  • ​ 不允许一个线程将没有assign的数据从工作内存同步回主内存
  • ​ 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
  • ​ 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
  • ​ 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
  • ​ 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
  • ​ 对一个变量进行unlock操作之前,必须把此变量同步回主内存

17. volatile

java虚拟机提供的轻量级同步机制

  • 保证可见性
  • 不保证原子性
  • 禁止指令重拍

使用原子类保证原子性

alt

//volatile不保证原子性,需要和JUC中的原子类AtomicInteger一起使用
private volatile static AtomicInteger num = new AtomicInteger();

public static void add(){
    //num++;//不是原子操作
    num.getAndIncrement();//AtomicInteger加一的方法    CAS
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
    for (int i = 0; i < 20; i++) {
        new Thread(()->{
            for (int j = 0; j < 1000; j++) {
                add();
            }
        }).start();
    }
    while (Thread.activeCount()>2){
        Thread.yield();
    }
    System.out.println(num);//20000
}
Unsafe类
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

指令重排

什么是指令重排:我们写的程序,计算机并不是按照我们写的那样去执行的

源代码-》编译器优化的重排-》指令并行也有可能重排-》内存系统也会重排-》执行

==处理器重排的时候,会考虑数据之间的依赖性==

volatile可以避免指令重排

内存屏障,cpu指令:

  1. 保证特定的操作的执行顺序
  2. 可以保某些变量的内存可见性(利用这些特性volatile实现了可见性)

18. 单例模式

饿汉式

/**
 * 饿汉式单例
 */
public class HunSingleton {
    private HunSingleton(){};
    private final static HunSingleton hunSingleton = new HunSingleton();
    public static HunSingleton getInstance(){
        return hunSingleton;
    }
}

懒汉式

一般懒汉式:多线程并发会有问题

/**
 * 一般懒汉式:多线程并发会有问题
 */
public class LazySingleton {
    public LazySingleton() {}
    private static LazySingleton lazySingleton;
    
    public static LazySingleton getInstance(){
        if(lazySingleton == null){
            lazySingleton = new LazySingleton();
        }
        return lazySingleton;
    }
}

双重检测锁懒汉(DCL)-反射可以破坏单例

/**
 * 双重检测锁模式懒汉--DCL   double check lock
 */
public class LazySingleton {
    public LazySingleton() {}
    private static volatile LazySingleton lazySingleton;
    
    public static LazySingleton getInstance(){
        if(lazySingleton == null){
            synchronized (LazySingleton.class){
                if(lazySingleton == null){
                    lazySingleton = new LazySingleton();
                }
            }
        }
        return lazySingleton;
    }
}

下面这个不是原子性操作
lazySingleton = new LazySingleton();
1.分配内存空间
2.执行构造方法,初始化对象
3.把这个对象指向这个空间
    
    123 没有问题,如果是132,3执行完的时候有一个线程进来了,那么会获取到未完成构造的对象,这是有问题的
所以用volatile修饰

三重检测锁DCL-反射依然可以破坏,但是难度增加

public class LazySingleton {
	//这个参数的类型和判断方式可以采用一些加密算法,使得破解难度增加,但是还是有可能破解
    private static Boolean key = false;

    public LazySingleton() {
        if(key){
            throw new RuntimeException("No reflecting Exception");
        }else{
            key = true;
        }
    }
    private static LazySingleton lazySingleton;

    public static LazySingleton getInstance(){
        if(lazySingleton == null){
            synchronized (LazySingleton.class){
                if(lazySingleton == null){
                    lazySingleton = new LazySingleton();
                }
            }
        }
        return lazySingleton;
    }
    
    
    /**
     * 下面这段代码就可以破坏三重的单例模式
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        Field key = LazySingleton.class.getDeclaredField("key");
        key.setAccessible(true);

        Constructor<LazySingleton> constructor = LazySingleton.class.getDeclaredConstructor(null);
        constructor.setAccessible(true);

        LazySingleton lazySingleton = constructor.newInstance();
        System.out.println(lazySingleton);

        key.set(lazySingleton,false);

        LazySingleton lazySingleton2 = constructor.newInstance();
        System.out.println(lazySingleton2);
    }
    //==================================================
    //com.kaka.demo.LazySingleton@74a14482
	//com.kaka.demo.LazySingleton@677327b6
}

枚举类单例

public enum EnumSingle {
    INSTANCE;
    public EnumSingle getInstance(){
        return INSTANCE;
    }
}
class Test{
    public static void main(String[] args) throws Exception {
        EnumSingle instance1 = EnumSingle.INSTANCE;
        System.out.println(instance1);
        Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(null);
        declaredConstructor.setAccessible(true);
        EnumSingle enumSingle = declaredConstructor.newInstance();
        System.out.println(enumSingle);
    }
}
//=================================================
Exception in thread "main" java.lang.NoSuchMethodException: com.kaka.demo.EnumSingle.<init>()
    at java.lang.Class.getConstructor0(Class.java:3082)
	at java.lang.Class.getDeclaredConstructor(Class.java:2178)
	at com.kaka.demo.Test.main(EnumSingle.java:11)

如上所示,抛出异常没有无参构造,用jad工具反编译

PS F:\java开发\workspace\juc\target\classes\com\kaka\demo> jad -sjava .\EnumSingle.class
Parsing .\EnumSingle.class... Generating EnumSingle.java
public final class EnumSingle extends Enum
{

    public static EnumSingle[] values()
    {
        return (EnumSingle[])$VALUES.clone();
    }

    public static EnumSingle valueOf(String name)
    {
        return (EnumSingle)Enum.valueOf(com/kaka/demo/EnumSingle, name);
    }

    private EnumSingle(String s, int i)
    {
        super(s, i);
    }

    public EnumSingle getInstance()
    {
        return INSTANCE;
    }

    public static final EnumSingle INSTANCE;
    private static final EnumSingle $VALUES[];

    static 
    {
        INSTANCE = new EnumSingle("INSTANCE", 0);
        $VALUES = (new EnumSingle[] {
            INSTANCE
        });
    }
}

只有一个有参构造,

Constructor<EnumSingle> declaredConstructor = 
    EnumSingle.class.getDeclaredConstructor(String.class,int.class);

//==========================================================
Exception in thread "main" java.lang.IllegalArgumentException: Cannot reflectively create enum objects
	at java.lang.reflect.Constructor.newInstance(Constructor.java:416)
	at com.kaka.demo.Test.main(EnumSingle.java:13)

综上所述,枚举类是安全的单例

19. CAS

CAS,英文为Compare And Swap,中文意思为:比较并交换,它是一种无锁原子操作算法。大致过程是这样的:CAS包含三个参数,V、E和N。V表示待更新的变量,E是预期值,N表示新值。仅仅当V的值为E,才将V的值变更为N,否则不做任何处理。V的值不为E,必然是因为其他线程已经对V进行了修改。

CAS的返回结果为一个boolean值,true表示更新成功,false表示更新失败。

用CAS会使得代码变得更复杂一些,但是因为其天生的乐观特性(总是认为绝大多数情况能更新成功),所以天生地对线程竞争具有免疫性。同时,其优越的性能也比锁要高效很多。

//java.util.concurrent.atomic.AtomicInteger#getAndIncrement
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

//sun.misc.Unsafe
//Java无法操作内存,但是可以通过native调用C++操作内存,Unsafe类就是这个类
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
	//自旋锁
    return var5;
}

CAS是CPU的并发原语

CAS:比较当前工作内存中的值和主内存中的值,如果这个值是期望的,则执行操作!如果不是就一直循环!

20. 原子引用-解决ABA问题-版本号

public class CASDemo {
    //注意:如果泛型是包装类,对象引用会出现问题
    static AtomicStampedReference<Integer> s = new AtomicStampedReference<>(10,1);
    public static void main(String[] args) {
        new Thread(()->{
            int stamp = s.getStamp();
            System.out.println("kaka1d"+stamp+"#"+s.getReference());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(s.compareAndSet(10, 12, s.getStamp(), s.getStamp() + 1));
            System.out.println("kaka1s"+s.getStamp()+"#"+s.getReference());
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(s.compareAndSet(12, 10, s.getStamp(), s.getStamp() + 1));
            System.out.println("kaka1a"+s.getStamp()+"#"+s.getReference());
        },"a1").start();
        new Thread(()->{
            int stamp = s.getStamp();
            System.out.println("kaka2"+stamp+"#"+s.getReference());
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(s.compareAndSet(10, 11, s.getStamp(), s.getStamp() + 1));
            System.out.println("kaka2"+s.getStamp()+"#"+s.getReference());
        },"a1").start();
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println("final"+s.getStamp()+"#"+s.getReference());
    }
}
//===============================================================
kaka21#10
kaka1d1#10
true
kaka1s2#12
false
kaka22#12
true
kaka1a3#10
final3#10
Process finished with exit code 0

21. 锁

公平锁和非公平锁

  • 公平锁:不能插队,必须先来后到!
  • 非公平锁:可以插队,默认都是非公平锁!
public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

可重入锁(递归锁)

锁的操作粒度是”线程”,而不是调用(至于为什么要这样,下面解释).同一个线程再次进入同步代码的时候.可以使用自己已经获取到的锁,这就是可重入锁。

可重入锁主要用在线程需要多次进入临界区代码时,需要使用可重入锁。具体的例子,比如上文中提到的一个synchronized方法需要调用另一个synchronized方法时。

为每个锁关联一个获取计数器和一个所有者线程,当计数值为0的时候,这个锁就没有被任何线程持有。

当线程请求一个未被持有的锁时,JVM将记下锁的持有者,并且将获取计数值置为1,如果同一个线程再次获取这个锁,计数值将递增,退出一次同步代码块,计算值递减,当计数值为0时,这个锁就被释放。

***ps:可重入是指对同一线程而言。***某个线程获得了某个锁,可以再次获得而不会产生死锁。

自旋锁-spinlock

线程1获得了锁,线程2只能用自旋锁等待,线程1释放锁,自旋锁结束!

自旋锁本质就是一个使线程等待的Boolean标志,一般用while关键字实现。

死锁

  1. 使用jps -l 定位进程号

    F:\java开发\workspace\juc>jps -l
    13584 org.jetbrains.jps.cmdline.Launcher
    4548 sun.tools.jps.Jps
    3436
    
  2. 使用jstack pid(进程号,由jps获得)找到死锁问题