1. 概述
身份验证是设计任何消息系统(例如Kafka)的基本方面,我们可以使用基于用户的凭证、SSL证书或基于令牌等方法来实现身份验证。
在本教程中,我们将学习如何在Kafka服务中实现一种名为“简单身份验证和套接字层(SASL)”的身份验证机制,我们还将使用Spring Kafka提供的机制实现客户端身份验证。
2. Kafka身份验证简介
Kafka支持各种身份验证和授权机制来保护网络通信安全,它支持SSL、SASL或委托令牌。身份验证可以发生在客户端与Broker之间、Broker与Zookeeper之间,或者Broker之间。
我们可以根据系统要求和其他基础设施因素采用任何相关方法,SSL身份验证使用X.509证书对客户端和代理进行身份验证,并提供单向或双向身份验证。
SASL身份验证是一个支持不同身份验证机制的安全框架:
- SASL/GSSAPI:SASL/GSSAPI(通用安全服务应用程序接口)是一种标准API,它通过标准API抽象化安全机制,并可轻松与现有的Kerberos服务集成。SASL/GSSAPI身份验证使用密钥分发中心通过网络提供身份验证,通常用于现有基础设施(例如Active Directory或Kerberos服务器)可用的场合。
- SASL/PLAIN:SASL/PLAIN使用基于用户的凭证进行身份验证,它主要用于非生产环境,因为它在网络上不安全。
- SASL/SCRAM:使用SASL/SCRAM身份验证,通过对密码进行哈希处理并添加盐值来创建带盐质询-响应,从而提供比纯文本机制更高的安全性。SCRAM支持不同的哈希算法,例如SHA-256、SHA-512或SHA-1(安全性较低)。
- SASL/OAUTHBEARER:SASL/OAUTHBEARER使用OAUTH 2.0承载令牌进行身份验证,当我们拥有现有的身份提供商(如Keycloak或OKTA)时很有用。
我们还可以将SASL与SSL身份验证结合起来,提供传输层加密。
对于Kafka中的授权,我们可以使用内置的基于ACL或OAUTH/OIDC(OpenID Connect)或自定义授权器。
在本教程中,我们将重点介绍GSSAPI身份验证实现,因为它被广泛使用并且保持其简单性。
3. 使用SASL/GSSAPI身份验证实现Kafka服务
假设我们需要在Docker环境中构建一个支持GSSAPI身份验证的Kafka服务,为此,我们可以利用Kerberos运行时提供票证授予票证(TGT)服务并充当身份验证服务器。
3.1 设置Kerberos
为了在Docker环境中实现Kerberos服务,我们需要自定义Kerberos设置。
首先,让我们包含一个krb5.conf文件来配置领域TUYUCHENG.COM的一些配置:
[libdefaults]
default_realm = TUYUCHENG.COM
dns_lookup_realm = false
dns_lookup_kdc = false
forwardable = true
rdns = true
[realms]
TUYUCHENG.COM = {
kdc = kdc
admin_server = kdc
}
领域是Kafka服务的逻辑名称或域名。
我们需要编写一个脚本,使用kdb5_util初始化Kerberos数据库,然后使用Kadmin.local命令为Kafka、Zookeeper和客户端应用程序创建主体及其关联的密钥表文件。最后,我们将使用krb5kdc和kadmind命令启动Kerberos服务。
然后,让我们实现脚本kdc_setup.sh来添加主体,创建keytab文件,并运行Kerberos服务:
kadmin.local -q "addprinc -randkey kafka/localhost@TUYUCHENG.COM"
kadmin.local -q "addprinc -randkey zookeeper/zookeeper.sasl_default@TUYUCHENG.COM"
kadmin.local -q "addprinc -randkey client@TUYUCHENG.COM"
kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/kafka.keytab kafka/localhost@TUYUCHENG.COM"
kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/zookeeper.keytab zookeeper/zookeeper.sasl_default@TUYUCHENG.COM"
kadmin.local -q "ktadd -k /etc/krb5kdc/keytabs/client.keytab client@TUYUCHENG.COM"
krb5kdc
kadmind -nofork
任何主体的格式通常为<service-name>/<host/domain>@REALM,hostname部分是可选的,REALM通常大写。
我们还应该注意,主体应该正确设置,否则,身份验证将由于服务名称或完全限定域名不匹配而失败。
最后,我们来实现一个Dockerfile来准备Kerberos环境:
FROM debian:bullseye
RUN apt-get update && \
apt-get install -y krb5-kdc krb5-admin-server krb5-user && \
rm -rf /var/lib/apt/lists/*
COPY config/krb5.conf /etc/krb5.conf
COPY setup_kdc.sh /setup_kdc.sh
RUN chmod +x /setup_kdc.sh
EXPOSE 88 749
CMD ["/setup_kdc.sh"]
上述Dockerfile使用之前创建的krb5.conf和setup_kdc.sh文件来初始化并运行Kerberos服务。
我们还将添加一个kadm5.acl文件来授予管理员主体完全权限:
*/admin@TUYUCHENG.COM *
3.2 Kafka和Zookeeper的配置
为了在Kafka中配置GSSAPI身份验证,我们将使用JAAS(Java身份验证和授权服务)来指定Kafka或客户端如何向Kerberos的密钥分发中心(KDC)进行身份验证。
我们将在Kafka服务器、Zookeeper的单独文件中创建JAAS相关配置。
首先,我们将实现zookeeper_jaas.conf文件并设置之前创建的zookeeper.keytab文件和principal参数:
Server {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/keytabs/zookeeper.keytab"
principal="zookeeper/zookeeper.sasl_default@TUYUCHENG.COM";
};
该主体必须与Zookeeper的Kerberos主体相同,通过将useKeyTab设置为true,我们强制身份验证使用keytab文件。
然后,我们在kafka_server_jaas.conf文件中配置Kafka服务器和客户端JAAS相关属性:
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/keytabs/kafka.keytab"
principal="kafka/localhost@TUYUCHENG.COM"
serviceName="kafka";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/keytabs/client.keytab"
principal="client@TUYUCHENG.COM"
serviceName="kafka";
};
3.3 将Kafka与Zookeeper和Kerberos集成
借助Docker服务可以轻松集成Kafka、Zookeeper和自定义Kerberos服务。
首先,我们将使用之前的Dockerfile实现自定义Kerberos服务:
services:
kdc:
build:
context: .
dockerfile: Dockerfile
volumes:
- ./config:/etc/krb5kdc
- ./keytabs:/etc/krb5kdc/keytabs
- ./config/krb5.conf:/etc/krb5.conf
ports:
- "88:88/udp"
上述服务将在内部和主机环境的典型UDP 88端口上可用。
然后,让我们使用confluentinc:cp-zookeeper基础镜像设置Zookeeper服务:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf"
volumes:
- ./config/zookeeper_jaas.conf:/etc/kafka/zookeeper_jaas.conf
- ./keytabs:/etc/kafka/keytabs
- ./config/krb5.conf:/etc/krb5.conf
ports:
- "2181:2181"
上述Zookeeper服务也使用zookeeper_jaas.conf配置了GSSAPI身份验证。
最后,我们将使用与GSSAPI相关的环境属性设置Kafka服务:
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: GSSAPI
KAFKA_SASL_ENABLED_MECHANISMS: GSSAPI
KAFKA_LISTENERS: SASL_PLAINTEXT://:9092
KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./config/kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
- ./keytabs:/etc/kafka/keytabs
- ./config/krb5.conf:/etc/krb5.conf
depends_on:
- zookeeper
- kdc
ports:
- 9092:9092
在上面的Kafka服务中,我们在Kafka代理服务器和客户端上都启用了GSSAPI身份验证。Kafka服务将使用之前创建的kafka_server_jaas.conf文件进行GSSAPI配置,例如主体文件和密钥表文件。
我们应该注意,KAFKA_ADVERTISED_LISTENERS属性是Kafka客户端将监听的端点。
现在,我们将使用docker compose命令运行整个Docker设置:
$ docker compose up --build
kafka | [2025-02-03 18:09:10,147] INFO Successfully authenticated client: authenticationID=kafka/localhost@TUYUCHENG.COM; authorizationID=kafka/localhost@TUYUCHENG.COM. (org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler)
kafka | [2025-02-03 18:09:10,148] INFO [RequestSendThread controllerId=1001] Controller 1001 connected to localhost:9092 (id: 1001 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
从以上日志中,我们确认Kafka、Zookeeper、Kerberos服务均已集成,没有错误。
4. 使用Spring实现Kafka客户端
我们将使用Spring Kafka来实现Kafka监听器应用程序。
4.1 Maven依赖
首先,我们将包含spring-kafka依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
</dependency>
4.2 实现Kafka监听器
我们将使用Spring Kafka的KafkaListener和ConsumerRecord类来实现监听器。
让我们用@KafkaListener注解实现Kafka监听器并添加所需的主题:
@KafkaListener(topics = test-topic)
public void receive(ConsumerRecord<String, String> consumerRecord) {
log.info("Received payload: '{}'", consumerRecord.toString());
messages.add(consumerRecord.value());
}
另外,我们将在application-sasl.yml文件中配置与Spring监听器相关的配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: test
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
现在,让我们运行Spring应用程序并验证设置:
kafka | [2025-02-01 03:08:01,532] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1001] Failed authentication with /172.21.0.1 (channelId=172.21.0.4:9092-172.21.0.1:59840-16) (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
以上日志确认客户端应用程序无法按预期向Kafka服务器进行身份验证。
为了解决这个问题,我们还需要在应用程序中包含Spring Kafka JAAS配置。
5. 使用JAAS配置Kafka客户端
我们将使用spring.kafka.properties配置来提供SASL/GSSAPI设置。
现在,我们将包括一些与客户端主体、密钥表文件和sasl.mechanism作为GSSAPI相关的附加配置:
spring:
kafka:
bootstrap-servers: localhost:9092
properties:
sasl.mechanism: GSSAPI
sasl.jaas.config: >
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="./src/test/resources/sasl/keytabs/client.keytab"
principal="client@TUYUCHENG.COM"
serviceName="kafka";
我们应该注意,上述serviceName配置应该与Kafka主体的serviceName完全匹配。
我们再次验证一下Kafka消费者应用程序。
6. 在应用程序中测试Kafka监听器
为了快速验证监听器,我们将使用Kafka提供的实用程序kafka-console-producer.sh向主题发送消息。
运行以下命令向主题发送消息:
$ kafka-console-producer.sh --broker-list localhost:9092 \
--topic test-topic \
--producer-property security.protocol=SASL_PLAINTEXT \
--producer-property sasl.mechanism=GSSAPI \
--producer-property sasl.kerberos.service.name=kafka \
--producer-property sasl.jaas.config="com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true keyTab=\"/<path>/client.keytab\"
storeKey=true principal=\"client@TUYUCHENG.COM\";"
> hello
在上面的命令中,我们通过client.keytab文件传递类似的与身份验证相关的配置,如security.protocol、sasl.mechanism和sasl.jaas.config。
现在,让我们验证一下收到的消息的监听器日志:
08:52:13.663 INFO c.t.t.s.KafkaConsumer - Received payload: 'ConsumerRecord(topic = test-topic, .... key = null, value = hello)'
我们应该注意,任何投入生产的应用程序中可能还需要一些配置,例如配置SSL证书或DNS。
7. 总结
在本文中,我们学习了如何在docker环境中设置Kafka服务并使用自定义Kerberos设置在docker环境中启用SASL/GSSAPI身份验证。
我们还实现了客户端监听器应用程序,并使用JAAS配置配置了GSSAPI身份验证。最后,我们通过在监听器中发送和接收消息来测试整个设置。
Post Directory
