1. 概述
在这篇短文中,我们将探讨Kafka Producer的重试机制以及如何定制其设置以适应特定用例。
我们将讨论关键属性及其默认值,然后根据我们的示例对其进行自定义。
2. 默认配置
Kafka Producer的默认行为是当消息未被代理确认时重试发布,为了演示这一点,我们可以通过故意错误配置主题设置来导致生产者失败。
首先,让我们将kafka-clients依赖添加到pom.xml中:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.8.0</version>
</dependency>
现在,我们需要模拟Kafka代理拒绝生产者发送的消息的用例。例如,我们可以使用“min.insync.replicas”主题配置,该配置在写入被视为成功之前验证最小数量的副本是否同步。
让我们创建一个主题并将此属性设置为2,即使我们的测试环境仅包含一个Kafka代理。因此,新消息始终被拒绝,这使我们能够测试生产者的重试机制:
@Test
void givenDefaultConfig_whenMessageCannotBeSent_thenKafkaProducerRetries() throws Exception {
NewTopic newTopic = new NewTopic("test-topic-1", 1, (short) 1)
.configs(Map.of("min.insync.replicas", "2"));
adminClient.createTopics(singleton(newTopic)).all().get();
// publish message and verify exception
}
然后,我们创建一个KafkaProducer,向该主题发送一条消息,并验证它多次重试并最终在2分钟后超时:
@Test
void givenDefaultConfig_whenMessageCannotBeSent_thenKafkaProducerRetries() throws Exception {
// set topic config
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic-1", "test-value");
assertThatThrownBy(() -> producer.send(record).get())
.isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(org.apache.kafka.common.errors.TimeoutException.class)
.hasMessageContaining("Expiring 1 record(s) for test-topic-1-0");
}
从异常和日志中我们可以看出,生产者尝试发送消息多次,最终在2分钟后超时,此行为与KafkaProducer的默认设置一致:
- retries(默认为Integer.MAX_VALUE):发布消息的最大尝试次数
- delivery.timeout.ms(默认为120000):在认为消息失败之前等待消息被确认的最长时间
- retry.backoff.ms(默认为100):重试前等待的时间
- retry.backoff.max.ms(默认为1000):连续重试之间的最大延迟
3. 自定义重试配置
不用说,我们可以调整KafkaProducer的重试配置以更好地满足我们的需求。
例如,我们可以将最大传送时间设置为5秒,重试之间使用500毫秒的延迟,并将最大重试次数降低到20次:
@Test
void givenCustomConfig_whenMessageCannotBeSent_thenKafkaProducerRetries() throws Exception {
// set topic config
Properties props = new Properties();
// other properties
props.put(RETRIES_CONFIG, 20);
props.put(RETRY_BACKOFF_MS_CONFIG, "500");
props.put(DELIVERY_TIMEOUT_MS_CONFIG, "5000");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic-2", "test-value");
assertThatThrownBy(() -> producer.send(record).get())
.isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(org.apache.kafka.common.errors.TimeoutException.class)
.hasMessageContaining("Expiring 1 record(s) for test-topic-2-0");
}
正如预期的那样,生产者在自定义的5秒超时后停止重试。日志显示重试间隔500毫秒,并确认重试计数从20开始,每次尝试后都会减少:
12:57:19.599 [kafka-producer-network-thread | producer-1] WARN o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 5 on topic-partition test-topic-2-0, retrying (19 attempts left). Error: NOT_ENOUGH_REPLICAS
12:57:20.107 [kafka-producer-network-thread | producer-1] WARN o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 6 on topic-partition test-topic-2-0, retrying (18 attempts left). Error: NOT_ENOUGH_REPLICAS
12:57:20.612 [kafka-producer-network-thread | producer-1] WARN o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 7 on topic-partition test-topic-2-0, retrying (17 attempts left). Error: NOT_ENOUGH_REPLICAS
[...]
4. 总结
在本简短教程中,我们探索了KafkaProducer的重试配置。我们学习了如何设置最大传送时间、指定重试次数以及配置失败尝试之间的延迟。
Show Disqus Comments
Post Directory
扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章
