1. 基本概念

  • 程序

    计算机指令和数据的集合,是静态的。

  • 进程

    一个程序运行后,就是一个进程,是动态的,具有生命周期。

  • 线程

    进程可以细分为多个线程,是程序内部的一条执行路径。

2. 线程的生命周期

图片说明

3. 线程的创建

3.1 方式一:继承 Thread

继承 Thread 类创建线程的步骤如下:

  1. 创建一个类,继承 Thread 类,同时重写 Thread 类的 run() 方法。

    class MyThread extends Thread{
        @Override
        public void run() {
            // 线程代码
        }
    }
  1. 创建 MyThread 类的对象,然后调用 start() 方法让线程处于就绪状态。

    Thread t1 = new MyThread();
    t1.start();

3.2 方式二:实现 Runnable 接口

实现 Runnable 接口创建线程的步骤如下:

  1. 创建一个类,实现 Runnable 接口。

    class MyThread implements Runnable{
    
        @Override
        public void run() {
            // 线程代码
        }
    }
  2. 创建 MyThread 类的对象,然后创建 Thread 类对象,把 MyThread 类的对象作为参数传入到 Thread 类的构造器中,最后调用 Thread 对象的 start() 让线程处于就绪状态。

    MyThread mt = new MyThread();
    Thread t1 = new Thread(mt);
    t1.start();

3.3 方式三:实现 Callable 接口

实现 Callable 接口创建线程的步骤如下:

  1. 声明一个类,实现 Callable 接口。实现 Callble 接口需要传入一个泛型参数,这个泛型参数就是返回值的类型。

    class Cal implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
    
            int sum = 0;
            for(int i = 0; i < 1000;i++){
                sum +=i;
            }
    
            return sum;
        }
    }
  1. 创建 Cal 对象和 FutrueTask 对象,其中需要把 Cal 对象当作参数传入 FutureTask 的构造器。

    Cal cal = new Cal();
    FutureTask<Integer> futureTask = new FutureTask<>(cal);
  1. 创建 Thread 对象,并把 FutureTask 对象当作参数传入 Thread 构造器,同时调用 Thread 对象的 start() 方法让线程处于就绪状态。

    new Thread(futureTask).start();
  1. 调用 FutureTask 对象的 get() 方法,获得线程的返回值。

    try {
       // 获取返回值
       int sum = futureTask.get();
       System.out.println("计算的结果为:"+sum);
    } catch (InterruptedException e) {
       e.printStackTrace();
    } catch (ExecutionException e) {
       e.printStackTrace();
    }

使用 Callable 方式创建线程的优点:

  1. 创建的线程具有返回值和可以抛出异常。

4. 线程退出的方法

  1. 【不推荐】调用线程对象的 stop() 方法

    调用线程对象的 stop() 方法,会让对应的线程突然终止,无法知道线程做了哪些工作,又有哪些工作没有做,且还不能做线程的清理工作。

  2. 【推荐】设置线程的退出标志

    volatile关键字:让声明的变量在多个线程中具有可见性,保证线程读取此变量的值时都是最新修改的

定义一个使用 volatile 修饰的变量,提供对此变量修改的方法,通过修改此变量的值达到退出线程的目的。

class Main{
    public static void main(String[] args){
        MyThread mt = new MyThread();
        Thread t1 = new Thread(mt, "线程一");

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


        try {
            // 退出线程
            mt.setRunning(false);
            // 等待 t1 线程退出
            t1.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("退出所以线程");
    }
}

class Mythread implements Runnable{
    private volatile boolean running = true;

    public void setRunning(boolean running) {
        this.running = running;
    }

