1. 概述
当生产者向Apache Kafka发送消息时,它会将其附加到日志文件中并保留配置的持续时间。
在本教程中,我们将学习为Kafka主题配置基于时间的消息保留属性。
2. 基于时间的留存
有了保留期属性,消息就拥有了TTL(生存时间),到期后,消息将被标记为删除,从而释放磁盘空间。
相同的保留期属性适用于给定Kafka主题中的所有消息,此外,我们可以在创建主题之前设置这些属性,也可以在运行时针对现有主题进行更改。
在下面的部分中,我们将学习如何通过代理配置来调整新主题的保留期,以及如何通过主题级配置在运行时对其进行控制。
3. 服务器级配置
Apache Kafka支持服务器级保留策略,我们可以通过配置以下3个基于时间的配置属性之一来调整该策略:
- log.retention.hours
- log.retention.minutes
- log.retention.ms
需要注意的是,Kafka会用精度更高的值覆盖精度较低的值,因此,log.retention.ms的优先级最高。
3.1 基础知识
首先,让我们通过从Apache Kafka目录执行grep命令来检查保留期的默认值:
$ grep -i 'log.retention.[hms].*\=' config/server.properties
log.retention.hours=168
我们可以在这里注意到默认保留时间是7天。
为了仅保留消息十分钟,我们可以在config/server.properties中设置log.retention.minutes属性的值:
log.retention.minutes=10
3.2 新主题保留期
Apache Kafka软件包包含几个可用于执行管理任务的Shell脚本,我们将使用它们创建一个辅助脚本functions.sh,并在本教程中用到它。
我们首先在functions.sh中添加两个函数,分别用于创建主题和描述其配置:
function create_topic {
topic_name="$1"
bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \
--partitions 1 --replication-factor 1 \
--zookeeper localhost:2181
}
function describe_topic_config {
topic_name="$1"
./bin/kafka-configs.sh --describe --all \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
接下来,让我们创建两个独立的脚本,create-topic.sh和get-topic-retention-time.sh:
bash-5.1# cat create-topic.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
create_topic "${topic_name}"
exit $?
bash-5.1# cat get-topic-retention-time.sh
#!/bin/bash
. ./functions.sh
topic_name="$1"
describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'
exit $?
需要注意的是,describe_topic_config会列出该主题的所有配置属性,因此,我们使用awk单行命令为retention.ms属性添加了一个过滤器。
最后,让我们启动Kafka环境并验证新示例主题的保留期配置:
bash-5.1# ./create-topic.sh test-topic
Created topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=600000
创建并描述主题后,我们会注意到retention.ms设置为600000(十分钟),这实际上是从我们之前在server.properties文件中定义的log.retention.minutes属性派生而来的。
4. 主题级配置
**代理服务器启动后,log.retention.{hours | minutes | ms}服务器级属性将变为只读**,另一方面,我们可以访问retention.ms属性,并在主题级别对其进行调整。 |
让我们在functions.sh脚本中添加一个方法来配置主题的属性:
function alter_topic_config {
topic_name="$1"
config_name="$2"
config_value="$3"
./bin/kafka-configs.sh --alter \
--add-config ${config_name}=${config_value} \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
然后,我们可以在alter-topic-config.sh脚本中使用它:
#!/bin/sh
. ./functions.sh
alter_topic_retention_config $1 $2 $3
exit $?
最后,让我们将test-topic的保留时间设置为五分钟并进行验证:
bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000
Completed updating config for topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=300000
5. 验证
到目前为止,我们已经了解了如何在Kafka主题中配置消息的保留期限,现在是时候验证消息在保留期超时后是否确实过期了。
5.1 生产者-消费者
让我们在functions.sh中添加produce_message和consumer_message函数,在内部,它们分别使用kafka-console-producer.sh和kafka-console-consumer.sh来生成/消费消息:
function produce_message {
topic_name="$1"
message="$2"
echo "${message}" | ./bin/kafka-console-producer.sh \
--bootstrap-server=0.0.0.0:9092 \
--topic ${topic_name}
}
function consume_message {
topic_name="$1"
timeout="$2"
./bin/kafka-console-consumer.sh \
--bootstrap-server=0.0.0.0:9092 \
--from-beginning \
--topic ${topic_name} \
--max-messages 1 \
--timeout-ms $timeout
}
我们必须注意,消费者总是从一开始就读取消息,因为我们需要一个读取Kafka中任何可用消息的消费者。
接下来,让我们创建一个独立的消息生产者:
bash-5.1# cat producer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
message="$2"
produce_message ${topic_name} ${message}
exit $?
最后,创建一个独立的消息消费者:
bash-5.1# cat consumer.sh
#!/bin/sh
. ./functions.sh
topic_name="$1"
timeout="$2"
consume_message ${topic_name} $timeout
exit $?
5.2 消息过期
现在我们已经准备好基本设置,让我们生成一条消息并立即消费它两次:
bash-5.1# ./producer.sh "test-topic-2" "message1"
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
因此,我们可以看到消费者正在重复消费任何可用的消息。
现在,让我们引入五分钟的睡眠延迟,然后尝试消费该消息:
bash-5.1# sleep 300 && ./consumer.sh test-topic 10000
[2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
正如预期的那样,消费者没有找到任何可供消费的消息,因为该消息已超出其保留期。
6. 限制
在内部,Kafka代理维护另一个名为log.retention.check.interval.ms的属性,此属性决定检查消息是否过期的频率。
因此,为了保持保留策略有效,我们必须确保log.retention.check.interval.ms的值低于任何给定主题的retention.ms属性值。
7. 总结
在本教程中,我们探索了Apache Kafka,以了解基于时间的消息保留策略。在此过程中,我们创建了简单的Shell脚本来简化管理任务,之后,我们创建了独立的消费者和生产者,用于验证保留期过后的消息是否过期。
Post Directory
