1. 保护性暂停定义

即 Guarded Suspension,用在一个线程等待另一个线程执行结果。

重点:

  • 有一个线程的结果需要传递给另一个线程,让它们关联同一个GuardedObject
  • 如果有结果连续不断由一个线程传递另一个线程,则需要考虑消息队列
  • JDK中,join(),Future采用的就是这种模式
  • 因为这是一方要等待到另一方的结果,所有这是一种同步模式。

图示:

2. 实现

根据上图实现一个t1线程等待t2线程下载响应

@Slf4j(topic = "c.TestGuardedObject")
public class TestGuardedObject {
   
    public static void main(String[] args) {
   
        GuardedObject guardedObject = new GuardedObject();
        new Thread(()->{
   
            List<String> list = (List<String>) guardedObject.get();
            log.debug("t1获取到结果");
            log.debug("文件大小:" + list.size());
        },"t1").start();

        new Thread(()->{
   
            try {
   
                List<String> download = download();
                log.debug("t2下载完成");
                guardedObject.complate(download);
                log.debug("t2任务完成");
            } catch (IOException e) {
   
                e.printStackTrace();
            }
        },"t2").start();
    }

}

class GuardedObject{
   

    private Object response;
    private final Object lock = new Object();

    //去GuardedObject取结果
    public Object get(){
   
        synchronized (lock){
   
            //没有结果则等待
            while(response == null){
   
                try {
   
                    lock.wait();
                } catch (InterruptedException e) {
   
                    e.printStackTrace();
                }
            }
        }
        return response;
    }

    //给GuardedObject设置结果
    void complate(Object response){
   
        synchronized (lock){
   
            this.response = response;
            lock.notifyAll();
        }
    }
}

下载工具类:

public class Downloader {
   
    public static List<String> download() throws IOException {
   
        HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
   
            String line;
            while ((line = reader.readLine()) != null) {
   
                lines.add(line);
            }
        }
        return lines;
    }
}

带超时版的GuardedObject

等待不能一直等待啊是吧,如果t2要下载一年,t1还要等待一年吗?
如何设置超时等待呢?

@Slf4j(topic = "c.TestGuardedObject")
public class TestGuardedObject {
   
public static void main(String[] args) {
   
        GuardedObject guardedObject = new GuardedObject();
        new Thread(()->{
   
            log.debug("开始");
            Object o = guardedObject.get(2000);
            log.debug("结果:{}",o);
        },"t1").start();

        new Thread(()->{
   
            log.debug("开始");
            try {
   
                Thread.sleep(3000);
                guardedObject.complate(1111);
            } catch (InterruptedException e) {
   
                e.printStackTrace();
            }
            log.debug("结束");
        },"t2").start();
    }

class GuardedObject{
   

    private Object response;
    private final Object lock = new Object();

    //去GuardedObject取结果,有点不好理解
    public Object get(long timeout){
   
        synchronized (lock){
   
            //记录最初的是按
            long begin = System.currentTimeMillis();
            long passed = 0;
            //没有结果则等待
            while(response == null){
   
                long waitTime = timeout - passed;
                if(waitTime <= 0){
   
                    break;
                }
                try {
   
                    //如果不满足waitTime <= 0,说明还需要等待
                   //用waitTime不用timeout是为了防止被打断,时间反被延迟
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
   
                    e.printStackTrace();
                }
                //经历的时间
                passed = System.currentTimeMillis() - begin;
            }
        }
        return response;
    }

    //给GuardedObject设置结果
    void complate(Object response){
   
        synchronized (lock){
   
            this.response = response;
            lock.notifyAll();
        }
    }

}

join()原理

观察join的源码实现:

 public final synchronized void join(long millis)
 throws InterruptedException {
   
     long base = System.currentTimeMillis();
     long now = 0;

     if (millis < 0) {
   
         throw new IllegalArgumentException("timeout value is negative");
     }

     if (millis == 0) {
   
         while (isAlive()) {
   
             wait(0);
         }
     } else {
   
         while (isAlive()) {
   
             long delay = millis - now;
             if (delay <= 0) {
   
                 break;
             }
             wait(delay);
             now = System.currentTimeMillis() - base;
         }
     }
 }

当线程还活着的时候

