1. 概述
在本教程中,我们将讨论一个名为Redpanda的强大事件流平台。它是事实上的行业流媒体平台Kafka的竞争对手,有趣的是,它也与Kafka API兼容。
我们将研究Redpanda的关键组件、功能和用例,创建用于向Redpanda主题发布消息的Java程序,然后从中读取消息。
2. Redpanda与Kafka
由于Redpanda的制造商声称自己是Kafka的竞争对手,让我们从几个重要因素对它们进行比较:
特征 | Redpanda | Kafka |
---|---|---|
开发者体验 | 包含一个易于安装的二进制包 不依赖JVM和第三方工具 |
它依赖于Zookeeper或KRaft 对于安装,开发人员需要更多专业知识 |
性能 | 由于采用每核线程编程模型,速度比Kafka快10倍 用C++编写 每个核心可以处理每秒1GB的写入 支持自动内核调整 p99999延迟为16ms |
Kafka开发时间已久,因此并未针对新时代的多核CPU进行优化 用Java编写 p99999延迟为1.8秒 |
成本 | 比Kafka低6倍 | 需要更多基础设施来支持类似的性能 |
连接器 | Redpanda Cloud提供了一些开箱即用的托管连接器 | 它相当成熟,支持许多开箱即用的连接器 |
社区支持 | 在可接受性方面,与Kafka相比还有很长的路要走 拥有Slack频道 |
它在各个行业都有广泛的应用,因此有一个非常成熟的社区 |
3. Redpanda架构
Redpanda的架构不仅简洁,而且极其易于掌握。值得一提的是,它只有一个易于安装的二进制安装包,这让开发人员能够快速上手,也是它广受欢迎的原因。此外,它还提供了一个性能极高的流平台,吞吐量极高。
3.1 关键组件和特性
让我们深入了解Redpanda的关键组件和功能,这些组件和功能使其极其强大且性能卓越:
控制平面支持Kafka API,用于管理代理、创建消息主题、发布和消费消息等。因此,依赖Kafka的旧系统可以以显著减少的投入迁移到Redpanda。但是,Redpanda集群的管理和配置需要一组不同的Admin API。
Redpanda支持分层存储,这意味着我们可以将其配置为将数据日志从本地缓存卸载或归档到云端更便宜的对象存储。此外,根据用户的需要,数据会实时从远程对象存储移回本地缓存。
Redpanda拥有一个Raft共识算法实现层,该层在其节点之间复制主题分区数据,此功能可防止发生故障时数据丢失。当然,它保证了高数据安全性和容错性。
Redpanda拥有强大的身份验证和授权支持,它可以使用SASL、OAuth、OpenID Connect(OIDC)、基本身份验证、Kerberos等方法对外部用户和应用程序进行身份验证。此外,它还通过基于角色的访问控制(RBAC)机制对其资源进行细粒度的访问控制。
Schema对于定义Redpanda Broker、消费者和生产者之间交换的数据至关重要,因此,集群有一个Schema Registry,Schema Registry API可帮助注册和修改Schema。
HTTP代理(pandaproxy) API提供了一种与Redpanda交互的便捷方式,可以执行基本数据操作,例如列出主题和代理、获取事件、生成事件等等。
最后,Redpanda为其监控提供了指标端点,这些端点可以在Prometheus(监控工具)上进行配置,以提取重要指标并将其显示在Grafana仪表板上。
3.2 单个二进制安装包
Redpanda的安装包仅包含一个二进制文件,因此其安装比Kafka简单得多。与Kafka不同,它不依赖于JVM或Zookeeper等集群管理器,因此,Redpanda的操作非常轻松。
它采用C++开发,拥有强大的“每核线程”编程模型,有助于优化CPU核心、内存和网络的利用率。因此,其部署的硬件成本显著降低,该模型还能实现低延迟和高吞吐量。
Redpanda的集群由多个节点组成,每个节点既可以充当数据平面,也可以充当控制平面,这些节点只需安装一个二进制包并进行相应的配置即可。如果节点拥有强大的计算能力,它们可以同时充当两个角色,而不会出现性能瓶颈。
3.3 管理工具
Redpanda提供了两种管理工具:Web控制台和名为Redpanda Keeper(RPK)的CLI,控制台是一个用户友好的Web应用程序,可供集群管理员使用。
RPK主要用于低级集群管理和调优,但是,控制台提供了数据流的可视性以及集群故障排除和管理的功能。
4. 部署
Redpanda支持自托管和Redpanda云部署。
在自托管部署中,客户可以在其私有数据中心或公有云的VPC中部署Redpanda集群。它可以部署在物理机、虚拟机和Kubernetes上。根据经验,每个Broker都应该有其专用节点,目前支持RHEL/CentOS和Ubuntu操作系统。
此外,AWS简单存储服务(S3)、Azure Blob存储(ABS)和Google云存储(GCS)可用于支持分层存储。
有趣的是,客户还可以选择Redpanda Cloud的托管服务,他们可以选择将整个集群完全托管在Redpanda Cloud上,也可以选择在其私有数据中心或公有云账户中运行数据平面。控制平面仍然保留在Redpanda Cloud上,监控、配置和升级均由Redpanda Cloud负责。
5. 关键用例
与Kafka不同,Redpanda因其简单的架构和易于安装的特点,对于开发者来说是一个非常强大的流平台,让我们快速看一下同样的用例:
一般来说,流平台的参与者有:
- 源系统生成提要
- 提要可以监控事件、指标、通知等
- 集群中管理主题的代理
- 生产者从源系统读取提要并将其发布到主题
- 消费者不断对订阅的主题进行投票
- 目标系统接收来自消费者的转换后的消息
Redpanda保证从各种来源(如监控工具、合规和安全平台、物联网设备等)向目标系统提供实时信息,平均延迟降低10倍。
它支持消费者和生产者模型,用于处理来自各种来源的实时信息流或事件,生产者是从源系统读取数据并将其发布到Redpanda集群中主题的应用程序。集群中的代理具有高可靠性和容错能力,可保证消息的传递。
消费者应用程序订阅集群中的主题,最终,它们从主题中读取数据,并在进一步转换数据后将其发送到各种目标系统,例如分析平台、NoSQL数据库、关系型数据库或其他流平台。
在微服务架构中,Redpanda通过促进微服务之间的异步通信来帮助解耦微服务。
因此,它可以在各个行业的发展中发挥重要作用:
- 用于事件和日志处理、报告、故障排除和自动修复的可观察性平台
- 实时合规和欺诈检测系统
- 实时分析仪表板和应用程序
6. 使用Kafka API实现Redpanda客户端
值得注意的是,Redpanda支持Kafka API,因此,我们将使用Kafka客户端编写可以与Redpanda Stream交互的程序。
为了举例,我们使用Java Testcontainers在Windows桌面上部署单节点Redpanda。
此外,我们将探索涵盖主题创建、消息发布和消息消费的基本程序。这只是为了演示目的,因此我们不会深入探讨Kafka API的概念。
6.1 先决条件
在开始之前,让我们导入Kafka客户端库必要的Maven依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
6.2 创建主题
为了在Redpanda上创建主题,我们首先从Kafka客户端库实例化AdminClient类:
AdminClient createAdminClient() {
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
return KafkaAdminClient.create(adminProps);
}
为了设置AdminClient,我们获取了代理URL并将其传递给其静态create()方法。
现在,让我们看看如何创建主题:
void createTopic(String topicName) {
try (AdminClient adminClient = createAdminClient()) {
NewTopic topic = new NewTopic(topicName, 1, (short) 1);
adminClient.createTopics(Collections.singleton(topic));
} catch (Exception e) {
LOGGER.error("Error occurred during topic creation:", e);
}
}
AdminClient类的createTopics()方法接收NewTopic对象作为创建主题的参数。
最后,让我们看一下createTopic()方法的实际作用:
@Test
void whenCreateTopic_thenSuccess() throws ExecutionException, InterruptedException {
String topic = "test-topic";
createTopic(topic);
try(AdminClient adminClient = createAdminClient()) {
assertTrue(adminClient.listTopics()
.names()
.get()
.contains(topic));
}
}
该程序在Redpanda上成功创建了主题test-topic,我们还使用AdminClient类的listTopics()方法验证了该主题在代理中的存在。
6.3 向主题发布消息
生产者应用程序最基本的要求是将消息发布到主题,为此,我们将使用KafkaProducer:
KafkaProducer<String, String> createProducer() {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<String, String>(producerProps);
}
我们通过向KafkaProducer构造函数提供代理URL和StringSerializer类等基本属性来实例化生产者。
现在,让我们使用生产者将消息发布到主题:
void publishMessage(String msgKey, String msg, String topic, KafkaProducer<String, String> producer)
throws ExecutionException, InterruptedException {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, msgKey, msg);
producer.send(record).get();
}
创建ProducerRecord对象后,我们将其传递给KafkaProducer对象中的send()方法以发布消息。send()方法是异步操作的,因此我们调用get()方法以确保阻塞,直到消息发布完成。
最后,现在我们来发布一条消息:
@Test
void givenTopic_whenPublishMsg_thenSuccess() {
try (final KafkaProducer<String, String> producer = createProducer()) {
assertDoesNotThrow(() -> publishMessage("test_msg_key_2", "Hello Redpanda!", "tuyucheng-topic", producer));
}
}
首先,我们通过调用createProducer()方法创建KafkaProducer对象。然后,我们通过调用之前介绍过的publishMessage()方法,将消息“Hello Redpanda!”发布到主题tuyucheng-topic。
6.4 从主题消费消息
下一步,我们首先创建一个KafkaConsumer,然后才能消费流中的消息:
KafkaConsumer<String, String> createConsumer() {
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<String, String>(consumerProps);
}
我们通过向KafkaConsumer构造函数提供代理URL、StringDeSerializer类等基本属性来实例化消费者。此外,我们确保消费者从偏移量0(“最早”)开始消费消息。
接下来,让我们处理一些消息:
@Test
void givenTopic_whenConsumeMessage_thenSuccess() {
try (KafkaConsumer<String, String> kafkaConsumer = createConsumer()) {
kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
while(true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
if(records.count() == 0) {
continue;
}
assertTrue(records.count() >= 1);
break;
}
}
}
该方法在创建KafkaConsumer对象后,会订阅一个主题。然后,它会每1000毫秒轮询一次该主题,以读取其中的消息。这里,为了演示,我们跳过了循环,但在现实世界中,应用程序会持续轮询消息,然后进一步处理它们。
7. 总结
在本教程中,我们探索了Redpanda流平台,从概念上讲,它与Apache Kafka类似,但更易于安装、监控和管理。此外,它占用的计算和内存资源更少,却能实现极高的性能和高容错能力。
但是,Redpanda在行业应用方面与Kafka相比仍有相当大的差距。此外,Redpanda的社区支持也不如Kafka强大。
最后,由于它与Kafka API兼容,应用程序可以以更少的努力从Kafka迁移到Redpanda。
Post Directory