    @Override
    public void run() {
        while(running){
            // 线程代码
     }
    }
}

5. 常用方法

方法 功能
sleep() 静态方法,让当前线程阻塞一定的时间。
join() 如果 a 线程里调用了 b 线程对象的 join() ,a 线程就会阻塞,直到 b 线程结束运行,a 线程才会切换到就绪状态。
yield() 放弃 CPU 的使用权,进入就绪状态。
setName() 设置线程的名称。
setPriority() 设置线程的优先级,有三个内置的值,分别为:NORM_PRIORITY, MIN_PRIORITY, MAX_PRIORITY 。

6. 线程的同步

6.1 线程的安全问题

当多个线程对同一个数据(共享数据)进行操作时,就会出现线程的安全问题,例如下面的示例:

public class Main {
    public static void main(String[] args) {
        Windows wr1 = new Windows(100);
        Thread t1 = new Thread(wr1, "窗口一");
        Thread t2 = new Thread(wr1, "窗口二");
        Thread t3 = new Thread(wr1, "窗口三");

        t1.start();
        t2.start();
        t3.start();
    }
}

class Windows implements Runnable{
    private int tickets;

    public Windows(int ticket){
        this.tickets = ticket;
    }

    @Override
    public void run(){
        while(true){
            if(tickets > 0){
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                tickets--;
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println(Thread.currentThread().getName()+":"+tickets+"票");
            }else{
                break;
            }
        }
    }
}

假设线程一进入到运行状态时,tickets == 1 ,然后执行 sleep(); 进入阻塞状态 。线程二进入到运行状态,此时 tickets == 1 ,所以也执行 sleep 进入阻塞状态。当线程一再次进入运行状态时,会执行 tickets--;sleep(); 代码再次进入阻塞状态。紧接着线程二进入运行状态时,执行 tickets--,代码执行完后 tickets == -1 ,tickets 此时的值显然是不对的,出现这个问题的原因时:多个线程对共享数据进行操作时,一个线程还没有操作完成,另外一个线程就进来操作了。解决方法就是:当一个线程操作共享数据时,只要没有操作完成,其他线程就无法进行操作

6.2 线程的同步原理

在 Java 中,每个对象都有一个锁(有时也称为同步监视器),当一个线程获取到对象的锁后,其他线程就无法获取该对象的锁,直到该线程释放掉锁后,其他线程才可以获取到。同步线程的方式有三种,这三种方式的原理都是一样的,都是:线程进入同步代码中去时,就会获取到同步代码中指定的同步监视器,只要线程没从同步代码中出去,就不会释放掉同步监视器。当切换到其他线程时,由于没有获取到同步代码中同步监视器,便无法进入到同步代码中,该线程就会一直阻塞,直到获取到同步代码中的同步监视器才能进入到同步代码中去。也就是说,在同步代码中,永远都只有单线程在执行。

6.3 同步代码块

同步代码块的语法如下:

synchronized(同步监视器){
    //同步代码
}
  1. 操作共享数据(多个线程共同操作的变量)时,就需要进行线程的同步。
  2. 任何对象都可以充当同步监视器。
  3. 代码同步时,多个线程要共用一个同步监视器,否则同步代码将会无效。
  4. 同步代码不能多,也不同少,多和少都会出现问题。

6.4 同步方法

同步方法的语法如下:

修饰符 synchronized  方法返回值 方法名(形参列表)extends 父类名 implements 接口名 throws 异常名{
    // 同步代码
}
  1. 修饰静态方法时,同步监视器为方法所属的类。
  2. 修饰不同方法时,同步监视器为 this 。

6.5 示例代码 BUG 解决

把 Windows 类的 run 方法改成如下代码:

@Override
public void run(){
    while(true){
        synchronized(this){
            if(tickets > 0){
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                tickets--;
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println(Thread.currentThread().getName()+":"+tickets+"票");
            }else{
                break;
            }
        }

    }
}

7 线程的通信

7.1 线程通信的方法

涉及到线程通信的方法有三个,分别为 wait()notifynotifyAll()。这三个方法是 Object 类的方法,所以任何对象都可以使用。

功能 方法名
让调用该线程的方法进入阻塞状态,同时释放同步监视器。 wait()
唤醒一个正在 wait() 的线程,如果有多个线程正在 wait() ,则优先唤醒优先级较高的线程。 notify()
唤醒所以正在 wait() 的线程。 notifyAll()

说明:

  1. wait()notify()notifyAll(),必须在同步代码块或同步方法中使用。

  2. wait()notify()notifyAll() 三个方法的调用者必须是同步代码块或同步方法中的监视器。

补充:

