Normally, deploying straight from the official Helm chart with a Service of type ClusterIP means a distributed, multi-node cluster cannot serve clients outside the cluster. Let's reproduce the error first.
Reproducing the Error with the Default Setup
First, take the official Helm chart as given, with the Zookeeper dependency already configured:
total 168
drwxr-xr-x 5 root root 4096 Feb 18 11:05 .
drwxr-xr-x 3 root root 4096 Feb 18 10:51 ..
drwxr-xr-x 2 root root 4096 Feb 18 10:51 charts
-rw-r--r-- 1 root root 548 Feb 18 10:51 Chart.yaml
drwxr-xr-x 3 root root 4096 Feb 18 10:51 files
-rw-r--r-- 1 root root 333 Feb 18 10:51 .helmignore
-rw-r--r-- 1 root root 67927 Feb 18 10:51 README.md
-rw-r--r-- 1 root root 227 Feb 18 10:51 requirements.lock
-rw-r--r-- 1 root root 137 Feb 18 10:51 requirements.yaml
drwxr-xr-x 2 root root 4096 Feb 18 10:51 templates
-rw-r--r-- 1 root root 30782 Feb 18 10:51 values-test.yaml
-rw-r--r-- 1 root root 30932 Feb 18 11:02 values.yaml
The key part is the listeners configuration, in particular the OUTSIDE entries:
listeners: INTERNAL://:9093,CLIENT://:9092,OUTSIDE://:9094
advertisedListeners: INTERNAL://$(MY_POD_NAME).kafka-headless.kafka.svc.cluster.local:9093,CLIENT://$(MY_POD_NAME).kafka-headless.kafka.svc.cluster.local:9092,OUTSIDE://1.1.1.1:9094
listenerSecurityProtocolMap: INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,OUTSIDE:PLAINTEXT
Kafka's essential configuration is actually quite small. Here we declare OUTSIDE as the public-facing PLAINTEXT listener, advertising the LoadBalancer address (1.1.1.1 here as a placeholder), and install the chart into the cluster:
$ helm install kafka -n kafka-test .
$ kc get pods -n kafka-test
NAME READY STATUS RESTARTS AGE
kafka-0 0/1 CrashLoopBackOff 4 2m53s
kafka-1 0/1 CrashLoopBackOff 4 2m53s
kafka-2 1/1 Running 1 2m53s
kafka-zookeeper-0 1/1 Running 0 2m53s
Clearly two of the brokers failed to come up; let's check why:
$ kc logs -n kafka-test kafka-0
java.lang.IllegalArgumentException: requirement failed: Configured end points 1.1.1.1:9094 in advertised listeners are already registered by broker 2
at kafka.server.KafkaServer.$anonfun$createBrokerInfo$3(KafkaServer.scala:456)
at kafka.server.KafkaServer.$anonfun$createBrokerInfo$3$adapted(KafkaServer.scala:454)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at kafka.server.KafkaServer.createBrokerInfo(KafkaServer.scala:454)
at kafka.server.KafkaServer.startup(KafkaServer.scala:292)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
[2021-02-18 06:14:58,555] INFO [KafkaServer id=0] shutting down (kafka.server.KafkaServer)
It turns out the other two brokers were rejected as duplicate registrations, because all three advertise the same 1.1.1.1:9094 endpoint. Manually editing the other two Pods' configuration to dodge the error is not feasible either: they all come from the same StatefulSet, so the moment a Pod restarts the change has to be made again. Manually creating three Kafka brokers and registering each with Zookeeper separately sounds workable, but the implementation is far too cumbersome.
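In other words, a working multi-broker deployment needs every broker to advertise its own unique, externally reachable endpoint. That is exactly what the rest of this post builds: each Pod advertises its own node's IP plus a fixed NodePort.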
Common Changes to the Configuration Files
Let's redo the whole setup from the beginning:
$ git clone https://github.com.cnpmjs.org/bitnami/charts.git
$ cp charts/bitnami/kafka . -r
$ cd kafka
For each configuration change below, the original configuration comes first, followed by the modified version.
Zookeeper configuration, adding PVC-backed persistent storage:
zookeeper:
  enabled: true
  auth:
    enabled: false

zookeeper:
  enabled: true
  persistence:
    enabled: true
    accessModes:
      - ReadWriteOnce
    storageClass: "alicloud-disk-essd"
    size: 20Gi
  auth:
    enabled: false
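For reference, here is a sketch of the PersistentVolumeClaim this block should end up producing for each Zookeeper replica; the claim name follows the usual data-<statefulset-pod> convention and is an assumption here:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: data-kafka-zookeeper-0      # assumed name from the volumeClaimTemplate
spec:
  accessModes:
    - ReadWriteOnce                 # one node mounts the disk at a time
  storageClassName: alicloud-disk-essd
  resources:
    requests:
      storage: 20Gi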
replicaCount
replicaCount: 1
replicaCount: 3
Option 1: Port-Forward
See https://github.com/confluentinc/cp-helm-charts/issues/351 (Exposing kafka outside GKE - Kubernetes): once the service is up, use port-forward to expose it on the host. This approach may work, but it is not something I would consider, both because it is fragile and because it does not run as a persistent service. I also failed to get it working here:
$ kp -n kafka-test
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 2 7m
kafka-1 1/1 Running 2 7m
kafka-2 1/1 Running 1 7m
kafka-zookeeper-0 1/1 Running 0 7m21s
$ kubectl port-forward -n kafka-test kafka-0 :9092 --address 0.0.0.0
Forwarding from 0.0.0.0:43504 -> 9092
Handling connection for 43504
$ kafkacat -b grafana.ctirobot.com:43504 -L
% ERROR: Failed to acquire metadata: Local: Timed out
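A likely explanation for the timeout: a Kafka client first fetches cluster metadata from the bootstrap address and then connects to whatever is in advertised.listeners. Through a port-forward only the bootstrap hop is reachable, while the advertised in-cluster addresses are not, so the client stalls. I have not verified that this is the cause of this exact failure, but it is the usual failure mode.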
Option 2: NodePort
The Service configuration, used for external listening:
service:
  type: ClusterIP
  port: 9092
  internalPort: 9093
  externalPort: 9094
  nodePorts:
    client: ""
    external: ""
  loadBalancerSourceRanges: []
  annotations: {}
externalAccess:
  enabled: false

service:
  type: NodePort
  port: 31980
  internalPort: 9093
  externalPort: 9094
  nodePorts:
    client: "31980"
    external: "31981"
  loadBalancerSourceRanges: []
  annotations: {}
externalAccess:
  enabled: false
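As a sanity check, this should render to roughly the following Service; the name and selector labels are assumptions based on the release name and the chart's standard labels:

apiVersion: v1
kind: Service
metadata:
  name: kafka                        # assumption: derived from the release name
  namespace: kafka-test
spec:
  type: NodePort
  selector:
    app.kubernetes.io/name: kafka    # assumption: the chart's Pod labels
  ports:
    - name: tcp-client
      port: 31980
      targetPort: 9092               # the broker's CLIENT listener
      nodePort: 31980                # pinned, so advertised addresses stay stable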
Now comes a crucial point: Kafka must advertise the node's externally facing interface rather than a cluster-internal address. That calls for node affinity, so that each Pod stays on a fixed machine even across restarts.
$ kc get nodes
NAME STATUS ROLES AGE VERSION
cn-shenzhen.172.16.10.115 Ready <none> 87d v1.16.9-aliyun.1
cn-shenzhen.172.16.20.193 Ready <none> 87d v1.16.9-aliyun.1
cn-shenzhen.172.16.20.194 Ready <none> 87d v1.16.9-aliyun.1
cn-shenzhen.172.16.20.195 Ready <none> 87d v1.16.9-aliyun.1
cn-shenzhen.172.16.20.199 Ready <none> 87d v1.16.9-aliyun.1
$ kc label nodes cn-shenzhen.172.16.20.195 runapp=kafka
$ kc label nodes cn-shenzhen.172.16.20.193 runapp=kafka
$ kc label nodes cn-shenzhen.172.16.20.194 runapp=kafka
Now modify the listener configuration, taking care to avoid the duplicate registration. First check whether the chart already reads the host IP via the Downward API; if not, write it in ourselves. See the official Kubernetes documentation for details on this API.
templates/statefulset.yaml
env:
  - name: BITNAMI_DEBUG
    value: {{ ternary "true" "false" .Values.image.debug | quote }}
  - name: MY_HOST_IP    # the Node's IP, read via the Downward API
    valueFrom:
      fieldRef:
        fieldPath: status.hostIP
  - name: MY_POD_IP     # the Pod's cluster-internal IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  - name: MY_POD_NAME   # the Pod name, e.g. kafka-0
    valueFrom:
      fieldRef:
        fieldPath: metadata.name
$ kc exec -it -n kafka-test kafka-0 -- bash
I have no name!@kafka-0:/$ echo $MY_HOST_IP
172.16.20.195
values.yaml
listeners: INTERNAL://:9093,CLIENT://:9092
advertisedListeners: INTERNAL://$(MY_POD_NAME).kafka-headless.kafka.svc.cluster.local:9093,CLIENT://$(MY_HOST_IP):31980
listenerSecurityProtocolMap: INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT
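For kafka-0, which the exec check above showed running on node 172.16.20.195, this renders at startup to INTERNAL://kafka-0.kafka-headless.kafka.svc.cluster.local:9093,CLIENT://172.16.20.195:31980. Since every broker now substitutes its own host IP, each registration in Zookeeper is unique and the duplicate-endpoint IllegalArgumentException from earlier can no longer occur.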
Service
There is a gotcha here: the externalTrafficPolicy parameter. Set to Cluster, it hides the client source IP and traffic may take a second hop to another node; set to Local, it preserves the client source IP and avoids that second hop for LoadBalancer and NodePort Services. With Cluster mode here, the brokers kept disconnecting and no messages got through.
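A minimal sketch of the field on the Service; whether the chart exposes it directly through values.yaml depends on the chart version, so patching the rendered Service is a fallback:

spec:
  type: NodePort
  externalTrafficPolicy: Local   # keep the client source IP, skip the second hop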
Install the Dependencies and Observe
$ helm dependency update
$ helm install kafka -n kafka-test .
$ kp -n kafka-test
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 9 20h
kafka-1 1/1 Running 9 20h
kafka-2 1/1 Running 0 20h
kafka-zookeeper-0 1/1 Running 0 20h
Find the Node each Pod runs on, then pin the Pods with node selection
$ curl -sL s.sxueck.link/l/k8s/pnode | bash -s -- kafka-test
kafka-0 / cn-shenzhen.172.16.20.195 / Running
kafka-1 / cn-shenzhen.172.16.20.193 / Running
kafka-2 / cn-shenzhen.172.16.20.194 / Running
kafka-zookeeper-0 / cn-shenzhen.172.16.20.199 / Running
Locate the nodeSelector field:
nodeSelector:
  runapp: kafka
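One caveat: nodeSelector only restricts the Pods to the labeled pool; by itself it does not guarantee one broker per node. If that guarantee matters, a podAntiAffinity rule is the usual tool. A sketch, assuming the chart passes an affinity value through to the StatefulSet and uses the standard app.kubernetes.io/name label:

affinity:
  podAntiAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchLabels:
            app.kubernetes.io/name: kafka    # assumption: the chart's Pod label
        topologyKey: kubernetes.io/hostname  # at most one broker per node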
Upgrade again and verify the Pods still run on the same Nodes:
$ helm upgrade kafka -n kafka-test .
$ curl -sL s.sxueck.link/l/k8s/pnode | bash -s -- kafka-test
kafka-0 / cn-shenzhen.172.16.20.195 / Running
kafka-1 / cn-shenzhen.172.16.20.193 / Running
kafka-2 / cn-shenzhen.172.16.20.194 / Running
kafka-zookeeper-0 / cn-shenzhen.172.16.20.199 / Running
Verifying Produce and Consume
Consumer
$ kafkacat -b 172.16.20.194:31980 -C -t testtopic1
Producer
$ kafkacat -b 172.16.20.194:31980,172.16.20.193:31980,172.16.20.195:31980 -P -t testtopic1
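Note that the consumer works even though it bootstraps from a single broker address: the client fetches cluster metadata from whichever broker it reaches first, then talks directly to the advertised host-IP:NodePort pairs, which is precisely why advertisedListeners had to be externally reachable.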
The final result: