Requirements:

Whether Kafka brokers are talking to each other, authenticating external clients, or interacting with Zookeeper, they must authenticate with SASL/PLAIN.

Approach:

Kafka's bin directory contains the startup scripts for both Zookeeper and the Kafka server. All we need to do is point the KAFKA_OPTS environment variable at the corresponding JAAS authentication file before the script runs, then start it with the properties file. Once the whole chain is clear, it is really quite simple.
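
Distilled, the whole trick is just this pattern (a minimal sketch; the paths assume the archive is unpacked at /kafka_2.12-1.1.1, as in the Dockerfile below):

export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-1.1.1/config/kafka_server_jaas.conf"
kafka-server-start.sh /kafka_2.12-1.1.1/config/server.properties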

Getting started

Directory layout

/images/kafka-sasl/Untitled.png

First we need to write the Dockerfile, since it is where the environment variables are declared; without them the configuration files could not be templated.

FROM openjdk:8-jre-slim

RUN set -eux; \
    apt-get update && apt-get install -y procps; \
    apt-get clean

WORKDIR /

ENV STORE_DATA="/kafka_2.12-1.1.1/store/data" \
    STORE_LOGS="/kafka_2.12-1.1.1/store/logs" \
    ZOO_USER="ZookeeperUsername" \
    ZOO_PASS="ZookeeperPassword" \
    ZOO_QUORUM_USER="QuorumUserName" \
    ZOO_QUORUM_PASS="QuorumUserPassword" \
    KAFKA_USER="KafkaUsername" \
    KAFKA_PASS="KafkaPassword" \
    KAFKA_CLIENT_BROKER_USER="KafkaClientBrokerUsername" \
    KAFKA_CLIENT_BROKER_PASS="KafkaClientBrokerPassword" \
    ZOOKEEPER_CONNECT="localhost:2181" \
    MODE="" \
    # create multiple users with different permissions
    # operation attribute: Read Write *
    # KAFKA_ACCOUNT_GROUP "user1|password|* readonlyuser|password|Read"
    KAFKA_ACCOUNT_GROUP="" \
    # each pod contains a zk and a kafka
    # so their number of nodes and information should be the same
    CLUSTER_NODE=""
# CLUSTER_NODE "node0|0|127.0.0.1 node1|1|127.0.0.1"

ENV PATH=$PATH:/kafka_2.12-1.1.1/bin \
    ZOO_DATA_DIR="$STORE_DATA/zookeeper/data" \
    ZOO_MYID_FILE="$STORE_DATA/zookeeper/data/myid" \
    ZOO_DATA_LOG_DIR="$STORE_LOGS/zookeeper/logs/dataLogs" \
    ZOO_LOG_DIR="$STORE_LOGS/zookeeper/logs" \
    SERVER_JVMFLAGS="-Djava.security.auth.login.config=/kafka_2.12-1.1.1/config/zoo_server_jaas.conf" \
    KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-1.1.1/config/kafka_server_jaas.conf" \
    ZOO_CFG="/kafka_2.12-1.1.1/config/zookeeper.properties" \
    ZOO_AUTH="/kafka_2.12-1.1.1/config/zoo_server_jaas.conf" \
    KAFKA_CFG="/kafka_2.12-1.1.1/config/server.properties" \
    KAFKA_AUTH="/kafka_2.12-1.1.1/config/kafka_server_jaas.conf" \
    KAFKA_DATA_LOG_DIR="$STORE_LOGS/kafka/logs/dataLogs"
ENV ENV_CONFIGURE="ZOO_DATA_DIR ZOO_DATA_LOG_DIR ZOO_USER ZOO_PASS ZOO_QUORUM_USER ZOO_QUORUM_PASS KAFKA_USER KAFKA_PASS KAFKA_CLIENT_BROKER_USER KAFKA_CLIENT_BROKER_PASS KAFKA_DATA_LOG_DIR ZOOKEEPER_CONNECT" \
    ENV_CONFIGFILE="ZOO_CFG ZOO_AUTH KAFKA_CFG KAFKA_AUTH" \
    POD_NAME=""

ADD kafka_2.12-1.1.1.tgz .
COPY zookeeper.properties server.properties kafka_server_jaas.conf zoo_server_jaas.conf /kafka_2.12-1.1.1/config/

RUN set -eux;\
    groupadd -r kafka --gid=1000; \
    useradd -r -g kafka --uid=1000 kafka; \
    mkdir -p "$ZOO_DATA_DIR" "$ZOO_DATA_LOG_DIR" "$ZOO_LOG_DIR" ; \
    chown -R kafka:kafka /kafka_2.12-1.1.1/

COPY --chown=root:root  docker-entrypoint.sh pre-stop.sh /
USER kafka:kafka
CMD ["/bin/bash", "/docker-entrypoint.sh"]

This declares the common environment variables we will need and defines how the image is built.
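
The image can then be built under the tag the compose file below expects (assuming kafka_2.12-1.1.1.tgz, the config files, and the two scripts sit next to the Dockerfile):

docker build -t kafka-zookeeper .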

Next we write the startup configuration files and the authentication files; Zookeeper and Kafka each need their own.

Zookeeper

Roles in Zookeeper

  • Leader: initiates and decides votes, and updates the system state
  • Learner: includes Followers and Observers
  • Follower: accepts client requests, returns results to the client, and votes during elections
  • Observer: accepts client connections and forwards write requests to the Leader, but does not take part in voting; it only syncs the Leader's state. Observers exist to scale the system and improve read throughput

zookeeper.properties

clientPort=2181
quorumListenOnAllIPs=true
tickTime=2000
initLimit=5
syncLimit=2
autopurge.purgeInterval=0
autopurge.snapRetainCount=3
maxClientCnxns=60
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
# Enable SASL for quorum traffic (off by default)
quorum.auth.enableSasl=true
# When acting as a learner, send authentication information to the leader
quorum.auth.learnerRequireSasl=true
# Require connecting learners to send authentication information, otherwise reject them
quorum.auth.serverRequireSasl=true
# Context names from the JAAS configuration
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer
quorum.cnxn.threads.size=6

dataDir=ZOO_DATA_DIR
dataLogDir=ZOO_DATA_LOG_DIR

Zookeeper's data is organized hierarchically, like a file system, and replicated to every Zookeeper server in the Ensemble (a set of Zookeeper servers). All operations on the data are atomic and sequentially consistent. The Ensemble elects a Leader using the Zab protocol and guarantees that every write is replicated to a Quorum, i.e., a majority of the Ensemble's members including the current leader. For example, if the Ensemble has 3 servers, the member holding the leader plus one other server form a Quorum; if the Ensemble cannot reach a Quorum, data cannot be written. This mechanism plays two roles in the cluster, and together they prevent split-brain (a quick sketch of the arithmetic follows the list):

  1. It is the minimum number of nodes required to elect a Leader and keep the cluster available
  2. It is the minimum number of nodes that must store a write before the client is told the data is safely saved; once they have, the client is notified and can carry on, and the remaining nodes in the cluster will eventually store the data as well
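
A minimal sketch of that majority arithmetic:

# A quorum is a strict majority: floor(n/2) + 1 servers
for n in 1 3 5 7; do
  echo "ensemble=$n quorum=$((n / 2 + 1)) tolerated_failures=$((n - n / 2 - 1))"
done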

One more key point: the Zookeeper Learners are also configured here. After the startup handshake between the Leader and the Followers, the cluster completes Leader election; once more than half of the Learner servers have registered with the Leader, the ensemble moves into the data-synchronization phase.

zoo_server_jaas.conf

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="ZOO_USER"
    password="ZOO_PASS"
    user_KAFKA_USER="KAFKA_PASS";
};

QuorumServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_ZOO_QUORUM_USER="ZOO_QUORUM_PASS";
};
QuorumLearner {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="ZOO_QUORUM_USER"
    password="ZOO_QUORUM_PASS";
};

A few points to note here:

  1. Inside Server, users are added in the form user_USERNAME="PASSWORD"; I reused the Kafka user template here (the rendered result is sketched below)
  2. The externally facing authentication must use the DigestLoginModule module
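
For reference, once the entrypoint's init() function (shown later) substitutes these placeholders with the Dockerfile's default values, the Server section would render as:

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="ZookeeperUsername"
    password="ZookeeperPassword"
    user_KafkaUsername="KafkaPassword";
};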

Kafka

server.properties

num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=KAFKA_DATA_LOG_DIR
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=ZOOKEEPER_CONNECT
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
zookeeper.set.acl=true

listeners=SASL_PLAINTEXT://:9092
#advertised.listeners=SASL_PLAINTEXT://:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:admin
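
One caveat: with allow.everyone.if.no.acl.found=true, an extra account such as the readonly user created later is only actually restricted once an ACL exists on the resource. A sketch with the stock kafka-acls.sh tool (the topic testtopic is illustrative; because zookeeper.set.acl=true, the tool needs the same KAFKA_OPTS JAAS setting so it can write to Zookeeper):

export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-1.1.1/config/kafka_server_jaas.conf"
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:readonly --operation Read --topic testtopic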

kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="KAFKA_USER"
    password="KAFKA_PASS"
    user_KAFKA_USER="KAFKA_PASS";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="KAFKA_USER"
    password="KAFKA_PASS";
};

Note that Client here uses the same USERNAME and PASSWORD as KafkaServer, and that the Zookeeper Server section must declare the Client's credentials as well.
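
Concretely, with the Dockerfile defaults substituted in, the Client section renders as below; its username and password must equal the user_KafkaUsername="KafkaPassword" entry in the rendered Server block shown earlier (a sketch):

Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="KafkaUsername"
    password="KafkaPassword";
};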

docker-entrypoint.sh

#!/bin/bash
# init: replace each placeholder name (e.g. ZOO_USER) in every configuration
# file with the value of the matching environment variable
function init() {
  for configfileName in $ENV_CONFIGFILE; do
    eval configfile='${'"$configfileName"'}'
    if [ -n "$configfile" ]; then
      for configureName in $ENV_CONFIGURE; do
        eval configure='${'"$configureName"'}'
        if [ -n "$configure" ]; then
          sed -i "s|$configureName|$configure|g" "$configfile"
        fi
      done
    fi
  done
}

# ObtainKafkaHostName: advertise this pod's name so that clients on the
# network can reach the broker
function ObtainKafkaHostName() {
  if [ -n "$POD_NAME" ]; then
    echo "advertised.listeners=SASL_PLAINTEXT://$POD_NAME:9092" >>"$KAFKA_CFG"
  fi
}

# SplitNodeInfoBySelf: parse CLUSTER_NODE entries of the form "name|id|host",
# append a server.N line for every node, and remember our own id; our own
# address is listened on as 0.0.0.0
function SplitNodeInfoBySelf() {
  status="false"
  local ClusterNode=$1
  if [ -z "$ClusterNode" ]; then
    echo "Please specify information for cluster mode"
    exit 1
  fi

  for x in $ClusterNode; do
    x=${x//|/ }

    local name=$(echo "$x" | awk '{print $1}')
    ID=$(echo "$x" | awk '{print $2}')
    Host=$(echo "$x" | awk '{print $3}')
    if [ "$name" == "${HOSTNAME}" ]; then
      MyID=$ID
      Host="0.0.0.0"
      status="true"
    fi
    echo "server.$ID=$Host:2888:3888" >>"$ZOO_CFG"
  done

  if [ "$status" != "true" ]; then
    echo "Not found this node"
    exit 1
  fi
}

# CreateKafkaPerGroup: append a user_NAME="PASSWORD" entry to the KafkaServer
# JAAS section for every extra account; the permission field is parsed here but
# ACLs still have to be granted separately (e.g. with kafka-acls.sh)
function CreateKafkaPerGroup() {
  if [ -z "$KAFKA_ACCOUNT_GROUP" ]; then
    return
  fi
  for x in $KAFKA_ACCOUNT_GROUP; do
    read name passwd permission < <(echo "$x" | awk -F'|' '{print $1,$2,$3}')
    infileNum=$(grep -n "KafkaServer" "$KAFKA_AUTH" | awk -F':' '{print $1}')
    sed -i "$((infileNum + 1)) a user_${name}=\"${passwd}\"" "$KAFKA_AUTH"
  done
  done
}

case $MODE in
"single-zookeeper")
  init
  export KAFKA_OPTS="$SERVER_JVMFLAGS"
  zookeeper-server-start.sh "$ZOO_CFG"
  ;;
"single-kafka")
  init
  ObtainKafkaHostName
  CreateKafkaPerGroup
  kafka-server-start.sh "$KAFKA_CFG"
  ;;
"zookeeper")
  init
  export KAFKA_OPTS="$SERVER_JVMFLAGS"
  SplitNodeInfoBySelf "$CLUSTER_NODE"
  echo "$MyID" >"$ZOO_MYID_FILE"
  zookeeper-server-start.sh "$ZOO_CFG"
  ;;
"kafka")
  init
  SplitNodeInfoBySelf "$CLUSTER_NODE"
  echo "broker.id=$MyID" >>"$KAFKA_CFG"
  ObtainKafkaHostName
  kafka-server-start.sh "$KAFKA_CFG"
  ;;
*)
  echo "Not found MODE"
  ;;
esac

echo "server.$zkMyID=$zkHost:2888:3888" >> "$ZOO_CFG"

That echo line is effectively the Learner configuration: port 2888 is used by the servers to talk to the Leader, and port 3888 is used by the servers to talk to each other once the Leader goes down. As long as the Learners are declared this way, we get a complete, properly sized Zookeeper cluster.
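
For example, with CLUSTER_NODE="node0|0|10.0.0.1 node1|1|10.0.0.2 node2|2|10.0.0.3" (hypothetical addresses), the pod whose hostname is node1 would write 1 to its myid file and append:

server.0=10.0.0.1:2888:3888
server.1=0.0.0.0:2888:3888
server.2=10.0.0.3:2888:3888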

Startup and verification

docker-compose.yaml

version: '3'

services:
  single-kafka:
    container_name: single-kafka
    ports: 
      - 9092:9092
    networks:
      - kafka
    depends_on:
      - single-zookeeper
    image: kafka-zookeeper
    environment:
      - POD_NAME=single-kafka
      - KAFKA_USER=KafkaUsername
      - KAFKA_PASS=KafkaPassword
      - MODE=single-kafka
      - ZOOKEEPER_CONNECT=single-zookeeper:2181
      - KAFKA_ACCOUNT_GROUP=readonly|readonly123|Read

  single-zookeeper:
    container_name: single-zookeeper
    ports: 
      - 2181:2181
    networks:
      - kafka
    image: kafka-zookeeper
    environment: 
      - KAFKA_USER=KafkaUsername
      - KAFKA_PASS=KafkaPassword
      - MODE=single-zookeeper

networks: 
  kafka:
    external: false
$ docker-compose up -d
Creating network "desktop_kafka" with the default driver
Creating single-zookeeper ... done
Creating single-kafka     ... done
$ docker ps -a
CONTAINER ID   IMAGE             COMMAND                  CREATED         STATUS         PORTS     NAMES
4bda02b512ed   kafka-zookeeper   "/bin/bash /docker-e…"   2 seconds ago   Up 2 seconds             single-kafka
6477dbebb8a7   kafka-zookeeper   "/bin/bash /docker-e…"   3 seconds ago   Up 2 seconds             single-zookeeper
The Kafka container's log confirms the broker came up:

[2021-03-24 02:45:12,058] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
[2021-03-24 02:45:12,164] INFO [SocketServer brokerId=1001] Started processors for 1 acceptors (kafka.network.SocketServer)
[2021-03-24 02:45:12,171] INFO Kafka version : 1.1.1 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-24 02:45:12,171] INFO Kafka commitId : 8e07427ffb493498 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-24 02:45:12,172] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)

Verification

Here we use the console scripts that ship with Kafka to verify that authentication works.

First, prepare the client JAAS authentication file, named kafka_client_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="KafkaUsername"
    password="KafkaPassword";
};

Next, prepare the configuration files for the producer and the consumer

consumer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-consumer-group

producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
bootstrap.servers=localhost:9092
compression.type=none

All that is left is to override the key variable and point the producer and consumer at their configuration files when starting them:

export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/kafka_client_jaas.conf"
kafka-console-consumer.sh --topic testtopic --from-beginning --consumer.config /kafka_2.12-1.1.1/config/consumer.properties --bootstrap-server localhost:9092
kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic --producer.config /kafka_2.12-1.1.1/config/producer.properties
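
The extra account created through KAFKA_ACCOUNT_GROUP can be checked the same way; a sketch using the readonly credentials from the compose file (the /tmp path is arbitrary):

cat > /tmp/kafka_readonly_jaas.conf <<'EOF'
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="readonly"
    password="readonly123";
};
EOF
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/kafka_readonly_jaas.conf"
kafka-console-consumer.sh --topic testtopic --from-beginning --consumer.config /kafka_2.12-1.1.1/config/consumer.properties --bootstrap-server localhost:9092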

/images/kafka-sasl/Untitled%201.png

Appendix

What happens when the passwords between Zookeeper and Kafka disagree in various ways; this is also a record from my own deployment.

Client credentials differ from the Zookeeper Server's: the broker hangs, stuck right after Connected

kafka
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin123"
    password="admin-secret123";
};

zookeeper
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="ZOO_USER"
    password="ZOO_PASS"
    user_admin="admin-secret";
};

[2021-03-23 08:51:54,219] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2021-03-23 08:51:54,219] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)

All three match: starts successfully

kafka
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="admin-secret";
};

zookeeper
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="ZOO_USER"
    password="ZOO_PASS"
    user_admin="admin-secret";
};

success

The KafkaServer credentials do not match the declared user_ entry: a wrong-password error is reported (the logs show it is the inter-broker connection that fails, since username admin123 has no matching user_admin123 entry)

kafka
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin123"
    password="admin-secret"
    user_admin="admin-secret";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="admin-secret";
};

zookeeper
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="ZOO_USER"
    password="ZOO_PASS"
    user_admin="admin-secret";
};
[2021-03-23 08:58:13,323] ERROR [Controller id=1001, targetBrokerId=1001] Connection to node 1001 failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2021-03-23 08:58:13,425] ERROR [Controller id=1001, targetBrokerId=1001] Connection to node 1001 failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)