 while (isAlive()) {
   
             long delay = millis - now;
             if (delay <= 0) {
   
                 break;
             }
             wait(delay);
             now = System.currentTimeMillis() - base;
         }

这段代码的也提现了保护性暂停模式的使用。

多任务版的GuardedObject


类似于图上的,此时存在多个GuardedObject,由唯一的id标识,并且每一个GuardedObject都连接着各自的两个线程进行通信,设计模式就是同步模式之保护性暂停

@Slf4j(topic = "c.TestGuardedObject")
public class TestGuardedObject {
   
    public static void main(String[] args) {
   

        //3个人收信
        for (int i = 0; i < 3; i++) {
   
            new Person().start();
        }
        Sleeper.sleep(1);
        //读取信内容
        for (Integer id : Mailboxes.getIds()) {
   
            new PostMan(id, "内容" + id).start();
        }

    }

}
@Slf4j(topic = "c.Person")
class Person extends Thread{
   
    @Override
    public void run() {
   
        GuardedObject guardedObject = Mailboxes.createGuardedObject();
        log.debug("开始收信 id:{}", guardedObject.getId());
        Object mail = guardedObject.get(5000);
        log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);

    }
}
@Slf4j(topic = "c.PostMan")
class PostMan extends Thread{
   

    private int id;
    private String mail;

    public PostMan(int id, String mail) {
   
        this.id = id;
        this.mail = mail;
    }

    @Override
    public void run() {
   
        GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
        log.debug("送信 id:{}, 内容:{}", id, mail);
        guardedObject.complate(mail);
    }
}

//信箱类,解耦收信人和邮递员
class Mailboxes{
   
    //保证信箱id唯一
    private static int id;

    private static Map<Integer,GuardedObject> map = new Hashtable<>();

    //根据id取信
    public static GuardedObject getGuardedObject(int id){
   
        return map.remove(id);
    }

    //生成一个唯一信封的id
    public static synchronized int generatorId(){
   
        return id ++;
    }

    //创建信件
    public static GuardedObject createGuardedObject(){
   
        GuardedObject guardedObject = new GuardedObject(generatorId());
        map.put(guardedObject.getId(),guardedObject);
        return guardedObject;
    }
    //获取信件id集合
    public static Set<Integer> getIds(){
   
        return map.keySet();
    }
}

class GuardedObject{
   

    //新增id来标识GuardedObject
    private Integer id;

    public Integer getId() {
   
        return id;
    }

    public void setId(Integer id) {
   
        this.id = id;
    }

    public GuardedObject(Integer id) {
   
        this.id = id;
    }

    private Object response;
    private final Object lock = new Object();

    //去GuardedObject取结果
    public Object get(long timeout){
   
        synchronized (lock){
   
            //记录最初的是按
            long begin = System.currentTimeMillis();
            long passed = 0;
            //没有结果则等待
            while(response == null){
   
                long waitTime = timeout - passed;
                if(waitTime <= 0){
   
                    break;
                }
                try {
   
                    //如果不满足waitTime <= 0,说明还需要等待
                    //用waitTime不用timeout是为了防止被打断,时间反被延迟
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
   
                    e.printStackTrace();
                }
                //经历的时间
                passed = System.currentTimeMillis() - begin;
            }
        }
        return response;
    }

    //给GuardedObject设置结果
    void complate(Object response){
   
        synchronized (lock){
   
            this.response = response;
            lock.notifyAll();
        }
    }

}

输出:

从输出结果来看都是一一对应的,它们内部连接的桥梁就是GuardedObject的唯一id。

学习资料:https://www.bilibili.com/video/BV16J411h7Rd?p=105&spm_id_from=pageDriver