看消息中间件的时候,涉及到延时消息的场景,在一篇文章中提到了Java中的DelayQueue也可以实现延时队列的效果,研究一下。
1.
基础信息
1)一个泛型类,最早出现版本在JDK5,属于java.util.concurrent包下,直接继承了AbstractQueue<E>类,直接实现BlockingQueue<E>接口。
2)泛型对象对应的类要实现接口Delayed才可以。
3)一个空参构成,一个有参构造,17个public方法可用,看起来也不是很难懂。
4)因为间接实现了Collection接口,所以它也是集合下的一员,属于Queue这个系列下的实现类,集合的一些方法它也可以用。
5)使用put和offer方法都可以向队列中放入数据,看了下源码,put方法也是调用offer方法,在offer方法中使用了lock锁,所以这里的操作是线程安全的。最终使用的是类PriorityQueue中的offer方法,底层最终使用数组来存放对象。所以可以说这个类的底层数据结构是数组。
2.
看起来不复杂,参考了几篇文章,确实可以在生产者-消费者的场景下去使用。生产者把对象放到队列中,消费者从中取出即可,通过设置过期时间,可以达到延时的效果,但里面好像有很多需要注意的地方,这个在以后使用时再细致看吧。
3.
来看一个简单的使用案例。
package com.itdr.demo; import java.util.Random; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class A implements Delayed { private String name; private long avaibleTime; public A(String name, long delayTime){ this.name=name; //avaibleTime = 当前时间+ delayTime this.avaibleTime=delayTime + System.currentTimeMillis(); } @Override public long getDelay(TimeUnit unit) { //判断avaibleTime是否大于当前系统时间,并将结果转换成MILLISECONDS long diffTime= avaibleTime- System.currentTimeMillis(); return unit.convert(diffTime,TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { //compareTo用在DelayedUser的排序 return (int)(this.avaibleTime - ((A) o).getAvaibleTime()); } }2)创建一个生产者,没什么套路,就是用线程定期创建A对象
package com.itdr.demo; import java.util.Random; import java.util.concurrent.DelayQueue; public class DelayedQueueProducer implements Runnable { // 队列 private DelayQueue<A> delayQueue; // 生产次数 private Integer messageCount; // 过期时间 private long delayedTime; @Override public void run() { for (int i = 0; i < messageCount; i++) { try { A a = new A( new Random().nextInt(1000)+"", delayedTime); delayQueue.put(a); System.out.println("生产一个A"); Thread.sleep(500); } catch (InterruptedException e) { } } } }3)创建一个消费者,也是线程,跟生产者交替操作,便于演示
package com.itdr.demo; import java.util.concurrent.DelayQueue; public class DelayedQueueConsumer implements Runnable { private DelayQueue<A> delayQueue; private int messageCount; @Override public void run() { for (int i = 0; i < messageCount; i++) { try { A element = delayQueue.take(); System.out.println("消费一个A"); } catch (InterruptedException e) { } } } }
4)然后就是调用线程演示就可以了,正常情况下,会生产一个,消费一个这样。
使用确实不难,但看了好几篇文章,都说要格外注意很多细节,在这不一一列出了,本文算是一个普及操作,知道这是个啥东西,大概能干啥,算是对MQ的一个小扩展了解,所以不做深究了。
有什么好想法,可以留言哦~
参考文章:
java中DelayQueue的使用
https://www.cnblogs.com/flydean/p/java-delayqueue.html