获取Apache Kafka主题中的最后N条消息

2025/04/03

1. 简介

在这个简短的教程中,我们将了解如何从Apache Kafka主题中检索最后N条消息。

在本文的第一部分中,我们将重点介绍执行此操作所需的先决条件。在第二部分中,我们将使用Kafka Java API库构建一个小型实用程序来使用Java读取消息。最后,我们将提供简短的指导,以便使用KafkaCat从命令行实现相同的结果。

2. 先决条件

要从Kafka主题中检索最后N条消息,只需从明确定义的偏移量开始消费消息即可,Kafka主题中的偏移量表示消费者的当前位置。在上一篇文章中,我们已经了解了如何利用consumer.seekToEnd()方法在Apache Kafka主题中获取特定数量的消息

考虑相同的功能,我们可以通过执行一个简单的减法来计算正确的偏移量:offset = lastOffset - N,然后我们可以从这个位置开始轮询N条消息。

尽管如此,如果我们使用事务生产者生成记录,则此方法将不起作用。在这种情况下,偏移量将跳过一些数字以适应Kafka主题事务记录(提交/回滚等)。使用事务生产者的一个常见情况是我们需要只处理一次Kafka消息。简而言之,如果我们从(lastOffset - N)开始读取消息,我们可能会消费少于N条消息,因为一些偏移量数字已被事务记录消费

3. 使用Java获取Kafka主题中的最后N条消息

首先,我们需要创建一个生产者和一个消费者:

Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

现在让我们生成一些消息:

final String TOPIC1 = "tuyucheng-topic";
int messagesInTopic = 100;
for (int i = 0; i < messagesInTopic; i++) {
    producer.send(new ProducerRecord(TOPIC1, null, MESSAGE_KEY, String.valueOf(i))).get();
}

为了清晰和简单起见,我们假设我们只需要为消费者注册一个分区:

TopicPartition partition = new TopicPartition(TOPIC1, 0);
List<TopicPartition> partitions = new ArrayList<>();
partitions.add(partition);
consumer.assign(partitions);

正如我们之前提到的,我们需要将偏移量定位在正确的位置,然后我们就可以开始轮询了:

int messagesToRetrieve = 10;
consumer.seekToEnd(partitions);
long startIndex = consumer.position(partition) - messagesToRetrieve;
consumer.seek(partition, startIndex);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));

如果网络特别慢,或者要检索的消息数量特别大,我们可能需要增加轮询持续时间。在这种情况下,我们需要考虑内存中有大量记录可能会导致资源短缺问题。

现在让我们最终检查一下我们是否真正检索到了正确数量的消息:

for (ConsumerRecord<String, String> record : records) {
    assertEquals(MESSAGE_KEY, record.key());
    assertTrue(Integer.parseInt(record.value()) >= (messagesInTopic - messagesToRetrieve));
    recordsReceived++;
}
assertEquals(messagesToRetrieve, recordsReceived);

4. 使用KafkaCat获取Kafka主题中的最后N条消息

KafkaCat(kcat)是一个命令行工具,我们可以使用它来测试和调试Kafka主题。Kafka本身提供了大量脚本和shell工具来执行相同的操作,尽管如此,KafkaCat的简单性和易用性使其成为执行诸如检索Apache Kafka主题中的最后N条消息等操作的事实标准。安装后,可以通过运行以下简单命令来检索Kafka主题中生成的最新N条消息:

$ kafkacat -C -b localhost:9092 -t topic-name -o -<N> -e
  • -C表示我们需要消费消息
  • -b表示Kafka Broker的位置
  • -t表示主题名称
  • -o表示需要从这个偏移量开始读取,负号表示需要从末尾读取N条消息
  • -e选项在读完最后一条消息后退出

联系到我们上面讨论的案例,从名为“tuyucheng-topic”的主题中检索最后10条消息的命令是:

$ kafkacat -C -b localhost:9092 -t tuyucheng-topic -o -10 -e

5. 总结

在本简短教程中,我们了解了如何消费Kafka主题的最新N条消息。在第一部分中,我们使用了Java Kafka API库。在第二部分中,我们使用了名为KafkaCat的命令行实用程序。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章