  1. sleep()wait() 的异同?

    同:都让当前线程进入阻塞状态。

    异:1)sleep() 在任何地方都可以用,wait() 只能在同步代码块或同步方法中使用。

    ​ 2)sleep() 属于 Thread 类,wait() 属于 Object 类。

    ​ 3)sleep() 使当前线程进入阻塞模式,不会释放当前线程持有的同步监视器,而 wait() 除了让当前线程进入阻塞状态,还好释放当前线程持有的同步监视器。

7.2 生产者消费者模型

流程图:

图片说明

参考代码:

/**
 * @author opendragonhuang
 * @version 1.0
 * @date 2019/7/29
 */
public class Main {
    public static void main(String[] args) {
        Resource r = new Resource();
        Thread t1 = new Thread(new Producer(r), "生产者");
        Thread t2 = new Thread(new Consume(r), "消费者一");

        t1.start();
        t2.start();
    }
}

class Resource{
    private final int MAX_SIZE = 20;
    private int curSize;

    public void add(){
        synchronized (this){
            if(curSize >= MAX_SIZE ){
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else{
                curSize++;
                System.out.println(Thread.currentThread().getName()+"生产,总库存:"+curSize);
                notifyAll();
            }
        }
    }

    public void remove(){
        synchronized (this){
            if(curSize <= 0){
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else{
                curSize--;
                System.out.println(Thread.currentThread().getName()+"消费,总库存:"+curSize);
                notifyAll();
            }
        }
    }
}

class Producer implements Runnable{
    private Resource resource;

    public Producer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            resource.add();
        }
    }
}

class Consume implements Runnable{
    private Resource resource;

    public Consume(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            resource.remove();
        }
    }
}

上面的代码,运行的时候,结果确实显示正常,但实则隐含有巨大的 BUG,当把 MAX_SIZE 改成 1 时,再次运行程序,会发现程序无法正常结束。现在我们来分析原因。

假设,某次程序运行时,其输出结果为:

生产者生产,总库存:1
库存已满,无法生产
消费者一消费,总库存:0
生产者生产,总库存:1
库存已满,无法生产
消费者一消费,总库存:0
生产者生产,总库存:1
库存已满,无法生产
消费者一消费,总库存:0
生产者生产,总库存:1
库存已满,无法生产
消费者一消费,总库存:0
生产者生产,总库存:1
库存已满,无法生产
消费者一消费,总库存:0

在输出结果的倒数第二行,程序输出:库存已满,无法生产,通过数输出结果可以发现,Producer 线程中的 i == 9Consume 线程中的 i == 4 。当消费者消费了一个商品后,就会把生产者线程唤醒,使得生产者线程退出。但是,消费者线程还没有结束,下一次消费者线程获得 CPU 使用权时,就会进入阻塞状态,且再也没有线程将消费者线程唤醒,从而造成了程序无法正常结束,而这一现象被称为:虚假唤醒

好了,知道了问题的存在,那么现在就开始修改代码,把代码中的 Resource 类修改成如下:

class Resource{
    private final int MAX_SIZE = 10;
    private int curSize;

