📅  最后修改于: 2023-12-03 15:16:22.715000             🧑  作者: Mango
DelayQueue是Java中的一个带有延迟时间的队列,延迟时间指的是在指定的时间后才能够获取队列中的元素。可以用来实现定时任务、消息发送等。
该队列的元素必须实现Delayed接口,该接口只有一个方法:getDelay(TimeUnit unit),表示队列到期的剩余时间。
DelayQueue内部采用PriorityQueue进行存储,每个元素的延迟时间会影响其在队列中的排序。
offer()方法为DelayQueue提供了添加元素的功能。它的源码实现如下:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (e == null)
throw new NullPointerException();
if (!shutdown) {
final long delay = e.getDelay(NANOSECONDS);
if (delay < 0)
delayQueue.offer(e);
else {
final long now = System.nanoTime();
if (delay == Long.MAX_VALUE)
delay = now;
else
delay += now;
if (delay < 0) // overflow
delay = Long.MAX_VALUE;
q.offer(new DelayedTask(e, delay));
if (q.peek() == e)
available.signalAll();
}
return true;
}
throw new RejectedExecutionException("Shutdown");
} finally {
lock.unlock();
}
}
可以看出,offer()方法是通过ReentrantLock加锁实现的,该锁保证了对队列操作的原子性。
当添加的元素是null时,会抛出NullPointerException异常;当队列已经被shutdown时,会抛出RejectedExecutionException异常。
当元素的延迟时间小于0时,将直接添加到队列delayQueue中,否则会将元素封装为一个DelayedTask并加入到PriorityQueue中。
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueExample {
static class Message implements Delayed {
private String content;
private long delayTime;
public Message(String content, long delayTime) {
this.content = content;
this.delayTime = delayTime;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return content.compareTo(((Message) o).content);
}
public String getContent() {
return content;
}
}
public static void main(String[] args) throws InterruptedException {
DelayQueue<Message> queue = new DelayQueue<>();
queue.offer(new Message("message 1", System.currentTimeMillis() + 10000));
queue.offer(new Message("message 2", System.currentTimeMillis() + 5000));
queue.offer(new Message("message 3", System.currentTimeMillis() + 15000));
queue.offer(new Message("message 4", System.currentTimeMillis() + 7000));
queue.offer(new Message("message 5", System.currentTimeMillis() + 2000));
while (!queue.isEmpty()) {
Message message = queue.take();
System.out.println("got message: " + message.getContent());
}
}
}
该示例模拟了消息发送,将需要发送的消息添加进DelayQueue中,每个消息有不同的延迟时间。
使用queue.take()获取队列中的元素时,若元素的延迟时间还未到,则会被阻塞。
输出结果:
got message: message 5
got message: message 2
got message: message 4
got message: message 1
got message: message 3
可以看到,消息被按照延迟时间依次取出。