DelayQueue指南

2023/06/07

1. 概述

在本文中,我们将研究java.util.concurrent包中的DelayQueue构造。这是一个可以在生产者-消费者程序中使用的阻塞队列。

它有一个非常有用的特性-当消费者想要从队列中取出一个元素时,他们只能在该特定元素的延迟到期时才能取出该元素

2. 为延迟队列中的元素实现延迟

我们要放入DelayQueue的每个元素都需要实现Delayed接口。假设我们要创建一个DelayObject类,该类的实例将被放入DelayQueue中。

我们将String类型的data和delayInMilliseconds作为参数传递给它的构造函数:

public class DelayObject implements Delayed {
    private final String data;
    private final long startTime;

    DelayObject(String data, long delayInMilliseconds) {
        this.data = data;
        this.startTime = System.currentTimeMillis() + delayInMilliseconds;
    }
}

我们定义了一个startTime,这是应该从队列中消费元素的时间。接下来,我们需要实现getDelay()方法-它应该返回给定时间单位内与该对象关联的剩余延迟。

因此,我们需要使用TimeUnit.convert()方法以适当的TimeUnit返回剩余延迟:

@Override
public long getDelay(TimeUnit unit) {
    long diff = startTime - System.currentTimeMillis();
    return unit.convert(diff, TimeUnit.MILLISECONDS);
}

当消费者试图从队列中取出一个元素时,DelayQueue将执行getDelay()来确定是否允许从队列中返回该元素。如果getDelay()方法返回零或负数,则意味着可以从队列中检索它。

我们还需要实现compareTo()方法,因为DelayQueue中的元素会根据过期时间进行排序。首先过期的元素保留在队列的头部,过期时间最长的元素保留在队列的尾部:

@Override
public int compareTo(@NotNull Delayed o) {
    return Ints.saturatedCast(this.startTime - ((DelayObject) o).startTime);
}

3. 延迟队列消费者和生产者

为了能够测试我们的DelayQueue,我们需要实现生产者和消费者的逻辑。生产者类将队列、要生产的元素数量以及每条消息的延迟(以毫秒为单位)作为参数。

然后,当run()方法被调用时,它将元素放入队列中,并在每次放入后休眠500毫秒:

public class DelayQueueProducer implements Runnable {

    private final BlockingQueue<DelayObject> queue;
    private final Integer numberOfElementsToProduce;
    private final Integer delayOfEachProducedMessageMilliseconds;

    // standard constructor

    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToProduce; i++) {
            DelayObject object = new DelayObject(UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
            System.out.println("Put object = " + object);
            try {
                queue.put(object);
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

消费者的实现非常相似,但它还通过AtomicInteger记录了已消费的消息数量:

public class DelayQueueConsumer implements Runnable {
    private final BlockingQueue<DelayObject> queue;
    private final Integer numberOfElementsToTake;
    final AtomicInteger numberOfConsumedElements = new AtomicInteger();

    // standard constructors

    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToTake; i++) {
            try {
                DelayObject object = queue.take();
                numberOfConsumedElements.incrementAndGet();
                System.out.println("Consumer take: " + object);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

4. 延迟队列使用测试

为了测试DelayQueue的行为,我们将创建一个生产者线程和一个消费者线程。

生产者将以500毫秒的延迟将两个对象放入队列中,该测试断言消费者消费了两条消息:

@Test
void givenDelayQueue_whenProduceElement_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
    // given
    ExecutorService executor = Executors.newFixedThreadPool(2);
    
    BlockingQueue<DelayObject> queue = new DelayQueue<>();
    int numberOfElementsToProduce = 2;
    int delayOfEachProducedMessageMilliseconds = 500;
    DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
    DelayQueueProducer producer = new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

    // when
    executor.submit(producer);
    executor.submit(consumer);

    // then
    executor.awaitTermination(5, TimeUnit.SECONDS);
    executor.shutdown();
    
    assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}

我们可以观察到运行这个程序会产生以下输出:

Put object = {data='c775d902-181c-462f-b8ff-d2bc6d7c7641', startTime=1666764870688}
Consumer take: {data='c775d902-181c-462f-b8ff-d2bc6d7c7641', startTime=1666764870688}
Put object = {data='b4fd0ff7-3a92-402f-baf9-d6b0682da830', startTime=1666764871205}
Consumer take: {data='b4fd0ff7-3a92-402f-baf9-d6b0682da830', startTime=1666764871205}

生产者生产对象,并在一段时间后消费延迟到期的第一个对象。

第二个元素也出现了同样的情况。

5. 消费者无法在给定时间内消费

假设我们有一个生产者正在生产一个将在10秒后过期的元素:

int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

当我们运行测试时,它将在5秒后终止。由于DelayQueue的特性,消费者将无法消费队列中的消息,因为元素尚未过期:

executor.submit(producer);
executor.submit(consumer);

executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);

请注意,消费者的numberOfConsumedElements的值为零,因为消费者没有消费任何元素。

6. 生成立即过期的元素

当延迟消息getDelay()方法的实现返回一个负数时,这意味着给定的元素已经过期。在这种情况下,生产者将立即消费该元素。

我们可以测试产生负延迟元素的情况:

int numberOfElementsToProduce = 1;
int delayOfEachProducedMessageMilliseconds = -10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

当我们启动测试用例时,消费者将立即消费元素,因为它已经过期:

executor.submit(producer);
executor.submit(consumer);

executor.awaitTermination(1, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 1);

7. 总结

在本文中,我们研究了java.util.concurrent包中的DelayQueue构造。

我们实现了一个从队列中生产和消费的延迟元素。

我们利用DelayQueue的实现来消费已过期的元素。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章