文章目录
说明
Docker Hub - bitnami/kafka
Docker Hub - apache/kafka
Kafka QuickStart
Kafka 目前没有 Docker 官方镜像, 目前拉取次数最多的是 bitnami/kafka
, Apache 提供的是 apache/kafka
(更新最及时), 本文使用 bitnami/kafka
bitnami/kafka 镜像支持的环境变量有很多, 除 DockerHub 文档中列出的外, Kafka 的每一个配置项都可以与一个以 KAFKA_CFG_
开头的环境变量相对应, 如 KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE
对应 auto.create.topics.enable
, KAFKA_CFG_BACKGROUND_THREADS
对应 background.threads
Kafka 现在有两种启动方式, ZooKeeper 或 KRaft, 后者不需要依赖 Zookeeper, 本文使用 KRaft
数据卷 | 数据卷印射在容器内的路径 |
---|---|
kafka | /bitnami/kafka |
命令
Kafka 文档
bitnami configuration
bitnami security
bitnami apache-kafka-kraft-mode-configuration
bitnami apache-kafka-zookeeper-mode-configuration
bitnami setting-up-a-apache-kafka-cluster
docker pull bitnami/kafka
docker network create network
docker volume create kafka
说明
Kafka 集群的安全认证机制主要包括 认证(Authentication)、授权(Authorization) 和 数据加密(Encryption), 确保集群和客户端的安全通信
- Kafka 主要支持 SASL(Simple Authentication and Security Layer) 认证方式
- 认证成功后, 还需要 授权 来控制不同用户的访问权限, 默认使用 ACL(Access Control List) 进行权限管理
- 常见的 ACL 权限有: Read(消费权限), Write(生产权限), Create(创建主题), Delete(删除主题), Alter(修改主题)
- 如果不启用 ACL, Kafka 允许所有用户访问
- Kafka 支持 TLS/SSL 加密, 用于 客户端-服务器 之间的数据传输保护
Kafka 可以分别配置 与 Controller 相关的通信
/ Broker 之间的通信
/ Broker 与 Client 之间的通信
是否做安全认证与是否加密, 但是 Spring 与 Kafka 的集成仅限于 Client 与 Broker 通信 这一种, 所以这里不关注其他两个通信方式的认证与加密与否, 只考虑客户端相关的配置
Spring 与 Kafka 集成有几个核心配置参数如下
spring.kafka.security.protocol
配置对应 Kafka 的 安全协议, 用于指定 Kafka 客户端 和 集群 之间的认证与通信方式, 有如下可选值- PLAINTEXT: 无认证,无加密, 默认值
- SSL: 无认证, 有加密(TLS/SSL)
- SASL_PLAINTEXT: 有认证(SASL), 无加密
- SASL_SSL: 有认证(SASL), 有加密(TLS/SSL)
spring.kafka.properties.sasl.mechanism
用于配置 Kafka SASL 认证机制, 只在安全协议为SASL_PLAINTEXT
和SASL_SSL
时生效, 有如下可选值, 这里只记录 PLAIN 机制, 因为其他的很难搞没成功- PLAIN: 明文用户名/密码认证
- SCRAM-SHA-256: 更安全的用户名/密码认证
- SCRAM-SHA-512: 更安全的用户名/密码认证
- GSSAPI: 基于 Kerberos 认证, 默认值
- OAUTHBEARER: 基于 OAuth 令牌认证
PLAINTEXT, 无认证, 无加密
docker run -d --name kafka --restart=always --network network -p 9092:9092 -e TZ=Asia/Shanghai -e KAFKA_CFG_PROCESS_ROLES=controller,broker -e KAFKA_CFG_NODE_ID=0 -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT -e KAFKA_CFG_LISTENERS=CONTROLLER://:9099,PLAINTEXT://:9092 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9099 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false -v kafka:/bitnami/kafka bitnami/kafka:latest
docker run -d --name kafka --restart=always --network network -p 9092:9092 -e TZ=Asia/Shanghai
# process.roles, 本节点扮演的角色, 即是 broker 也是 controller, 没有该配置则认为处于ZK模式
-e KAFKA_CFG_PROCESS_ROLES=controller,broker
# node.id, 本节点的唯一标识
-e KAFKA_CFG_NODE_ID=0
# listener.security.protocol.map, 侦听器名称和安全协议之间的映射,默认值:PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# listeners, 可用的监听器, 用于 Controller 相关的通信, Broker 之间通信, Client 和 Broker 之间通信
-e KAFKA_CFG_LISTENERS=CONTROLLER://:9099,PLAINTEXT://:9092
# advertised.listeners, 用于告诉外部客户端和其他 Broker 该节点的访问地址
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
# controller.listener.names, 指定用于控制器通信的监听器名称, 作用是告诉 Kafka 控制器节点应该使用哪个监听器来进行通信。控制器节点是 Kafka 集群中的一种特殊节点,负责管理集群的元数据、领导者选举、分区分配和其他集群管理任务。在 KRaft 模式下,控制器节点通过与其他节点之间的通信来执行这些任务。KRaft 模式下, 控制器节点必须配置
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# controller.quorum.voters, 投票者列表, 奇数, 格式为 {id}@{host}:{port}, 这里的 host 为 kafka 是因为 docker 支持用容器名称来访问
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9099
-e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
-v kafka:/bitnami/kafka bitnami/kafka:latest
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.group-id=test
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.acks=all
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
SASL_PLAINTEXT/PLAIN, 有认证(SASL), 无加密
docker run -d --name kafka --restart=always --network network -p 9093:9093 -e TZ=Asia/Shanghai -e KAFKA_CFG_PROCESS_ROLES=controller,broker -e KAFKA_CFG_NODE_ID=0 -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT -e KAFKA_CFG_LISTENERS=CONTROLLER://:9099,PLAINTEXT://:9092,SASL_PLAINTEXT://:9093 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093 -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9099 -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER -e KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN -e KAFKA_CFG_SASL_MECHANISMS=PLAIN -e KAFKA_CLIENT_USERS=kafka -e KAFKA_CLIENT_PASSWORDS=Mrv587.. -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false -v kafka:/bitnami/kafka bitnami/kafka:latest
docker run -d --name kafka --restart=always --network network -p 9093:9093 -e TZ=Asia/Shanghai
-e KAFKA_CFG_PROCESS_ROLES=controller,broker
-e KAFKA_CFG_NODE_ID=0
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
-e KAFKA_CFG_LISTENERS=CONTROLLER://:9099,PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
# 客户端可通过 advertised.listeners 公开的地址连接集群, 即 9092(无认证) 和 9093(有认证) 同时生效, 通过 -p 控制允许访问的端口
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9099
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
-e KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
-e KAFKA_CFG_SASL_MECHANISMS=PLAIN
-e KAFKA_CLIENT_USERS=kafka
-e KAFKA_CLIENT_PASSWORDS=Mrv587..
-e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
-v kafka:/bitnami/kafka bitnami/kafka:latest
spring.kafka.bootstrap-servers=localhost:9093
spring.kafka.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="Mrv587..";
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.group-id=test
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.acks=all
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
SSL, 无认证, 有加密
好难搞, 等真的需要的时候再研究吧
SASL_SSL/PLAIN, 有认证(SASL), 有加密
好难搞, 等真的需要的时候再研究吧
其他
可以在启动日志中找到 Kafka 节点的完整配置
# 查看日志(启动失败也可查看)
docker logs kafka
# 跟踪日志
docker logs -f kafka
docker logs --follow kafka
# 查看最后10条日志
docker logs --tail 10 kafka
# 查看最近10分钟的日志
docker logs --since 10m kafka
# 进入容器并执行 bash
docker exec -it kafka bash
# 退出容器
exit
Kafka 脚本文件在容器中的存放路径 /opt/bitnami/kafka/bin
默认情况下, 新建的 Topic 有一个分区, 每个分区有一个副本, 由 num.partitions
和 default.replication.factor
指定, 默认值都是 1
docker exec -it kafka bash
cd /opt/bitnami/kafka
# 创建 topic, 创建一个1分区1副本的主题, 集群每个节点都需要写到 --bootstrap-server 中, 用逗号分割
# host 用 kafka 是因为启动容器指定了 hostname
# 分区数会影响消费者的最大并行度
bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --partitions 1 --replication-factor 1 --topic test
bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --topic test
# 查看 topic
bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
bin/kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic test
# 修改 topic, 分区改为两个(分区数只能增不能减)
bin/kafka-topics.sh --bootstrap-server kafka:9092 -alter --partitions 2 --topic test
# 删除 topic
bin/kafka-topics.sh --bootstrap-server kafka:9092 --delete --topic test
# 生产
bin/kafka-console-producer.sh --broker-list kafka:9092 --topic test