线程间的通信

JVM在运行时会将自己管理的内存区域,划分为不同的数据区,称为运行时数据区。每个线程都有自己私有的内存空间,如下图示:

在这里插入图片描述

Java线程按照自己虚拟机栈中的方法代码一步一步的执行下去,在这一过程中不可避免的会使用到线程共享的内存区域堆或方法区。为了防止多个线程在同一时刻访问同一个内存地址,需要互相告知自己的状态以避免资源争夺。

线程的通信方式主要分为三种方式:①共享内存②消息传递③管道流

共享内存:线程之间通过对共享内存的读-写来实现隐式通信。Java中的具体实现是:volatile共享内存。

消息传递:线程之间通过明确的发送消息来实现显示通信。Java中的具体实现是:等待/通知机制(wait/notify),join方法。

管道流:管道输入/输出流。

1、等待/通知机制

其过程是:线程A由于某些原因,自主调用了对象o的wait方法,进入WAITING状态,释放占有的锁并等待通知。而线程B则调用对象o的notify方法或notifyall方法进行通知,线程A会收到通知,并从wait方法中返回,继续执行后面的代码。

可以发现,线程A和线程B就是通过对象o的wait方法和notify方法来发送消息,进行通信。

wait方法和notify方法是Object类的方法,而Object类是所有类的父类,因此所有对象都实现了Object类的方法。即所有的对象都具有wait方法和notify方法。

方法 作用 备注
wait 线程调用共享对象的wait()方法后会进入WAITING状态,释放占有的对象锁并等待其他线程的通知或中断才从该方法返回。 该方法可以传参数,wait(long n):超时等待n毫秒,进入TIME-WAITING状态,如果在n毫秒内没有通知或中断,则自行返回
notify 线程调用共享对象的notify()方法后会通知一个调用了wait方法并在此等待的线程返回。但由于在共享变量上等待的线程可能不止一个,故具体通知哪一个线程是随机的。 notifyAll()方法与notify()方法作用一致,不过notify是随机通知一个线程,而notifyAll则是通知所有在该共享变量上等待的线程

由于线程的等待/通知机制需要借助共享对象,所以在调用wait方法前,线程必须先获得该对象的锁,即只能在同步方法或同步块(synchronized代码块)中调用wait方法,在调用wait方法后,线程释放锁。

同样的notify方法在调用前也需要获得对象的锁,即也只能在同步方法或同步块中调用notify方法。若有多个线程在等待,则线程调度器会随机挑选一个线程来通知。需要注意的是,被通知的线程并不会在得到通知后就马上从wait方法返回,而是需要等待获得对象的锁后才能从wait方法返回。而调用了notify方法的线程也并不会在调用时就马上释放对象的锁,而是在执行完同步方法或同步块(synchronized代码块)后,才释放对象的锁。因此,被通知的线程要等调用了notify的线程释放锁后,才能从wait方法中返回。

综上所述,等待/通知机制的经典范式如下:

/**
 * 等待线程(调用wait方法的线程)
 */
synchronized(共享对象){ //同步代码块,进入条件是获得锁
    while(判断条件){ //进行wait线程任务的条件不满足时进入
        共享对象.wait()
    }
    线程任务代码
}

/**
 * 通知线程(调用notify方法的线程)
 */
synchronized(共享对象){ //同步代码块,进入条件是获得锁
    线程任务代码
    改变wait线程任务的条件
    共享对象.notify()
}

根据以上范式,有代码如下:

public class WaitNotify {
    static boolean flag = true; //等待线程继续执行往下执行的条件
    static Object lock = new Object(); //上锁的对象

    public static void main(String[] args) throws InterruptedException {
        Thread waitThread = new Thread(new WaitRunnable(),"waitThread");   //以WaitRunnable为任务类的线程
        Thread notifyThread = new Thread(new NotifyRunnable(),"notifyThread");   //以NotifyRunnable为任务类的线程
        waitThread.start(); //wait线程启动
        Thread.sleep(2000); //主线程休眠2s
        notifyThread.start(); //notify线程启动
    }

