1. 概述
在本文中,我们将研究java.util.concurrent包中的DelayQueue构造。这是一个可以在生产者-消费者程序中使用的阻塞队列。
它有一个非常有用的特性-当消费者想要从队列中取出一个元素时,他们只能在该特定元素的延迟到期时才能取出该元素。
2. 为延迟队列中的元素实现延迟
我们要放入DelayQueue的每个元素都需要实现Delayed接口。假设我们要创建一个DelayObject类,该类的实例将被放入DelayQueue中。
我们将String类型的data和delayInMilliseconds作为参数传递给它的构造函数:
public class DelayObject implements Delayed {
private final String data;
private final long startTime;
DelayObject(String data, long delayInMilliseconds) {
this.data = data;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}
}
我们定义了一个startTime,这是应该从队列中消费元素的时间。接下来,我们需要实现getDelay()方法-它应该返回给定时间单位内与该对象关联的剩余延迟。
因此,我们需要使用TimeUnit.convert()方法以适当的TimeUnit返回剩余延迟:
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
当消费者试图从队列中取出一个元素时,DelayQueue将执行getDelay()来确定是否允许从队列中返回该元素。如果getDelay()方法返回零或负数,则意味着可以从队列中检索它。
我们还需要实现compareTo()方法,因为DelayQueue中的元素会根据过期时间进行排序。首先过期的元素保留在队列的头部,过期时间最长的元素保留在队列的尾部:
@Override
public int compareTo(@NotNull Delayed o) {
return Ints.saturatedCast(this.startTime - ((DelayObject) o).startTime);
}
3. 延迟队列消费者和生产者
为了能够测试我们的DelayQueue,我们需要实现生产者和消费者的逻辑。生产者类将队列、要生产的元素数量以及每条消息的延迟(以毫秒为单位)作为参数。
然后,当run()方法被调用时,它将元素放入队列中,并在每次放入后休眠500毫秒:
public class DelayQueueProducer implements Runnable {
private final BlockingQueue<DelayObject> queue;
private final Integer numberOfElementsToProduce;
private final Integer delayOfEachProducedMessageMilliseconds;
// standard constructor
@Override
public void run() {
for (int i = 0; i < numberOfElementsToProduce; i++) {
DelayObject object = new DelayObject(UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
System.out.println("Put object = " + object);
try {
queue.put(object);
Thread.sleep(500);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
}
消费者的实现非常相似,但它还通过AtomicInteger记录了已消费的消息数量:
public class DelayQueueConsumer implements Runnable {
private final BlockingQueue<DelayObject> queue;
private final Integer numberOfElementsToTake;
final AtomicInteger numberOfConsumedElements = new AtomicInteger();
// standard constructors
@Override
public void run() {
for (int i = 0; i < numberOfElementsToTake; i++) {
try {
DelayObject object = queue.take();
numberOfConsumedElements.incrementAndGet();
System.out.println("Consumer take: " + object);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
4. 延迟队列使用测试
为了测试DelayQueue的行为,我们将创建一个生产者线程和一个消费者线程。
生产者将以500毫秒的延迟将两个对象放入队列中,该测试断言消费者消费了两条消息:
@Test
void givenDelayQueue_whenProduceElement_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
// given
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<DelayObject> queue = new DelayQueue<>();
int numberOfElementsToProduce = 2;
int delayOfEachProducedMessageMilliseconds = 500;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
// when
executor.submit(producer);
executor.submit(consumer);
// then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}
我们可以观察到运行这个程序会产生以下输出:
Put object = {data='c775d902-181c-462f-b8ff-d2bc6d7c7641', startTime=1666764870688}
Consumer take: {data='c775d902-181c-462f-b8ff-d2bc6d7c7641', startTime=1666764870688}
Put object = {data='b4fd0ff7-3a92-402f-baf9-d6b0682da830', startTime=1666764871205}
Consumer take: {data='b4fd0ff7-3a92-402f-baf9-d6b0682da830', startTime=1666764871205}
生产者生产对象,并在一段时间后消费延迟到期的第一个对象。
第二个元素也出现了同样的情况。
5. 消费者无法在给定时间内消费
假设我们有一个生产者正在生产一个将在10秒后过期的元素:
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
当我们运行测试时,它将在5秒后终止。由于DelayQueue的特性,消费者将无法消费队列中的消息,因为元素尚未过期:
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);
请注意,消费者的numberOfConsumedElements的值为零,因为消费者没有消费任何元素。
6. 生成立即过期的元素
当延迟消息getDelay()方法的实现返回一个负数时,这意味着给定的元素已经过期。在这种情况下,生产者将立即消费该元素。
我们可以测试产生负延迟元素的情况:
int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
当我们启动测试用例时,消费者将立即消费元素,因为它已经过期:
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);
7. 总结
在本文中,我们研究了java.util.concurrent包中的DelayQueue构造。
我们实现了一个从队列中生产和消费的延迟元素。
我们利用DelayQueue的实现来消费已过期的元素。
与往常一样,本教程的完整源代码可在GitHub上获得。