Requirements:
Kafka brokers must use SASL/PLAIN authentication everywhere: for inter-broker communication, for client-facing authentication, and for their interaction with Zookeeper.
Approach:
Kafka's bin directory ships startup scripts for both Zookeeper and the Kafka server. All we need to do is point the KAFKA_OPTS environment variable at the matching JAAS authentication file before the script runs, then start the server with the corresponding properties file. Once the whole chain is laid out, it is actually quite simple.
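Stripped of the Docker packaging, the core of the idea looks like this (a minimal sketch, assuming the kafka_2.12-1.1.1 layout used throughout this post):

export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-1.1.1/config/zoo_server_jaas.conf"
zookeeper-server-start.sh /kafka_2.12-1.1.1/config/zookeeper.properties

# in another shell: the broker, with its own JAAS file
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-1.1.1/config/kafka_server_jaas.conf"
kafka-server-start.sh /kafka_2.12-1.1.1/config/server.properties

zookeeper-server-start.sh goes through kafka-run-class.sh, so it honors KAFKA_OPTS as well.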
Getting started
Directory structure
First we need to write a Dockerfile, which is where the environment variables are declared; without it, the configuration files could not be orchestrated as templates.
FROM openjdk:8-jre-slim
RUN set -eux; \
apt -y update && apt -y install procps; \
apt clean
WORKDIR /
ENV STORE_DATA="/kafka_2.12-1.1.1/store/data" \
STORE_LOGS="/kafka_2.12-1.1.1/store/logs" \
ZOO_USER="ZookeeperUsername" \
ZOO_PASS="ZookeeperPassword" \
ZOO_QUORUM_USER="QuorumUserName" \
ZOO_QUORUM_PASS="QuorumUserPassword" \
KAFKA_USER="KafkaUsername" \
KAFKA_PASS="KafkaPassword" \
KAFKA_CLIENT_BROKER_USER="KafkaClientBrokerUsername" \
KAFKA_CLIENT_BROKER_PASS="KafkaClientBrokerPassword" \
ZOOKEEPER_CONNECT="localhost:2181" \
MODE="" \
# create multiple users with different permissions
# operation attribute: Read Write *
# KAFKA_ACCOUNT_GROUP "user1|password|* readonlyuser|password|Read"
KAFKA_ACCOUNT_GROUP="" \
# each pod contains a zk and a kafka
# so their number of nodes and information should be the same
CLUSTER_NODE=""
# CLUSTER_NODE "node0|0|127.0.0.1 node1|1|127.0.0.1"
ENV PATH=$PATH:/kafka_2.12-1.1.1/bin \
ZOO_DATA_DIR="$STORE_DATA/zookeeper/data" \
ZOO_MYID_FILE="$STORE_DATA/zookeeper/data/myid" \
ZOO_DATA_LOG_DIR="$STORE_LOGS/zookeeper/logs/dataLogs" \
ZOO_LOG_DIR="$STORE_LOGS/zookeeper/logs" \
SERVER_JVMFLAGS="-Djava.security.auth.login.config=/kafka_2.12-1.1.1/config/zoo_server_jaas.conf" \
KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-1.1.1/config/kafka_server_jaas.conf" \
ZOO_CFG="/kafka_2.12-1.1.1/config/zookeeper.properties" \
ZOO_AUTH="/kafka_2.12-1.1.1/config/zoo_server_jaas.conf" \
KAFKA_CFG="/kafka_2.12-1.1.1/config/server.properties" \
KAFKA_AUTH="/kafka_2.12-1.1.1/config/kafka_server_jaas.conf" \
KAFKA_DATA_LOG_DIR="$STORE_LOGS/kafka/logs/dataLogs"
ENV ENV_CONFIGURE="ZOO_DATA_DIR ZOO_DATA_LOG_DIR ZOO_USER ZOO_PASS ZOO_QUORUM_USER ZOO_QUORUM_PASS KAFKA_USER KAFKA_PASS KAFKA_CLIENT_BROKER_USER KAFKA_CLIENT_BROKER_PASS KAFKA_DATA_LOG_DIR ZOOKEEPER_CONNECT" \
ENV_CONFIGFILE="ZOO_CFG ZOO_AUTH KAFKA_CFG KAFKA_AUTH" \
POD_NAME=""
ADD kafka_2.12-1.1.1.tgz .
COPY zookeeper.properties server.properties kafka_server_jaas.conf zoo_server_jaas.conf /kafka_2.12-1.1.1/config/
RUN set -eux;\
groupadd -r kafka --gid=1000; \
useradd -r -g kafka --uid=1000 kafka; \
mkdir -p "$ZOO_DATA_DIR" "$ZOO_DATA_LOG_DIR" "$ZOO_LOG_DIR" ; \
chown -R kafka:kafka /kafka_2.12-1.1.1/
COPY --chown=root:root docker-entrypoint.sh pre-stop.sh /
USER kafka:kafka
CMD ["/bin/bash", "/docker-entrypoint.sh"]
This declares the common environment variables we will rely on, and defines how the image is built.
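With the Dockerfile and kafka_2.12-1.1.1.tgz sitting in the same build context, the image is built under the kafka-zookeeper tag that the compose file below expects:

docker build -t kafka-zookeeper .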
Next we write the startup configuration and authentication files; Zookeeper and Kafka each need to be configured separately.
Zookeeper
Roles in Zookeeper
- Leader: initiates and decides votes, and updates the system state
- Learner: includes Followers and Observers
- Follower: accepts client requests, returns results to clients, and votes during leader election
- Observer: accepts client connections and forwards write requests to the Leader, but does not take part in voting; it only syncs the Leader's state. Observers exist to scale the system and improve read throughput.
zookeeper.properties
clientPort=2181
quorumListenOnAllIPs=true
tickTime=2000
initLimit=5
syncLimit=2
autopurge.purgeInterval=0
autopurge.snapRetainCount=3
maxClientCnxns=60
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
# Enable SASL for quorum traffic (off by default)
quorum.auth.enableSasl=true
# When acting as a learner, send authentication credentials
quorum.auth.learnerRequireSasl=true
# When true, learners must authenticate on connect or the connection is rejected
quorum.auth.serverRequireSasl=true
# Context names from the JAAS configuration
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer
quorum.cnxn.threads.size=6
dataDir=ZOO_DATA_DIR
dataLogDir=ZOO_DATA_LOG_DIR
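dataDir and dataLogDir are template tokens; once the entrypoint's sed substitution (shown later in docker-entrypoint.sh) has run with the Dockerfile defaults, they resolve to:

dataDir=/kafka_2.12-1.1.1/store/data/zookeeper/data
dataLogDir=/kafka_2.12-1.1.1/store/logs/zookeeper/logs/dataLogs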
Zookeeper organizes its data hierarchically, like a file system, and replicates it to every server in the ensemble (a set of Zookeeper servers). All operations on the data are atomic and sequentially consistent. The ensemble elects a Leader via the Zab protocol and guarantees that every write is replicated to a quorum, where a quorum is a majority of the ensemble that includes the current Leader. For example, in an ensemble of 3 servers, the Leader plus one other server forms a quorum; with 5 servers, any 3 including the Leader do. If the ensemble cannot reach a quorum, data cannot be written. The quorum plays two roles in the cluster, and together they prevent split-brain:
- it is the minimum number of nodes needed to elect a Leader and keep the cluster available;
- it is the minimum number of nodes that must have persisted a write before the client is told the data is safely stored. Once that many nodes have saved the data, the client is notified and can move on; the remaining nodes will eventually store it as well.
One more key point: the Zookeeper Learners are configured here too. After the startup handshake between the Leader and the Followers, once leader election completes and more than half of the Learner servers have registered with the Leader, the cluster moves on to the data synchronization phase.
zoo_server_jaas.conf
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="ZOO_USER"
    password="ZOO_PASS"
    user_KAFKA_USER="KAFKA_PASS";
};
QuorumServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_ZOO_QUORUM_USER="ZOO_QUORUM_PASS";
};
QuorumLearner {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="ZOO_QUORUM_USER"
    password="ZOO_QUORUM_PASS";
};
A few points to note here:
- Users in the Server context are added in the form user_USERNAME="PASSWORD"; I reuse the Kafka user template here.
- The client-facing authentication part must use the DigestLoginModule module.
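For instance, with the default values from the Dockerfile, the substitution turns the Server context into:

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="ZookeeperUsername"
    password="ZookeeperPassword"
    user_KafkaUsername="KafkaPassword";
};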
Kafka
server.properties
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=KAFKA_DATA_LOG_DIR
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=ZOOKEEPER_CONNECT
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
zookeeper.set.acl=true
listeners=SASL_PLAINTEXT://:9092
#advertised.listeners=SASL_PLAINTEXT://:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:KAFKA_USER
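Note that allow.everyone.if.no.acl.found=true leaves any topic without ACLs open to every authenticated user. To actually confine an account such as the readonly user created later, you would grant it explicit ACLs; a sketch using the bundled kafka-acls.sh, run inside the broker container (the topic name testtopic is just an example, and the broker's JAAS file is needed because zookeeper.set.acl=true; adjust zookeeper.connect to your setup):

export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-1.1.1/config/kafka_server_jaas.conf"
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:readonly --operation Read --topic testtopic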
kafka_server_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="KAFKA_USER"
    password="KAFKA_PASS"
    user_KAFKA_USER="KAFKA_PASS";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="KAFKA_USER"
    password="KAFKA_PASS";
};
Note that the USERNAME and PASSWORD in the Client context must match those in KafkaServer, and the Zookeeper Server context must also declare the Client's credentials (the user_KAFKA_USER entry shown earlier); otherwise the broker cannot authenticate to Zookeeper.
docker-entrypoint.sh
#!/bin/bash
# init: replace every template token (an env-var name) in each config file with its value
function init() {
for configfileName in $ENV_CONFIGFILE; do
eval configfile='${'"$configfileName"'}'
if [ -n "$configfile" ]; then
for configureName in $ENV_CONFIGURE; do
eval configure='${'"$configureName"'}'
if [ -n "$configure" ]; then
sed -i "s|$configureName|$configure|g" "$configfile"
fi
done
fi
done
}
# ObtainKafkaHostName: if POD_NAME is set, advertise it as the broker's client-facing address
function ObtainKafkaHostName() {
if [ -n "$POD_NAME" ]; then
echo "advertised.listeners=SASL_PLAINTEXT://$POD_NAME:9092" >>"$KAFKA_CFG"
fi
}
# SplitNodeInfoBySelf: parse CLUSTER_NODE ("name|id|host ..."), append a server.N line
# per node, and remember this node's own ID (its own entry binds to 0.0.0.0)
function SplitNodeInfoBySelf() {
status="false"
local ClusterNode=$1
if [ -z "$ClusterNode" ]; then
echo "Please specify information for cluster mode"
exit 1
fi
for x in $ClusterNode; do
x=${x//|/ }
local name=$(echo "$x" | awk '{print $1}')
ID=$(echo "$x" | awk '{print $2}')
Host=$(echo "$x" | awk '{print $3}')
if [ "$name" == "${HOSTNAME}" ]; then
MyID=$ID
Host="0.0.0.0"
status="true"
fi
echo "server.$ID=$Host:2888:3888" >>"$ZOO_CFG"
done
if [ "$status" != "true" ]; then
echo "Not found this node"
exit 1
fi
}
# CreateKafkaPerGroup: append extra user_NAME=PASSWORD entries to the KafkaServer JAAS context
function CreateKafkaPerGroup() {
if [ -z "$KAFKA_ACCOUNT_GROUP" ]; then
return
fi
for x in $KAFKA_ACCOUNT_GROUP; do
read name passwd permission < <(echo "$x" | awk -F'|' '{print $1,$2,$3}')
infileNum=$(grep -n "KafkaServer" "$KAFKA_AUTH" | awk -F':' '{print $1}')
sed -i "$(($infileNum + 1)) a user_${name}=${passwd}" "$KAFKA_AUTH"
done
}
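# Example: with KAFKA_ACCOUNT_GROUP="readonly|readonly123|Read" (as in the compose
# file below), CreateKafkaPerGroup inserts the line
#   user_readonly="readonly123"
# into the KafkaServer context of kafka_server_jaas.conf, right below the
# PlainLoginModule line. The third field (the intended permission) is parsed but
# not yet turned into an ACL; see the kafka-acls.sh note above.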
case $MODE in
"single-zookeeper")
init
export KAFKA_OPTS="$SERVER_JVMFLAGS"
zookeeper-server-start.sh "$ZOO_CFG"
;;
"single-kafka")
init
ObtainKafkaHostName
CreateKafkaPerGroup
kafka-server-start.sh "$KAFKA_CFG"
;;
"zookeeper")
init
export KAFKA_OPTS="$SERVER_JVMFLAGS"
SplitNodeInfoBySelf "$CLUSTER_NODE"
echo "$MyID" >"$ZOO_MYID_FILE"
zookeeper-server-start.sh "$ZOO_CFG"
;;
"kafka")
init
SplitNodeInfoBySelf "$CLUSTER_NODE"
echo "broker.id=$MyID" >>"$KAFKA_CFG"
ObtainKafkaHostName
kafka-server-start.sh "$KAFKA_CFG"
;;
*)
echo "Not found MODE"
;;
esac
echo "server.$zkMyID=$zkHost:2888:3888" >> "$ZOO_CFG"
这句话其实就是 Learner 的配置,2888 端口是 Server 和 Leader 进行交流的端口,3888 端口是 Leader 挂了后,各个 Server 之间进行交流的端口,之要声明了 Learner,我们就能实现一个完整的 Zookeeper 规模集群
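To make this concrete, a hypothetical three-node example: with CLUSTER_NODE="node0|0|10.0.0.1 node1|1|10.0.0.2 node2|2|10.0.0.3", the entrypoint on the pod whose HOSTNAME is node1 writes 1 into the myid file and appends:

server.0=10.0.0.1:2888:3888
server.1=0.0.0.0:2888:3888
server.2=10.0.0.3:2888:3888

The node's own entry is rewritten to 0.0.0.0 so that it binds on all interfaces.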
Startup and verification
docker-compose.yaml
version: '3'
services:
  single-kafka:
    container_name: single-kafka
    ports:
      - 9092:9092
    networks:
      - kafka
    depends_on:
      - single-zookeeper
    image: kafka-zookeeper
    environment:
      - POD_NAME=single-kafka
      - KAFKA_USER=KafkaUsername
      - KAFKA_PASS=KafkaPassword
      - MODE=single-kafka
      - ZOOKEEPER_CONNECT=single-zookeeper:2181
      - KAFKA_ACCOUNT_GROUP=readonly|readonly123|Read
  single-zookeeper:
    container_name: single-zookeeper
    ports:
      - 2181:2181
    networks:
      - kafka
    image: kafka-zookeeper
    environment:
      - KAFKA_USER=KafkaUsername
      - KAFKA_PASS=KafkaPassword
      - MODE=single-zookeeper
networks:
  kafka:
    external: false
$ docker-compose up -d
Creating network "desktop_kafka" with the default driver
Creating single-zookeeper ... done
Creating single-kafka ... done
$ docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
4bda02b512ed kafka-zookeeper "/bin/bash /docker-e…" 2 seconds ago Up 2 seconds single-kafka
6477dbebb8a7 kafka-zookeeper "/bin/bash /docker-e…" 3 seconds ago Up 2 seconds single-zookeeper
[2021-03-24 02:45:12,058] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-03-24 02:45:12,164] INFO [SocketServer brokerId=1001] Started processors for 1 acceptors (kafka.network.SocketServer)
[2021-03-24 02:45:12,171] INFO Kafka version : 1.1.1 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-24 02:45:12,171] INFO Kafka commitId : 8e07427ffb493498 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-24 02:45:12,172] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
Verification
We can verify authentication with the scripts that ship with Kafka.
First, prepare the client JAAS file, named kafka_client_jaas.conf:
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="KafkaUsername"
    password="KafkaPassword";
};
Next, prepare the producer and consumer configuration files:
consumer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-consumer-group
producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
bootstrap.servers=localhost:9092
compression.type=none
Then simply override the key environment variable and start the console producer/consumer with the corresponding config file:
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/kafka_client_jaas.conf"
kafka-console-consumer.sh --topic testtopic --from-beginning --consumer.config /kafka_2.12-1.1.1/config/consumer.properties --bootstrap-server localhost:9092
kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic --producer.config /kafka_2.12-1.1.1/config/producer.properties
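If the topic does not exist yet, it can be created with the bundled script; a sketch, run from inside the single-kafka container (kafka-topics.sh talks to Zookeeper directly, so it needs a JAAS file with a Client context, e.g. the broker's own):

export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-1.1.1/config/kafka_server_jaas.conf"
kafka-topics.sh --create --zookeeper single-zookeeper:2181 --replication-factor 1 --partitions 1 --topic testtopic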
Appendix
Below are the various failure modes when the passwords on the Kafka and Zookeeper sides disagree; this doubles as a record of my own deployment attempts.
Case 1: the Client credentials do not match the Zookeeper Server's user entry, and the broker hangs right after logging Connected
kafka
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin123"
    password="admin-secret123";
};
zookeeper
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="ZOO_USER"
    password="ZOO_PASS"
    user_admin="admin-secret";
};
[2021-03-23 08:51:54,219] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-03-23 08:51:54,219] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
Case 2: all three credentials match, and startup succeeds
kafka
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="admin-secret";
};
zookeeper
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="ZOO_USER"
    password="ZOO_PASS"
    user_admin="admin-secret";
};
success
Case 3: the KafkaServer username does not match the declared user_ entry, and inter-broker authentication fails with an invalid username or password error
kafka
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin123"
    password="admin-secret"
    user_admin="admin-secret";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="admin-secret";
};
zookeeper
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="ZOO_USER"
    password="ZOO_PASS"
    user_admin="admin-secret";
};
[2021-03-23 08:58:13,323] ERROR [Controller id=1001, targetBrokerId=1001] Connection to node 1001 failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2021-03-23 08:58:13,425] ERROR [Controller id=1001, targetBrokerId=1001] Connection to node 1001 failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)