线程间通信
1.两个数字交替打印
1.1自定义锁
使用volatile保证数据的可见性,避免多线程访问同一个变量,它的值刷新不及时的问题;设想定义一个volatile修饰的boolean变量flag,当flag为true时,线程1打印1,同时修改flag的值,置为false,当flag为false时,线程2打印2,同时修改falg的值,置为true,依次循环即可
package cn.alibaba.thread;
public class ThreadPrint2 {
volatile static boolean flag=true;//volatile
public static void main(String[] args) {
Thread t1=new MyThread(1);
Thread t2=new MyThread(2);
t1.start();
t2.start();
}
static class MyThread extends Thread {
int printValue;
public MyThread(int printValue) {
this.printValue = printValue;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
if (flag) {
System.out.println("1");
} else {
System.out.println("2");
}
flag = !flag;
}
}
}
}
1.2 wait/notify/notify All
/**
* @author: xingkong
* @date: 2020/10/8 15:11
* @description:
*/
public class ThreadPrinter implements Runnable {
private final String name;
private final Object prev;
private final Object self;
private ThreadPrinter(String name, Object prev, Object self) {
this.name = name;
this.prev = prev;
this.self = self;
}
@Override
public void run() {
while (true) {
// 多线程并发,不能用if,必须使用whil循环
synchronized (prev) {
// 先获取 prev 锁
synchronized (self) {
// 再获取 self 锁
System.out.print(name);
//打印
self.notifyAll();
// 唤醒其他线程竞争self锁,注意此时self锁并未立即释放。
}
//此时执行完self的同步块,这时self锁才释放。
try {
prev.wait();
// 立即释放 prev锁,当前线程休眠,等待唤醒
/**
* JVM会在wait()对象锁的线程中随机选取一线程,赋予其对象锁,唤醒线程,继续执行。
*/
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception {
Object a = new Object();
Object b = new Object();
ThreadPrinter pa = new ThreadPrinter("A", b, a);
ThreadPrinter pb = new ThreadPrinter("B", a, b);
new Thread(pa).start();
Thread.sleep(10);
//保证初始ABC的启动顺序
new Thread(pb).start();
Thread.sleep(10);
}
}
1.2ReentrantLock结合Condition
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ABC_Condition {
private static Lock lock = new ReentrantLock();
private static Condition A = lock.newCondition();
private static Condition B = lock.newCondition();
private static int count = 0;
static class ThreadA extends Thread {
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 10; i++) {
while (count % 2 != 0) {//注意这里是不等于0,也就是说在count % 3为0之前,当前线程一直阻塞状态
A.await(); // A释放lock锁
}
System.out.print("A");
count++;
B.signal();
// A执行完唤醒B线程
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
static class ThreadB extends Thread {
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 10; i++) {
while (count % 2 != 1) {
B.await();// B释放lock锁,当前面A线程执行后会通过B.signal()唤醒该线程
}
System.out.print("B");
count++;
A.signal();// B执行完唤醒C线程
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
new ThreadA().start();
new ThreadB().start();
}
}
1.3 Semaphore信号量方式
/**
* @author: xingkong
* @date: 2020/10/8 15:34
* @description:
*/
import java.util.concurrent.Semaphore;
public class MultipleThreadRotationUsingSemaphore {
public static void main(String[] args) {
PrintABCUsingSemaphore printABC = new PrintABCUsingSemaphore();
new Thread(() -> printABC.printA()).start();
new Thread(() -> printABC.printB()).start();
}
}
class PrintABCUsingSemaphore {
private Semaphore semaPhoreA = new Semaphore(1);
private Semaphore semaPhoreB = new Semaphore(0);
//private int attempts = 0;
public void printA() {
print("A", semaPhoreA, semaPhoreB);
}
public void printB() {
print("B", semaPhoreB, semaPhoreA);
}
private void print(String name, Semaphore currentSemaphore, Semaphore nextSemaphore) {
while (true){
try {
currentSemaphore.acquire();
System.out.println(Thread.currentThread().getName() +" print "+ name);
nextSemaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
2.ABC交替打印
2.1wait/notify/notify All
public class Test77 {
public static class ThreadPrinter implements Runnable {
private String name;
private Object prev;
private Object self;
private ThreadPrinter(String name, Object prev, Object self) {
this.name = name;
this.prev = prev;
this.self = self;
}
@Override
public void run() {
int count = 10;
while (count > 0) {// 多线程并发,不能用if,必须使用whil循环
synchronized (prev) { // 先获取 prev 锁
synchronized (self) {// 再获取 self 锁
System.out.print(name);//打印
count--;
self.notifyAll();// 唤醒其他线程竞争self锁,注意此时self锁并未立即释放。
}
//此时执行完self的同步块,这时self锁才释放。
try {
prev.wait(); // 立即释放 prev锁,当前线程休眠,等待唤醒
/**
* JVM会在wait()对象锁的线程中随机选取一线程,赋予其对象锁,唤醒线程,继续执行。
*/
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) throws Exception {
Object a = new Object();
Object b = new Object();
Object c = new Object();
ThreadPrinter pa = new ThreadPrinter("A", c, a);
ThreadPrinter pb = new ThreadPrinter("B", a, b);
ThreadPrinter pc = new ThreadPrinter("C", b, c);
new Thread(pa).start();
Thread.sleep(10);
//保证初始ABC的启动顺序
new Thread(pb).start();
Thread.sleep(10);
new Thread(pc).start();
Thread.sleep(10);
}
}
可以看到程序一共定义了a,b,c三个对象锁,分别对应A、B、C三个线程。A线程最先运行,A线程按顺序申请c,a对象锁,打印操作后按顺序释放a,c对象锁,并且通过notify操作唤醒线程B。线程B首先等待获取A锁,再申请B锁,后打印B,再释放B,A锁,唤醒C。线程C等待B锁,再申请C锁,后打印C,再释放C,B锁,唤醒A。看起来似乎没什么问题,但如果你仔细想一下,就会发现有问题,就是初始条件,三个线程必须按照A,B,C的顺序来启动,但是这种假设依赖于JVM中线程调度、执行的顺序。
原实现存在的问题:
如果把上述代码放到eclipse上运行,可以发现程序虽然完成了交替打印ABC十次的任务,但是打印完毕后无法自动结束线程。这是为什么呢?原因就在于下面这段代码:
try {
prev.wait(); // 立即释放 prev锁,当前线程休眠,等待唤醒
/**
* JVM会在wait()对象锁的线程中随机选取一线程,赋予其对象锁,唤醒线程,继续执行。
*/
} catch (InterruptedException e) {
e.printStackTrace();
}
prev.wait(); 是释放prev锁并休眠线程,等待唤醒。在最后一次打印完毕后,因为count为0,无法进入while循环的同步代码
我们找到了了问题的原因,解决起来就简单了。最直接的思路就是在最后一次打印操作时在不休眠线程的情况下释放对象锁,这可以通过notifyAll操作实现。于是改进的代码如下
public static class ThreadPrinter implements Runnable {
private String name;
private Object prev;
private Object self;
private ThreadPrinter(String name, Object prev, Object self) {
this.name = name;
this.prev = prev;
this.self = self;
}
@Override
public void run() {
int count = 10;
while (count > 0) {// 多线程并发,不能用if,必须使用whil循环
synchronized (prev) { // 先获取 prev 锁
synchronized (self) {// 再获取 self 锁
System.out.print(name);//打印
count--;
self.notifyAll();// 唤醒其他线程竞争self锁,注意此时self锁并未立即释放。
}
//此时执行完self的同步块,这时self锁才释放。
try {
if (count == 0) {// 如果count==0,表示这是最后一次打印操作,通过notifyAll操作释放对象锁。
prev.notifyAll();
} else {
prev.wait(); // 立即释放 prev锁,当前线程休眠,等待唤醒
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) throws Exception {
Object a = new Object();
Object b = new Object();
Object c = new Object();
ThreadPrinter pa = new ThreadPrinter("A", c, a);
ThreadPrinter pb = new ThreadPrinter("B", a, b);
ThreadPrinter pc = new ThreadPrinter("C", b, c);
new Thread(pa).start();
Thread.sleep(10);
//保证初始ABC的启动顺序
new Thread(pb).start();
Thread.sleep(10);
new Thread(pc).start();
Thread.sleep(10);
}
2.2ReentrantLock结合Condition
与ReentrantLock搭配的通行方式是Condition,如下:
Condition是被绑定到Lock上的,必须使用lock.newCondition()才能创建一个Condition。从上面的代码可以看出,Synchronized能实现的通信方式,Condition都可以实现,功能类似的代码写在同一行中。这样解题思路就和第一种方法基本一致,只是采用的方法不同。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ABC_Condition {
private static Lock lock = new ReentrantLock();
private static Condition A = lock.newCondition();
private static Condition B = lock.newCondition();
private static Condition C = lock.newCondition();
private static int count = 0;
static class ThreadA extends Thread {
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 10; i++) {
while (count % 3 != 0) {//注意这里是不等于0,也就是说在count % 3为0之前,当前线程一直阻塞状态
A.await(); // A释放lock锁
}
System.out.print("A");
count++;
B.signal();
// A执行完唤醒B线程
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
static class ThreadB extends Thread {
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 10; i++) {
while (count % 3 != 1) {
B.await();// B释放lock锁,当前面A线程执行后会通过B.signal()唤醒该线程
}
System.out.print("B");
count++;
C.signal();// B执行完唤醒C线程
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
static class ThreadC extends Thread {
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 10; i++) {
while (count % 3 != 2) {
C.await();
// C释放lock锁
}
System.out.print("C");
count++;
A.signal();
// C执行完唤醒A线程
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
new ThreadA().start();
new ThreadB().start();
new ThreadC().start();
}
}
3.Semaphore信号量方式
Semaphore又称信号量,是操作系统中的一个概念,在Java并发编程中,信号量控制的是线程并发的数量。
public Semaphore(int permits)
其中参数permits就是允许同时运行的线程数目;
Semaphore是用来保护一个或者多个共享资源的访问,Semaphore内部维护了一个计数器,其值为可以访问的共享资源的个数。一个线程要访问共享资源,先获得信号量,如果信号量的计数器值大于1,意味着有共享资源可以访问,则使其计数器值减去1,再访问共享资源。如果计数器值为0,线程进入休眠。当某个线程使用完共享资源后,释放信号量,并将信号量内部的计数器加1,之前进入休眠的线程将被唤醒并再次试图获得信号量。
Semaphore使用时需要先构建一个参数来指定共享资源的数量,Semaphore构造完成后即是获取Semaphore、共享资源使用完毕后释放Semaphore。
import java.util.concurrent.Semaphore;
public class ABC_Semaphore {
// 以A开始的信号量,初始信号量数量为1
private static Semaphore A = new Semaphore(1);
// B、C信号量,A完成后开始,初始信号数量为0
private static Semaphore B = new Semaphore(0);
private static Semaphore C = new Semaphore(0);
static class ThreadA extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
A.acquire();
// A获取信号执行,A信号量减1,当A为0时将无法继续获得该信号量
System.out.print("A");
B.release();
// B释放信号,B信号量加1(初始为0),此时可以获取B信号量
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class ThreadB extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
B.acquire();
System.out.print("B");
C.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class ThreadC extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
C.acquire();
System.out.println("C");
A.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
new ThreadA().start();
new ThreadB().start();
new ThreadC().start();
}
}
A1B2C3D4E5F6G7H8I9J10K11L12M13N14O15P16Q17R18S19T20U21V22W23X24Y25Z26
import java.util.concurrent.locks.LockSupport;
/**
* @author: xingkong
* @date: 2020/10/8 15:45
* @description:
*/
public class LockSupportDemo1 {
static Thread t1 = null;
static Thread t2 = null;
public static void main(String[] args) {
t1 = new Thread(() -> {
for (int i = 65; i < 91; i++) {
System.out.print((char) i);
LockSupport.unpark(t2);
LockSupport.park();
}
});
t2 = new Thread(() -> {
for (int i = 1; i < 27; i++) {
LockSupport.park();
System.out.print(i);
LockSupport.unpark(t1);
}
});
t1.start();
t2.start();
}
}
参考文献
http://edisonxu.com/2017/03/02/java-thread-communication.html
https://juejin.im/post/5d595558e51d4561ac7bccc4#heading-10
https://www.jianshu.com/p/5b9fdae43335
https://crossoverjie.top/2018/03/16/java-senior/thread-communication/