    public void add(){
        synchronized (this){
            if(curSize >= MAX_SIZE ){
                try {
                    System.out.println("库存已满,无法生产");
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            curSize++;
            System.out.println(Thread.currentThread().getName()+"生产,总库存:"+curSize);
            notifyAll();
        }
    }

    public void remove(){
        synchronized (this){
            if(curSize <= 0){
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            curSize--;
            System.out.println(Thread.currentThread().getName()+"消费,总库存:"+curSize);
            notifyAll();
        }
    }
}

修改完成后,运行程序,发现程序的运行结果正常,且程序也正常了退出了。然后,这个程序还是存在 BUG。当存在多个消费者和生产者线程时,商品的个数会出现负数的存储。如:把主函数代码改成如下,再次运行程序。

public static void main(String[] args) {
    Resource r = new Resource();
    Thread t1 = new Thread(new Producer(r), "生产者一");
    Thread t2 = new Thread(new Producer(r), "生产者二");
    Thread t3 = new Thread(new Consume(r), "消费者一");
    Thread t4 = new Thread(new Consume(r), "消费者二");

    t1.start();
    t2.start();
    t3.start();
    t4.start();
}

程序输出结果为:

生产者一生产,总库存:1
库存已满,无法生产
库存已满,无法生产
消费者一消费,总库存:0
生产者一生产,总库存:1
库存已满,无法生产
消费者一消费,总库存:0
生产者一生产,总库存:1
库存已满,无法生产
消费者一消费,总库存:0
生产者一生产,总库存:1
库存已满,无法生产
消费者一消费,总库存:0
生产者一生产,总库存:1
库存已满,无法生产
消费者一消费,总库存:0
生产者一生产,总库存:1
库存已满,无法生产
消费者一消费,总库存:0
生产者一生产,总库存:1
库存已满,无法生产
消费者一消费,总库存:0
生产者一生产,总库存:1
库存已满,无法生产
消费者一消费,总库存:0
生产者一生产,总库存:1
库存已满,无法生产
消费者一消费,总库存:0
生产者一生产,总库存:1
消费者二消费,总库存:0
消费者一消费,总库存:-1
消费者二消费,总库存:-2
消费者二消费,总库存:-3
消费者二消费,总库存:-4
消费者二消费,总库存:-5
消费者二消费,总库存:-6
消费者二消费,总库存:-7
消费者二消费,总库存:-8
消费者二消费,总库存:-9
消费者二消费,总库存:-10
生产者二生产,总库存:-9
生产者二生产,总库存:-8
生产者二生产,总库存:-7
生产者二生产,总库存:-6
生产者二生产,总库存:-5
生产者二生产,总库存:-4
生产者二生产,总库存:-3
生产者二生产,总库存:-2
生产者二生产,总库存:-1
生产者二生产,总库存:0

所以,为了解决这个 BUG,需要把 Resource 类修改成如下,也就是把 add() 方法和 remove() 方法的 if 改成 while,参考代码如下:

class Resource{
    private final int MAX_SIZE = 1;
    private int curSize;

    public void add(){
        synchronized (this){
            if(curSize >= MAX_SIZE ){
                try {
                    System.out.println("库存已满,无法生产");
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            curSize++;
            System.out.println(Thread.currentThread().getName()+"生产,总库存:"+curSize);
            notifyAll();
        }
    }

    public void remove(){
        synchronized (this){
            while(curSize <= 0){
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            curSize--;
            System.out.println(Thread.currentThread().getName()+"消费,总库存:"+curSize);
            notifyAll();
        }
    }
}

运行程序,没有问题。程序的全部代码参考如下:

/**
 * @author opendragonhuang
 * @version 1.0
 * @date 2019/7/29
 */
public class Main {
    public static void main(String[] args) {
        Resource r = new Resource();
        Thread t1 = new Thread(new Producer(r), "生产者一");
        Thread t2 = new Thread(new Producer(r), "生产者二");
        Thread t3 = new Thread(new Consume(r), "消费者一");
        Thread t4 = new Thread(new Consume(r), "消费者二");

        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

class Resource{
    private final int MAX_SIZE = 1;
    private int curSize;

    public void add(){
        synchronized (this){
            while(curSize >= MAX_SIZE ){
                try {
                    System.out.println("库存已满,无法生产");
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            curSize++;
            System.out.println(Thread.currentThread().getName()+"生产,总库存:"+curSize);
            notifyAll();
        }
    }

    public void remove(){
        synchronized (this){
            while(curSize <= 0){
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            curSize--;
            System.out.println(Thread.currentThread().getName()+"消费,总库存:"+curSize);
            notifyAll();
        }
    }
}

class Producer implements Runnable{
    private Resource resource;

    public Producer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            resource.add();
        }
    }
}

class Consume implements Runnable{
    private Resource resource;

    public Consume(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            resource.remove();
        }
    }
}

8. votatile

(1)内存可见性

内存可见性(Memory Visibility)是指当某个线程正在使用对象状态 而另一个线程在同时修改该状态,需要确保当一个线程修改了对象状态后,其他线程能够看到发生的状态变化。

可见性错误是指当读操作与写操作在不同的线程中执行时,我们无法确保执行读操作的线程能适时地看到其他线程写入的值,有时甚至是根本不可能的事情。

对于可见性错误可以通过同步来保证对象被安全地发布。除此之外也可以 使用一种更加轻量级的 volatile 变量。

(2)volatile 解决内存可见性错误的原理

假设有一个变量 i,没有使用 volatile 关键字修饰,当线程 1 对 i 进行读写时,线程 1 首先是把 i 从主存中拷贝到线程 1 所在的内存中。线程 1 对 i 所做的修改都是在线程 1 所在的内存中进行的,直到修改完成后,才把 i 的值写回主存中。如果在这期间存在线程 2 修改 i 的值话,线程 1 是无法第一时间得知的。当变量 i 使用了 volatile 修饰后,线程对 i 的修改全部都在主存中完成,一个线程修改了 i 的值,另一个线程去修改时,就会立刻发现 i 变量已被其他线程修改了。volatile 修饰的变量和非 volatile 修饰的变量内存模型如下:

图片说明

图片说明

总结:volatile 变 量,用来确保将变量的更新操作通知到其他线程。

9. 原子变量 CAS 算法

(1)原子性

原子是世界上的最小单位,具有不可分割性。比如: a=0;( a 非 long 和 double 类型) 这个操作是不可分割的,那么我们说这个操作时原子操作。再比如:a++; 这个操作实际是 a = a + 1;是可分割的,所以他不是一个原子操作。

(2)CAS 算法

CAS (Compare-And-Swap) 是一种硬件对并发的支持,针对多处理器操作而设计的处理器中的一种特殊指令,用于管理对共享数据的并发访问,从而保证数据变量的原子性。

CAS 包含了 3 个操作数:

  • 需要读写的内存值 V
  • 进行比较的值 A
  • 拟写入的新值 B
  • 当且仅当 V 的值等于 A 时,CAS 通过原子方式用新值 B 来更新 V 的 值,否则不会执行任何操作。

(3)原子变量

类的小工具包,支持在单个变量上解除锁的线程安全编程。事实上,此包中的类可将 volatile 值、字段和数组元素的概念扩展到那些也提供原子条件更新操作的类。

  • AtomicBooleanAtomicIntegerAtomicLongAtomicReference 的实例各自提供对 相应类型单个变量的访问和更新。每个类也为该类型提供适当的实用工具方法。
  • AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray 类进一步扩展了原子操 作,对这些类型的数组提供了支持。这些类在为其数组元素提供 volatile 访问语义方 面也引人注目,这对于普通数组来说是不受支持的。
  • 核心方法:boolean compareAndSet(expectedValue, updateValue)
  • java.util.concurrent.atomic 包下提供了一些原子操作的常用类:
    • AtomicBooleanAtomicIntegerAtomicLongAtomicReference
    • AtomicIntegerArrayAtomicLongArray
    • AtomicMarkableReference
    • AtomicReferenceArray
    • AtomicStampedReference

(4)原子性问题演示

10. 闭锁

CountDownLatch 一个同步辅助类,他运行一个线程或多个线程等待其他线程执行完后才去执行。

  • 闭锁可以用来确保某些线程直到其他线程都完成后才继续执行。
  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行;
  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动;
  • 等待直到某个操作所有参与者都准备就绪再继续执行。

使用示例:

import java.util.concurrent.CountDownLatch;

/**
 * @author opendragonhuang
 * @version 1.0
 * @date 2019/7/29
 */
public class CountDownLatchDemo {
    public static void main(String[] args) {
        // 设置 CountDownLatch 监控线程的数量
        CountDownLatch countDownLatch = new CountDownLatch(5);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 5; i++) {
            new Thread(new MyThread(countDownLatch)).start();
        }
        while (countDownLatch.getCount() > 0);
        long end = System.currentTimeMillis();
        System.out.println("线程执行时间:"+(end-start));
    }
}

class MyThread implements Runnable{
    private CountDownLatch countDownLatch;

    public MyThread(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        countDownLatch.countDown();
    }
}

11. Lock

在 Java 5.0 之前,协调共享对象的访问时可以使用的机 制只有 synchronized 和 volatile 。Java 5.0 后增加了一些 新的机制,但并不是一种替代内置锁的方法,而是当内置锁不适用时,作为一种可选择的高级功能。

ReentrantLock 实现了 Lock 接口,并提供了与 synchronized 相同的互斥性和内存可见性。但相较于 synchronized 提供了更高的处理锁的灵活性。

使用 Lock 进行内存同步的示例:

/**
 * @author opendragonhuang
 * @version 1.0
 * @date 2019/7/29
 */
public class Main {
    public static void main(String[] args) {
        Windows wr1 = new Windows(100);
        Thread t1 = new Thread(wr1, "窗口一");
        Thread t2 = new Thread(wr1, "窗口二");
        Thread t3 = new Thread(wr1, "窗口三");

        t1.start();
        t2.start();
        t3.start();
    }
}

class Windows implements Runnable{
    private int tickets;
    private Lock lock = new ReentrantLock();

    public Windows(int ticket){
        this.tickets = ticket;
    }

    public Windows(int tickets, Lock lock) {
        this.tickets = tickets;
        this.lock = lock;
    }

    @Override
    public void run(){
        while(true) {
            lock.lock();
            // 为了保证 unlock() 方法一定得到执行,所以用 try-finally
            try {
                if (tickets > 0) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    tickets--;
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println(Thread.currentThread().getName() + ":" + tickets + "票");
                } else {
                    break;
                }
            } finally {
                lock.unlock();
            }
        }

    }
}

12. Condition

Condition 接口描述了可能会与锁有关联的条件变量。这些变量在用 法上与使用 Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个 Lock 可能与多个 Condition 对象关 联。为了避免兼容性问题,Condition 方法的名称与对应的 Object 版 本中的不同。

  • Condition 对象中,与 waitnotifynotifyAll 方法对应的分别是 awaitsignalsignalAll
  • Condition 实例实质上被绑定到一个锁上。要为特定 Lock 实例获得 Condition 实例,请使用其 newCondition() 方法。

使用 Lock 和 Condition 实现身材者和消费者模型:

public class Main {
    public static void main(String[] args) {
        Resource r = new Resource();
        Thread t1 = new Thread(new Producer(r), "生产者一");
        Thread t2 = new Thread(new Producer(r), "生产者二");
        Thread t3 = new Thread(new Consume(r), "消费者一");
        Thread t4 = new Thread(new Consume(r), "消费者二");

        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

class Resource{
    private final int MAX_SIZE = 1;
    private int curSize;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void add(){
        lock.lock();
        try {
            while(curSize >= MAX_SIZE ){
                try {
                    System.out.println("库存已满,无法生产");
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            curSize++;
            System.out.println(Thread.currentThread().getName()+"生产,总库存:"+curSize);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void remove(){
        lock.lock();
        try {
            while(curSize <= 0){
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            curSize--;
            System.out.println(Thread.currentThread().getName()+"消费,总库存:"+curSize);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

class Producer implements Runnable{
    private Resource resource;

    public Producer(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            resource.add();
        }
    }
}

class Consume implements Runnable{
    private Resource resource;

    public Consume(Resource resource) {
        this.resource = resource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            resource.remove();
        }
    }
}

13. ReadWriteLock

多线程编程中,对于共享资源读操作可以并发执行,而写操作任意时刻只能一个线程在执行。但是,传统的线程同步方法中,是不会区分这些的,为了解决这个问题,出现了 ReadWriteLock 。

ReadWriteLock 读取操作通常不会改变共享资源,但执行 写入操作时,必须独占方式来获取锁。对于读取操作占
多数的数据结构。 ReadWriteLock 能提供比独占锁更高 的并发性。而对于只读的数据结构,其中包含的不变性
可以完全不需要考虑加锁操作。

演示示例:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/*
 * 写写/读写 需要“互斥”
 * 读读 不需要互斥
 * 
 */
public class TestReadWriteLock {

    public static void main(String[] args) {
        ReadWriteLockDemo rw = new ReadWriteLockDemo();

        new Thread(new Runnable() {

            @Override
            public void run() {
                rw.set((int)(Math.random() * 101));
            }
        }, "Write:").start();


        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    rw.get();
                }
            }).start();
        }
    }

}

class ReadWriteLockDemo{

    private int number = 0;

    private ReadWriteLock lock = new ReentrantReadWriteLock();

    //读
    public void get(){
        lock.readLock().lock(); //上锁

        try{
            System.out.println(Thread.currentThread().getName() + " : " + number);
        }finally{
            lock.readLock().unlock(); //释放锁
        }
    }

    //写
    public void set(int number){
        lock.writeLock().lock();

        try{
            System.out.println(Thread.currentThread().getName());
            this.number = number;
        }finally{
            lock.writeLock().unlock();
        }
    }
}

14. 线程池

线程池提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应的速度。

(1)线程池类的继承关系

图片说明

  • java.util.concurrent.Executor : 负责线程的使用与调度的根接口
  • ExecutorService 子接口: 线程池的主要接口
  • ThreadPoolExecutor 线程池的实现类
  • ScheduledExecutorService 子接口:负责线程的调度
  • ScheduledThreadPoolExecutor :继承 ThreadPoolExecutor, 实现 ScheduledExecutorService

线程池一般情况下不是通过 new 来创建的,而是通过 Executors 对应的工厂方法来创建:

  • ExecutorService newFixedThreadPool() : 创建固定大小的线程池。
  • ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
  • ExecutorService newSingleThreadExecutor() : 创建单个线程池。线程池中只有一个线程。
  • ScheduledExecutorService newScheduledThreadPool():创建定大小可以时间调度的线程池。

(2)线程池使用示例

public class ThreadPoolTest {
    public static void main(String[] args) {
        // 获取线程池,线程池容量为 5
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        ArrayList<Future<Integer>> arrayList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Future<Integer> submit = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int sum = 0;
                    for (int j = 0; j < 100; j++) {
                        sum += j;
                    }

                    return sum;
                }
            });
            arrayList.add(submit);
        }
        // 关闭线程池
        executorService.shutdown();
        int sum  = 0;
        for (Future<Integer> integerFuture : arrayList) {
            try {
                System.out.println(integerFuture.get());
                sum += integerFuture.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.println("总和为:"+sum);

    }

}
public class ThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        // 指定的时间开启线程
        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println(new Random().nextInt(1000));
            }
        }, 2, TimeUnit.SECONDS);
        // 关闭线程
        scheduledExecutorService.shutdown();
    }

}

15. Fork/Join 框架

Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成 若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进
行 join 汇总。

采用 “工作窃取”模式(work-stealing):当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。

相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了 线程的等待时间,提高了性能。

使用参考程序:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 * @author opendragonhuang
 * @version 1.0
 * @date 2019/7/29
 */
public class ForkJoinPoolTest {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        long start = System.currentTimeMillis();
        Long result = forkJoinPool.invoke(new ForkJoinSumCal(0, 1000000000L));
        long end = System.currentTimeMillis();
        System.out.print("Fork/Join 框架耗时为:"+(end-start));
        System.out.println(",结算结果为:"+result);

        result = 0L;
        start = System.currentTimeMillis();
        for (long i = 0; i <= 1000000000L; i++) {
            result += i;
        }
        end = System.currentTimeMillis();
        System.out.print("普通循环耗时为:"+(end-start));
        System.out.println(",结算结果为:"+result);
    }
}

class ForkJoinSumCal extends RecursiveTask<Long>{
    private long start;
    private long end;
    private final long THRESHOLD = 10000;

    public ForkJoinSumCal(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        if(end -start < THRESHOLD){
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }else{
            long mid = (start+end)/2;
            ForkJoinSumCal left = new ForkJoinSumCal(start, mid);
            left.fork();
            ForkJoinSumCal right = new ForkJoinSumCal(mid+1, end);
            right.fork();

            return left.join()+right.join();
        }
    }
}

输出结果:

Fork/Join 框架耗时为:1031,结算结果为:500000000500000000
普通循环耗时为:6375,结算结果为:500000000500000000