Normally, installing the official Helm chart as-is, with its in-cluster Service + ClusterIP setup, means that a distributed multi-node Kafka cannot serve clients outside the cluster. Let's reproduce the error first.

Reproducing the Error with the Default Setup

We start from the official Helm chart as provided, with the Zookeeper dependency already configured:

total 168
drwxr-xr-x 5 root root  4096 Feb 18 11:05 .
drwxr-xr-x 3 root root  4096 Feb 18 10:51 ..
drwxr-xr-x 2 root root  4096 Feb 18 10:51 charts
-rw-r--r-- 1 root root   548 Feb 18 10:51 Chart.yaml
drwxr-xr-x 3 root root  4096 Feb 18 10:51 files
-rw-r--r-- 1 root root   333 Feb 18 10:51 .helmignore
-rw-r--r-- 1 root root 67927 Feb 18 10:51 README.md
-rw-r--r-- 1 root root   227 Feb 18 10:51 requirements.lock
-rw-r--r-- 1 root root   137 Feb 18 10:51 requirements.yaml
drwxr-xr-x 2 root root  4096 Feb 18 10:51 templates
-rw-r--r-- 1 root root 30782 Feb 18 10:51 values-test.yaml
-rw-r--r-- 1 root root 30932 Feb 18 11:02 values.yaml

The key part is the listeners configuration:

listeners: INTERNAL://:9093,CLIENT://:9092,**OUTSIDE://:9094**
advertisedListeners: INTERNAL://$(MY_POD_NAME).kafka-headless.kafka.svc.cluster.local:9093,CLIENT://$(MY_POD_NAME).kafka-headless.kafka.svc.cluster.local:9092,**OUTSIDE://1.1.1.1:9094**
listenerSecurityProtocolMap: INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,**OUTSIDE:PLAINTEXT**

Kafka's essential configuration is actually quite small. Here we declare the public-facing OUTSIDE listener in PLAINTEXT mode, advertise it on the LoadBalancer address, and install the chart into the cluster:

$ helm install kafka -n kafka-test .
$ kc get pods -n kafka-test
NAME                READY   STATUS             RESTARTS   AGE
kafka-0             0/1     CrashLoopBackOff   4          2m53s
kafka-1             0/1     CrashLoopBackOff   4          2m53s
kafka-2             1/1     Running            1          2m53s
kafka-zookeeper-0   1/1     Running            0          2m53s

Clearly two of the brokers failed to come up; let's check why:

$ kc logs -n kafka-test kafka-0
java.lang.IllegalArgumentException: requirement failed: Configured end points 1.1.1.1:9094 in advertised listeners are already registered by broker 2
        at kafka.server.KafkaServer.$anonfun$createBrokerInfo$3(KafkaServer.scala:456)
        at kafka.server.KafkaServer.$anonfun$createBrokerInfo$3$adapted(KafkaServer.scala:454)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at kafka.server.KafkaServer.createBrokerInfo(KafkaServer.scala:454)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:292)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
[2021-02-18 06:14:58,555] INFO [KafkaServer id=0] shutting down (kafka.server.KafkaServer)

The cause: the other two brokers tried to register the same advertised endpoint. Manually patching the other two Pods' configuration to dodge the error is not practical either, since they all come from the same StatefulSet and the change would have to be reapplied every time a Pod restarts. Manually creating three separate Kafka brokers and registering each one with Zookeeper sounds feasible, but the setup is far too cumbersome.
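
To make the conflict concrete: every broker registers its advertised listeners in Zookeeper, and the external endpoint must be unique per broker. A manual per-broker setup would therefore need something roughly like the sketch below, with a distinct externally reachable address for each broker (the IPs here are placeholders, not from this environment):

# kafka-0
advertisedListeners: INTERNAL://kafka-0.kafka-headless.kafka.svc.cluster.local:9093,OUTSIDE://203.0.113.10:9094
# kafka-1
advertisedListeners: INTERNAL://kafka-1.kafka-headless.kafka.svc.cluster.local:9093,OUTSIDE://203.0.113.11:9094
# kafka-2
advertisedListeners: INTERNAL://kafka-2.kafka-headless.kafka.svc.cluster.local:9093,OUTSIDE://203.0.113.12:9094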

Common Configuration Changes

Let's redo the whole build process from scratch:

$ git clone https://github.com.cnpmjs.org/bitnami/charts.git
$ cp charts/bitnami/kafka . -r
$ cd kafka

Configuration changes follow; in each snippet, the original configuration comes first and the modified version comes after it.

Zookeeper configuration: add PVC-backed persistent storage

zookeeper:
  enabled: true 
  auth:
    enabled: false
zookeeper:
  enabled: true
  persistence:
    enabled: true
    accessModes:
      - ReadWriteOnce
    storageClass: "alicloud-disk-essd"
    size: 20Gi
  auth:
    enabled: false
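
Once the release is installed further down, it is worth confirming that the Zookeeper PVC was actually provisioned from the alicloud-disk-essd StorageClass (output omitted, it depends on the cluster):

$ kc get pvc -n kafka-test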

replicaCount

replicaCount: 1
replicaCount: 3

Option 1: Port-Forward

See https://github.com/confluentinc/cp-helm-charts/issues/351 (Exposing kafka outside GKE - Kubernetes): once the service is up, use port-forward to expose it on the host. This approach might work, but it is not something I would consider, since it carries the risk of being unstable and does not run in a persistent fashion; I did not manage to get it working here either.

$ kp -n kafka-test
NAME                READY   STATUS    RESTARTS   AGE
kafka-0             1/1     Running   2          7m
kafka-1             1/1     Running   2          7m
kafka-2             1/1     Running   1          7m
kafka-zookeeper-0   1/1     Running   0          7m21s
$ kubectl port-forward -n kafka-test kafka-0 :9092 --address 0.0.0.0
Forwarding from 0.0.0.0:43504 -> 9092
Handling connection for 43504

$ kafkacat -b grafana.ctirobot.com:43504 -L
% ERROR: Failed to acquire metadata: Local: Timed out

Option 2: NodePort

Service configuration, used for listening externally

service:
  type: ClusterIP
  port: 9092
  internalPort: 9093
  externalPort: 9094
  nodePorts:
    client: ""
    external: ""
  loadBalancerSourceRanges: []
  annotations: {}
externalAccess:
  enabled: false
service:
  type: NodePort
  port: 31980
  internalPort: 9093 
  externalPort: 9094 
  nodePorts:
    client: "31980"
    external: "31981"
  loadBalancerSourceRanges: []
  annotations: {}

externalAccess stays at its default (enabled: false); external access is handled through the Service's NodePorts instead.
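
After installation, the resulting NodePort mapping can be double-checked with plain kubectl (the Service names follow the chart's own naming, so the output is not shown here):

$ kc get svc -n kafka-test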

Now comes an important point: Kafka needs to listen on the host's externally reachable interface rather than a cluster-internal port, so we use node affinity to guarantee that each Pod stays on a fixed machine even after a restart.

$ kc get nodes
NAME                        STATUS   ROLES    AGE   VERSION
cn-shenzhen.172.16.10.115   Ready    <none>   87d   v1.16.9-aliyun.1
cn-shenzhen.172.16.20.193   Ready    <none>   87d   v1.16.9-aliyun.1
cn-shenzhen.172.16.20.194   Ready    <none>   87d   v1.16.9-aliyun.1
cn-shenzhen.172.16.20.195   Ready    <none>   87d   v1.16.9-aliyun.1
cn-shenzhen.172.16.20.199   Ready    <none>   87d   v1.16.9-aliyun.1
$ kc label nodes cn-shenzhen.172.16.20.195 runapp=kafka
$ kc label nodes cn-shenzhen.172.16.20.193 runapp=kafka
$ kc label nodes cn-shenzhen.172.16.20.194 runapp=kafka
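
To confirm the labels took effect, filter the nodes on the new label:

$ kc get nodes -l runapp=kafka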

Modify the listener configuration, being careful to avoid duplicate registration. First check whether the StatefulSet already reads the Downward API; if not, add it ourselves. See the official documentation for this API:

https://kubernetes.io/zh/docs/tasks/inject-data-application/downward-api-volume-expose-pod-information/

templates/statefulset.yaml

env:
  - name: BITNAMI_DEBUG
    value: {{ ternary "true" "false" .Values.image.debug | quote }}
  - name: MY_HOST_IP
    valueFrom:
      fieldRef:
        fieldPath: status.hostIP
  - name: MY_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  - name: MY_POD_NAME
    valueFrom:
      fieldRef:
        fieldPath: metadata.name
$ kc exec -it -n kafka-test kafka-0 -- bash    
I have no name!@kafka-0:/$ echo $MY_HOST_IP
172.16.20.195

values.yaml

listeners: INTERNAL://:9093,CLIENT://:9092
advertisedListeners: INTERNAL://$(MY_POD_NAME).kafka-headless.kafka.svc.cluster.local:9093,CLIENT://$(MY_HOST_IP):31980
listenerSecurityProtocolMap: INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT
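
After the install and upgrade steps below, a quick sanity check is to pull cluster metadata through any of the labeled nodes; each broker should advertise itself as its own host IP on port 31980 (the node IP here is one of the three labeled above):

$ kafkacat -b 172.16.20.195:31980 -L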

Service

There is one gotcha here: the externalTrafficPolicy parameter. Setting it to Cluster obscures the client's source IP and may introduce a second hop to another node, while Local preserves the client's source IP and avoids that second hop for LoadBalancer and NodePort Services. With Cluster mode the brokers kept disconnecting and no messages ever arrived.
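
If the chart's values do not expose this parameter directly, the Service can also be patched after installation; a minimal sketch, assuming the client-facing Service ends up named kafka:

$ kc patch svc kafka -n kafka-test -p '{"spec":{"externalTrafficPolicy":"Local"}}'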

Install the dependencies and observe

$ helm dependency update
$ helm install kafka -n kafka-test .
$ kp -n kafka-test
NAME                READY   STATUS    RESTARTS   AGE
kafka-0             1/1     Running   9          20h
kafka-1             1/1     Running   9          20h
kafka-2             1/1     Running   0          20h
kafka-zookeeper-0   1/1     Running   0          20h

Find out which Node each Pod landed on, then apply the node selection

$ curl -sL s.sxueck.link/l/k8s/pnode | bash -s -- kafka-test
kafka-0 / cn-shenzhen.172.16.20.195 / Running
kafka-1 / cn-shenzhen.172.16.20.193 / Running
kafka-2 / cn-shenzhen.172.16.20.194 / Running
kafka-zookeeper-0 / cn-shenzhen.172.16.20.199 / Running
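
The same Pod-to-Node mapping can also be obtained with plain kubectl, without the helper script:

$ kc get pods -n kafka-test -o wide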

Find the nodeSelector field and point it at the label we applied

nodeSelector:
  runapp: kafka
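
nodeSelector is the simplest form of node pinning; if finer-grained rules are ever needed, the same constraint can be written as a full affinity block, roughly like this sketch:

affinity:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
        - matchExpressions:
            - key: runapp
              operator: In
              values:
                - kafka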

Upgrade again and check that the Pods are still running on the same Nodes

$ helm upgrade kafka -n kafka-test .
$ curl -sL s.sxueck.link/l/k8s/pnode | bash -s -- kafka-test
kafka-0 / cn-shenzhen.172.16.20.195 / Running
kafka-1 / cn-shenzhen.172.16.20.193 / Running
kafka-2 / cn-shenzhen.172.16.20.194 / Running
kafka-zookeeper-0 / cn-shenzhen.172.16.20.199 / Running

Produce/Consume Verification

Consumer

$ kafkacat -b 172.16.20.194:31980 -C -t testtopic1

Producer

$ kafkacat -b 172.16.20.194:31980,172.16.20.193:31980,172.16.20.195:31980 -P -t testtopic1
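
With the consumer from the previous step still attached, piping a line into the producer gives a quick end-to-end check; the message should show up on the consumer side almost immediately:

$ echo "hello from outside the cluster" | kafkacat -b 172.16.20.194:31980 -P -t testtopic1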

The final result