1. 保护性暂停定义
即 Guarded Suspension,用在一个线程等待另一个线程执行结果。
重点:
- 有一个线程的结果需要传递给另一个线程,让它们关联同一个GuardedObject
- 如果有结果连续不断由一个线程传递另一个线程,则需要考虑消息队列
- JDK中,join(),Future采用的就是这种模式
- 因为这是一方要等待到另一方的结果,所有这是一种同步模式。
图示:
2. 实现
根据上图实现一个t1线程等待t2线程下载响应
@Slf4j(topic = "c.TestGuardedObject")
public class TestGuardedObject {
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(()->{
List<String> list = (List<String>) guardedObject.get();
log.debug("t1获取到结果");
log.debug("文件大小:" + list.size());
},"t1").start();
new Thread(()->{
try {
List<String> download = download();
log.debug("t2下载完成");
guardedObject.complate(download);
log.debug("t2任务完成");
} catch (IOException e) {
e.printStackTrace();
}
},"t2").start();
}
}
class GuardedObject{
private Object response;
private final Object lock = new Object();
//去GuardedObject取结果
public Object get(){
synchronized (lock){
//没有结果则等待
while(response == null){
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return response;
}
//给GuardedObject设置结果
void complate(Object response){
synchronized (lock){
this.response = response;
lock.notifyAll();
}
}
}
下载工具类:
public class Downloader {
public static List<String> download() throws IOException {
HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
List<String> lines = new ArrayList<>();
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
lines.add(line);
}
}
return lines;
}
}
带超时版的GuardedObject
等待不能一直等待啊是吧,如果t2要下载一年,t1还要等待一年吗?
如何设置超时等待呢?
@Slf4j(topic = "c.TestGuardedObject")
public class TestGuardedObject {
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(()->{
log.debug("开始");
Object o = guardedObject.get(2000);
log.debug("结果:{}",o);
},"t1").start();
new Thread(()->{
log.debug("开始");
try {
Thread.sleep(3000);
guardedObject.complate(1111);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("结束");
},"t2").start();
}
class GuardedObject{
private Object response;
private final Object lock = new Object();
//去GuardedObject取结果,有点不好理解
public Object get(long timeout){
synchronized (lock){
//记录最初的是按
long begin = System.currentTimeMillis();
long passed = 0;
//没有结果则等待
while(response == null){
long waitTime = timeout - passed;
if(waitTime <= 0){
break;
}
try {
//如果不满足waitTime <= 0,说明还需要等待
//用waitTime不用timeout是为了防止被打断,时间反被延迟
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
//经历的时间
passed = System.currentTimeMillis() - begin;
}
}
return response;
}
//给GuardedObject设置结果
void complate(Object response){
synchronized (lock){
this.response = response;
lock.notifyAll();
}
}
}
join()原理
观察join的源码实现:
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
当线程还活着的时候
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
这段代码的也提现了保护性暂停模式的使用。
多任务版的GuardedObject
类似于图上的,此时存在多个GuardedObject,由唯一的id标识,并且每一个GuardedObject都连接着各自的两个线程进行通信,设计模式就是同步模式之保护性暂停
@Slf4j(topic = "c.TestGuardedObject")
public class TestGuardedObject {
public static void main(String[] args) {
//3个人收信
for (int i = 0; i < 3; i++) {
new Person().start();
}
Sleeper.sleep(1);
//读取信内容
for (Integer id : Mailboxes.getIds()) {
new PostMan(id, "内容" + id).start();
}
}
}
@Slf4j(topic = "c.Person")
class Person extends Thread{
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.debug("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
}
}
@Slf4j(topic = "c.PostMan")
class PostMan extends Thread{
private int id;
private String mail;
public PostMan(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.debug("送信 id:{}, 内容:{}", id, mail);
guardedObject.complate(mail);
}
}
//信箱类,解耦收信人和邮递员
class Mailboxes{
//保证信箱id唯一
private static int id;
private static Map<Integer,GuardedObject> map = new Hashtable<>();
//根据id取信
public static GuardedObject getGuardedObject(int id){
return map.remove(id);
}
//生成一个唯一信封的id
public static synchronized int generatorId(){
return id ++;
}
//创建信件
public static GuardedObject createGuardedObject(){
GuardedObject guardedObject = new GuardedObject(generatorId());
map.put(guardedObject.getId(),guardedObject);
return guardedObject;
}
//获取信件id集合
public static Set<Integer> getIds(){
return map.keySet();
}
}
class GuardedObject{
//新增id来标识GuardedObject
private Integer id;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public GuardedObject(Integer id) {
this.id = id;
}
private Object response;
private final Object lock = new Object();
//去GuardedObject取结果
public Object get(long timeout){
synchronized (lock){
//记录最初的是按
long begin = System.currentTimeMillis();
long passed = 0;
//没有结果则等待
while(response == null){
long waitTime = timeout - passed;
if(waitTime <= 0){
break;
}
try {
//如果不满足waitTime <= 0,说明还需要等待
//用waitTime不用timeout是为了防止被打断,时间反被延迟
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
//经历的时间
passed = System.currentTimeMillis() - begin;
}
}
return response;
}
//给GuardedObject设置结果
void complate(Object response){
synchronized (lock){
this.response = response;
lock.notifyAll();
}
}
}
输出:
从输出结果来看都是一一对应的,它们内部连接的桥梁就是GuardedObject的唯一id。
学习资料:https://www.bilibili.com/video/BV16J411h7Rd?p=105&spm_id_from=pageDriver