    /**
     * Runnable等待实现类
     * synchronized关键字:可以修饰方法或者以同步块的形式来使用
     */
    static class WaitRunnable implements Runnable{
        @Override
        public void run() {
            //对lock加锁
            synchronized(lock){
                //判断,若flag为true,则继续等待(wait)
                while(flag){
                    try {
                        System.out.println(
                                Thread.currentThread().getName()+
                                "---flag为true,等待 @"+
                                new SimpleDateFormat("hh:mm:ss").format(new Date())
                        );
                        lock.wait(); //等待,并释放锁资源
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //若flag为false,则进行工作
                System.out.println(
                        Thread.currentThread().getName()+
                        "---flag为false,运行 @"+
                        new SimpleDateFormat("hh:mm:ss").format(new Date())
                );
            }
        }
    }

    /**
     * Runnable通知实现类
     */
    static class NotifyRunnable implements Runnable{
        @Override
        public void run(){
            //对lock加锁
            synchronized(lock){
                //以NotifyRunnable为任务类的线程释放lock锁,并进行通知后,以Wait为任务类的线程才可以跳出循环
                System.out.println(
                        Thread.currentThread().getName()+ 
                        "---当前持有锁,释放 @"+
                        new SimpleDateFormat("hh:mm:ss").format(new Date())
                );
                lock.notifyAll(); //通知所有正在等待的线程从wait返回
                flag = false;
                try {
                    Thread.sleep(5000); //notifyThread线程休眠5s
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //再次对lock加锁,并休眠
            synchronized (lock){
                System.out.println(
                        Thread.currentThread().getName()+
                        "---再次持有锁,休眠 @"+
                        new SimpleDateFormat("hh:mm:ss").format(new Date())
                );
                try {
                    Thread.sleep(2000); //再次让notifyThread线程休眠2s
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


//该代码示例来自《Java并发编程的艺术》

其结果如下:

waitThread---flag为true,等待 @01:53:51
notifyThread---当前持有锁,释放 @01:53:53
waitThread---flag为false,运行 @01:53:58
notifyThread---再次持有锁,休眠 @01:53:58

以上代码根据等待/通知的经典范式,设置一个线程是否继续往下执行的条件变量flag,以及一个共享对象lock,并使用synchronized关键字对lock上锁。

waitThread线程是等待线程,在启动时会尝试获得锁,成功则进入synchronized代码块。在synchronized代码块中,如果条件不满足(即flag为true),则waitThread线程会进入while循环,并在循环体中调用wait方法,进入WAITING状态及释放锁资源。直到有其他线程调用notify方法通知才从wait方法返回。

notifyThread线程是通知线程,在启动时也会尝试获得锁,成功则同样进入synchronized代码块。在synchronized代码块中,notifyThread线程会改变条件,使waitThread线程可以继续往下执行(即令flag为false),同时notifyThread线程也会调用notyfiAll方法,让waitThread线程收到通知。

但注意,notifyThread线程并不会在调用notyfiAll方法后就马上释放锁,而是在执行完synchronized代码块的内容后才释放锁。我们在notifyThread线程调用notyfiAll后,将该线程休眠5s。可以从打印结果发现,在notifyThread线程休眠的5s中,即使waitThread线程得到了通知,且继续运行的条件也已满足(flag为flase),但waitThread线程在这5s中依然没有得到执行。在notifyThread线程5s的休眠时间结束后,并从synchronized代码块退出,waitThread线程才继续执行。所以,等待线程在得到通知后,仍然需要等待通知线程释放锁,并且在尝试获得锁成功后才能真正从wait方法中返回,并继续执行。

2、共享内存

有如下代码,

/**
 * @Author Feng Jian
 * @Date 2021/1/20 13:18
 * @Version 1.0
 */
public class JMMTest {
    private static boolean run = true;

    public static void main(String[] args) throws InterruptedException {
        Thread My_Thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while(run){
                    //...
                }
            }
        }, "My_Thread");

        My_Thread.start();  //启动My_Thread线程

        System.out.println(Thread.currentThread().getName()+"正在休眠@"+new SimpleDateFormat("hh:mm:ss").format(new Date())+"--"+run);

        Thread.sleep(1000);  //主线程休眠1s
        run = false;  //改变My_Thread线程运行条件,但My_Thread线程并不会停下

        System.out.println(Thread.currentThread().getName()+"正在运行@"+new SimpleDateFormat("hh:mm:ss").format(new Date())+"--"+run);
    }
}

定义了一个变量run,并以此作为My_Thread线程中while循环执行的条件。在启动My_Thread线程,并使主线程休眠1s后,改变变量run的值。其结果如下:

在这里插入图片描述

可以看出,即使是run的值已经改变,但My_Thread线程依然不会停下来。为什么呢?这就需要了解Java的内存模型(JMM)。

我们知道,CPU要从内存中读取出数据来进行计算,但实际上CPU并不总是直接从内存中读取数据。由于CPU和内存间(常称之为主存)的速度不匹配(CPU的速度比主存快得多),为了有效利用CPU,使用多级cache的机制,如图

在这里插入图片描述

因此,CPU读取数据的顺序是:寄存器-高速缓存-主存。主存中的部分数据,会先拷贝一份放到cache中,当CPU计算时,会直接从cache中读取数据,计算完毕后再将计算结果放置到cache中,最后在主存中刷新计算结果。因此每个CPU都会拥有一份拷贝。

以上只是CPU访问内存,进行计算的基本方式。实际上,不同的硬件,访问过程会存在不同程度的差异。比如,不同的计算机,CPU和主存间可能会存在三级缓存、四级缓存、五级缓存等等的情况。

为了屏蔽掉各种硬件和操作系统的内存访问差异,实现让 Java 程序在各种平台下都能达到一致的内存访问效果,定义了Java的内存模型(Java Memory Model,JMM)。

JMM 的主要目标是定义程序中各个变量的访问规则,即在虚拟机中将变量存储到主存和从主存中取出变量这样的底层细节。这里的变量指的是能够被多个线程共享的变量,它包括了实例字段、静态字段和构成数组对象的元素,方法内的局部变量和方法的参数为线程私有,不受JMM的影响。

Java的内存模型如下,

在这里插入图片描述
JMM定义了线程和主内存之间的关系:线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存,本地内存中存储着主内存中的共享变量的副本。

JMM规定:将所有共享变量放到主内存中,当线程使用变量时,会把其中的变量复制到自己的本地内存,线程读写时操作的是本地内存中的变量副本。一个线程不能访问其他线程的本地内存。

本地内存其实只是一个抽象的概念,它实际上并不真实存在,其包含了缓存、写缓冲区、寄存器以及其他的硬件和编译器的优化。

在多线程环境下,由于每个线程都有主内存***享变量的副本,所以当线程运行时,读取的是自己本地内存中的共享变量的副本,这就产生了线程的安全问题:比如主内存中的共享变量i为1,线程A和B从主内存取出变量i,放入自己的本地内存中成为共享变量i的副本。当线程A执行时,会直接从自己的本地内存中读取副本变量i的值,进行加1计算,完成后更新本地内存中的副本i的值,再写回到主内存中,此时主内存中的i的值为2。

而如果此时线程B也需要用到变量i的值,则它并不会去主内存中读取i的值,而是直接在自己的本地内存中读取i的副本,而此时线程B的本地内存中的副本i的值依然为1,而不是经过线程A修改后的,主内存中的值2。

这也是为什么在上述代码中,main线程明明已经修改了变量run的值,但My_Thread线程依然在执行while循环的原因。如图所示,

在这里插入图片描述

这同样是JMM所要处理的多线程可见性的问题:当一个共享变量在多个线程的工作内存中都有副本时,如果一个线程修改了这个共享变量的副本值,那么其他线程应该能够看到这个被修改后的值。即如何保证指令不会受 cpu 缓存的影响。

回到上述的代码,如何使My_Thread线程能接收到main线程已经修改run = false的信息?即My_Thread线程和main线程如何能够通信。

根据Java的内存模型,这两个线程如果需要通信,则必须经历以下两步:

①main线程把本地内存中修改过的共享变量run的值刷新到主内存中。
②My_Thread线程到主内存中去读取main线程之前已经更新过的共享变量run的值。

这意味着,两个线程的通信必须经过主内存。Java提供volitale关键字实现这一要求。

volitale关键字可以用来修饰字段(成员变量),告知Java程序任何对该变量的访问都要从共享内存(主内存)中获取,而对它的改变都必须同步刷新回共享内存。当一个变量被声明为volitale时,线程在写入变量时,不会把值缓存在寄存器或者高速缓存中(即本地内存),而是会把值刷新回主存,当要读取该共享变量时,线程则会先清空本地内存中的副本值,从主存中重新获取。故volitale关键字可以保证所有线程对变量访问的可见性,即对共享变量的读写都需要经过主内存,因此达到线程通过共享内存进行通信的目的。

知道了线程之间如何通过共享内存进行通信,我们改写一下上述代码,使main线程修改完run = false后,My_Thread线程中的while循环即立即停止。

实际上只需要给共享变量run加上volitale关键字即可:

private static volatile boolean run = true;

修改后的运行结果如下:

图片说明

可见,在main线程修改共享变量run的值后,即刷新回主内存。而My_Thread线程读取主内存中的run发现值为false后即停止了while循环。

实际上,也可以使用synchronized关键字来保证内存可见性问题,实现线程通信。其机制是:在synchronized修饰的同步块中,如果对一个共享变量进行操作,将会清空线程本地内存中此变量的值,并在使用这个共享变量前重新在主内存中读取这个变量的值。而在同步块执行完毕,释放锁资源时,则必须先把此共享变量同步回主内存中。

3、管道流

由于还未学习使用到,先暂时略过。。。

以上内容为本人在学习过程中所做的笔记。参考的书籍、文章或博客如下:
[1]方腾飞,魏鹏,程晓明. Java并发编程的艺术[M].机械工业出版社.
[2]霍陆续,薛宾田. Java并发编程之美[M].电子工业出版社.
[3]Simen郎. 拜托,线程间的通信真的很简单.知乎.https://zhuanlan.zhihu.com/p/138689342
[4]极乐君.Java线程内存模型,线程、工作内存、主内存.知乎.https://zhuanlan.zhihu.com/p/25474331