📅  最后修改于: 2023-12-03 15:31:56.935000             🧑  作者: Mango
Java中的DelayQueue类是一个基于优先级队列PriorityQueue实现的无界阻塞队列,其中每个元素都有一个过期时间,归根结底是个定时任务队列。 延迟队列的设计思想是通过保证队列元素按照指定时间顺序提供给消费者,让生产者将时间序列控制好之后,便可利用空余时间做一些自己想做的事情,而不用担心是否超时。
Java中的DelayQueue类要求每个元素必须实现java.util.concurrent.Delayed接口. Delayed继承了java.lang.Comparable接口,只有当元素在DelayQueue中到期时,才能从队列中取走该元素。如果一个元素的延迟时间为0或负数,就表明该元素已经到期,可以从延迟队列中取走它。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
元素必须能够实现Delayed接口,同时包含一个有效的到期时间。DelayQueue内部按照元素到期时间进行排序,队首元素是最快到期的元素。
take是DelayQueue提供的一个阻塞方法,如果队列中没到期的元素则会阻塞,否则在队首元素到期时,take方法会返回该元素。
public E take() throws InterruptedException;
既然DelayQueue是一个定时任务队列,那么自然就需要实现这个队列的生产者和消费者。生产者将任务放到队列中,消费者从队列中取出任务并执行。
Java中Executor框架提供了ScheduledExecutorService作为一个生产者,该服务可以实现定时任务的调度。其具体使用方法如下:
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class DelayQueueExample {
private static AtomicInteger index = new AtomicInteger(0);
private static DelayQueue<DelayElement> delayQueue = new DelayQueue<>();
public static void main(String[] args) throws InterruptedException {
startProduce(3);
startConsumer();
}
/**
* 启动生产者线程
*
* @param n 生产者的个数
*/
private static void startProduce(int n) {
for (int i = 1; i <= n; i++) {
int producerId = i;
new Thread(() -> {
while (true) {
try {
// 每个生产者都随机一个过期时间,并将其放入队列中
int delay = (int) (Math.random() * 1000);
delayQueue.put(new DelayElement(producerId, "消息" + index.getAndIncrement(), delay));
Thread.sleep((int) (Math.random() * 500));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
/**
* 启动消费者线程
*/
private static void startConsumer() {
new Thread(() -> {
while (true) {
try {
// 如果队列中有到期的元素则会在队首阻塞,直到有元素到期
DelayElement element = delayQueue.take();
System.out.println("消费者 " + element.getProducerId() + "消费了一条消息:" + element.getMessage());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
/**
* Delayed元素,包含生产者ID,消息内容,到期时间
*/
private static class DelayElement implements Delayed {
private int producerId;
private String message;
private long expiredTime;
public DelayElement(int producerId, String message, long delayTime) {
this.producerId = producerId;
this.message = message;
this.expiredTime = System.currentTimeMillis() + delayTime;
}
public int getProducerId() {
return producerId;
}
public String getMessage() {
return message;
}
/**
* 获取延迟时间到期时间的剩余时间
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expiredTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 将Delayed元素放入优先级队列PriorityQueue中,用于按到期时间排序
*/
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
}
}
上述代码中,先启动了3个生产者线程,每个生产者线程会生成一条消息,并随机一个到期时间(毫秒),并将消息放入延迟队列中。 同时,启动一个消费者线程,该线程不断尝试从队列中取出队列头部到期的元素,如果队列中无到期元素,将会在队首一直阻塞,直到队列到期时间有所改变。
Java中的DelayQueue类提供了一种优雅的定时任务队列实现方式,开发者只需要将延迟元素放入延迟队列中, 并利用一个线程不断从队列中取出到期的元素并执行即可实现类似定时任务的效果。 DelayQueue在实现实时性任务时非常有用,例如网页缓存、超时控制等。