看消息中间件的时候,涉及到延时消息的场景,在一篇文章中提到了Java中的DelayQueue也可以实现延时队列的效果,研究一下。



1.

基础信息

1)一个泛型类,最早出现版本在JDK5,属于java.util.concurrent包下,直接继承了AbstractQueue<E>类,直接实现BlockingQueue<E>接口。

2)泛型对象对应的类要实现接口Delayed才可以。

3)一个空参构成,一个有参构造,17个public方法可用,看起来也不是很难懂。

4)因为间接实现了Collection接口,所以它也是集合下的一员,属于Queue这个系列下的实现类,集合的一些方法它也可以用。

5)使用put和offer方法都可以向队列中放入数据,看了下源码,put方法也是调用offer方法,在offer方法中使用了lock锁,所以这里的操作是线程安全的。最终使用的是类PriorityQueue中的offer方法,底层最终使用数组来存放对象。所以可以说这个类的底层数据结构是数组。


2.

看起来不复杂,参考了几篇文章,确实可以在生产者-消费者的场景下去使用。生产者把对象放到队列中,消费者从中取出即可,通过设置过期时间,可以达到延时的效果,但里面好像有很多需要注意的地方,这个在以后使用时再细致看吧。


3.

来看一个简单的使用案例。


1)首先准备放入队列的对象,需要实现接口Delayed,并且重写getDelay和compareTo方法,对于compareTo方法要格外注意,操作不当,可能会造成队列中的对象数据排序错乱,导致延时的效果失效。
package com.itdr.demo;

import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class A implements Delayed {

    private String name;
    private long avaibleTime;

    public A(String name, long delayTime){
        this.name=name;
        //avaibleTime = 当前时间+ delayTime
        this.avaibleTime=delayTime + System.currentTimeMillis();

    }

    @Override
    public long getDelay(TimeUnit unit) {
        //判断avaibleTime是否大于当前系统时间,并将结果转换成MILLISECONDS
        long diffTime= avaibleTime- System.currentTimeMillis();
        return unit.convert(diffTime,TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        //compareTo用在DelayedUser的排序
        return (int)(this.avaibleTime - ((A) o).getAvaibleTime());
    }
}
2)创建一个生产者,没什么套路,就是用线程定期创建A对象
package com.itdr.demo;

import java.util.Random;
import java.util.concurrent.DelayQueue;

public class DelayedQueueProducer implements Runnable {
    // 队列
    private DelayQueue<A> delayQueue;
    // 生产次数
    private Integer messageCount;
    // 过期时间
    private long delayedTime;

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                A a = new A(
                        new Random().nextInt(1000)+"", delayedTime);
                delayQueue.put(a);
                System.out.println("生产一个A");
                Thread.sleep(500);
            } catch (InterruptedException e) {
            }
        }
    }
}
3)创建一个消费者,也是线程,跟生产者交替操作,便于演示
package com.itdr.demo;

import java.util.concurrent.DelayQueue;

public class DelayedQueueConsumer implements Runnable {

    private DelayQueue<A> delayQueue;
    private int messageCount;

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                A element = delayQueue.take();
                System.out.println("消费一个A");
            } catch (InterruptedException e) {
            }
        }
    }
}

4)然后就是调用线程演示就可以了,正常情况下,会生产一个,消费一个这样。

使用确实不难,但看了好几篇文章,都说要格外注意很多细节,在这不一一列出了,本文算是一个普及操作,知道这是个啥东西,大概能干啥,算是对MQ的一个小扩展了解,所以不做深究了。


有什么好想法,可以留言哦~


参考文章:

java中DelayQueue的使用

https://www.cnblogs.com/flydean/p/java-delayqueue.html