前言

Kured(Kubernetes Reboot Daemon)是一个针对 Kubernetes 的守护程序,其功能是在底层操作系统的软件包管理系统指示需要重新启动时,进行安全的自动节点重启

该守护程序通过监视重启标志文件(例如 /var/run/reboot-required)或监测特定的哨兵命令是否成功运行来确定是否需要进行重启操作。它会持续不断地观察这些指示,并在需要时采取相应的行动

为了确保在进行节点重启时的平稳运行,Kured 利用了 Kubernetes 的 APIServer 中的锁机制,以保证一次只有一个节点进行重启。这种锁机制确保了在集群中同一时间只有一个节点被重启,并避免了重启操作之间的冲突

此外,Kured 还提供了一些可选功能,例如在存在活动的 Prometheus 警报或特定的 Pod 时,可以推迟执行重启操作。这样可以避免在系统处于故障状态或关键服务正在运行时进行重启,从而确保整个集群的稳定性和可用性

总体来说,这个项目整体难度不算太高,通过对这个项目的二开可以加深对于集群的理解,具体的代码可以参考 https://github.com/kubereboot/kured ,截止目前,最新版本为 1.15.1

Kured 代码流程简述

主流程

  1. 它的 flags 使用了 cobra 库,同时在 Run 形参里面声明了 Root 函数为主要的执行函数,注意不要被 Execute 的调用所疑惑了

  2. window, err := timewindow.New(rebootDays, rebootStart, rebootEnd, timezone) 这里使用了从 flags 中声明的重启时间,其中的 rebootDays 根据上下文来看,是个 []string 类型,推测估计是直接填写日期字符串

  3. buildHostCommand(1, restartCommand) 这个方法实际上是将外部的宿主机的控制权限映射到了内部的容器控制点(当然必须事先声明 hostPID:trueprivileged:true这两个选项允许容器进程获取额外的权限),如果我们进入函数内部进行查看,可以看到它是使用了 Linux 的 Nsenter 命令,这个命令会进入宿主机的命名空间(特别是在这里指定了 -m 选项,与之和宿主机的 PID 为 1 的进程做挂载)。这样,随后追加的 command 就会在宿主机上执行,而不是在容器内部

    func buildHostCommand(pid int, command []string) []string {
    
        // From the container, we nsenter into the proper PID to run the hostCommand.
        // For this, kured daemonset need to be configured with hostPID:true and privileged:true
        cmd := []string{"/usr/bin/nsenter", fmt.Sprintf("-m/proc/%d/ns/mnt", pid), "--"}
        cmd = append(cmd, command...)
        return cmd
    }
    
  4. 而在 Root 函数中,剩余的代码作用于设置和触发节点重启的逻辑,代码这里展示了两种重启的策略方法

    • 命令行重启: 即 MethodCommand,通过 reboot.NewCommandReboot 函数创建一个 CommandReboot 类型的对象。这个对象封装了执行重启所需的一系列命令行指令
    • 信号重启:即 MethodSignal,通过 reboot.NewSignalReboot 函数创建一个 SignalReboot 类型的对象。这个对象封装了发送重启信号的逻辑
  5. 目前我们已经初步了解到了这个服务的一些附属的初始化设置,接下来的 rebootAsRequired 方法才是整个服务较为困难和复杂的逻辑,它涉及到与 Kubernetes API 的交互、节点锁定机制、调度窗口检查、以及执行实际的重启操作

  6. 首先是常规的 client-go 的客户端初始化,这里直接使用 InClusterConfig 方法拿取集群内部的调用配置,所以部署该服务的时候,需要注意一下 RBAC 权限的绑定

  7. 接下来使用 daemonsetlock.New 创建一个锁,以防止同一时间多个实例对同一节点执行重启操作,这个锁是基于一个 DaemonSet 和一个特定的注解来实现的,如果我们没有使用外部传参进行配置的修改,默认的注解为 KuredNodeLockAnnotation string = "weave.works/kured-node-lock",也就是 main 函数文件最前面的声明,当然这个锁更详细的实现由于和主流程的完整阅读没有太大关系,我们暂时先进行跳过这一部分

  8. 接下来代码创建了一个 1 分钟的 Ticker,每隔一个周期间隔,都会执行逻辑:

    for range tick {
    	if holding(lock, &nodeMeta, concurrency > 1) {
    		node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
    		if err != nil {
    			log.Errorf("Error retrieving node object via k8s API: %v", err)
    			continue
    		}
    		if !nodeMeta.Unschedulable {
    			err = uncordon(client, node)
    			if err != nil {
    				log.Errorf("Unable to uncordon %s: %v, will continue to hold lock and retry uncordon", node.GetName(), err)
    				continue
    			} else {
    				if notifyURL != "" {
    					if err := shoutrrr.Send(notifyURL, fmt.Sprintf(messageTemplateUncordon, nodeID)); err != nil {
    						log.Warnf("Error notifying: %v", err)
    					}
    				}
    			}
    		}
    		// If we're holding the lock we know we've tried, in a prior run, to reboot
    		// So (1) we want to confirm that the reboot succeeded practically ( !rebootRequired() )
    		// And (2) check if we previously annotated the node that it was in the process of being rebooted,
    		// And finally (3) if it has that annotation, to delete it.
    		// This indicates to other node tools running on the cluster that this node may be a candidate for maintenance
    		if annotateNodes && !rebootRequired(sentinelCommand) {
    			if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; ok {
    				err := deleteNodeAnnotation(client, nodeID, KuredRebootInProgressAnnotation)
    				if err != nil {
    					continue
    				}
    			}
    		}
    		throttle(releaseDelay)
    		release(lock, concurrency > 1)
    		break
    	} else {
    		break
    	}
    }
    
    1. holding方法会检查调用者也就是我们的 Pod 是否持有锁,注意此时这个循环里的全部操作都是基于我们已经拿到了或者确认了这个锁,也就是某一个 DaemonSet 的 Pod 在执行对应操作
    2. 在持有锁的前提下,获取节点相关信息,如果节点处于可供调度的情况(!nodeMeta.Unschedulable),则开始尝试 “锁住” 这个节点,禁止 Pod 再被调度到该节点上面
    3. 与此同时,循环中剩下的代码的作用是在节点重启流程中进行状态管理,具体来说,它负责确认节点是否成功地重启过,并且更新节点的注解以反映它的最新状态
    4. 当发现节点存在重启的注释,且同时并没有发现 rebootSentinelFile 的时候,代码会认为节点已经被重启完成,并删除正在节点正在重启节点的注解,这个注释在节点开始重启流程之前被添加,以标记该节点正在重启过程中
    5. 释放锁,这里锁的释放可能依赖于并发级别。如果 concurrency > 1,这意味着锁是可以被多个实例共享的,那么解锁的行为可能会有所不同,比如只减少一个引用计数而不是完全释放锁
  9. 在后续的代码中,如果检测到节点的 RebootRequire 为 false 又或者不在重启窗口中,则会调用 preferNoScheduleTaint.Disable() 方法来移除之前设置的污点,允许 Pod 正常调度到这个节点上,截止到目前的代码中,我们暂时还没有对污点进行除了删除外的其他操作

  10. 如果说上面的循环负责处理调度的逻辑,那么下面这个循环则是处理 Kubernetes 节点的重启逻辑,两个循环组成了整个服务的主要流程

    for range tick {
            ...
            nodeMeta.Unschedulable = node.Spec.Unschedulable
    
            var timeNowString string
            if annotateNodes {
                if _, ok := node.Annotations[KuredRebootInProgressAnnotation]; !ok {
                    timeNowString = time.Now().Format(time.RFC3339)
                    // Annotate this node to indicate that "I am going to be rebooted!"
                    // so that other node maintenance tools running on the cluster are aware that this node is in the process of a "state transition"
                    annotations := map[string]string{KuredRebootInProgressAnnotation: timeNowString}
                    // & annotate this node with a timestamp so that other node maintenance tools know how long it's been since this node has been marked for reboot
                    annotations[KuredMostRecentRebootNeededAnnotation] = timeNowString
                    err := addNodeAnnotations(client, nodeID, annotations)
                    if err != nil {
                        continue
                    }
                }
            }
    
            var blockCheckers []RebootBlocker
            if prometheusURL != "" {
                blockCheckers = append(blockCheckers, PrometheusBlockingChecker{promClient: promClient, filter: alertFilter, firingOnly: alertFiringOnly, filterMatchOnly: alertFilterMatchOnly})
            }
            if podSelectors != nil {
                blockCheckers = append(blockCheckers, KubernetesBlockingChecker{client: client, nodename: nodeID, filter: podSelectors})
            }
    
            var rebootRequiredBlockCondition string
            if rebootBlocked(blockCheckers...) {
                rebootRequiredBlockCondition = ", but blocked at this time"
                continue
            }
            log.Infof("Reboot required%s", rebootRequiredBlockCondition)
            ...
    
            err = drain(client, node)
            if err != nil {
                if !forceReboot {
                ...
                }
            }
    
    1. 和上一步骤一样,在经过了重启窗口和重启条件检查后,代码会对节点附加一个 map,格式为 map[string]string{KuredRebootInProgressAnnotation: timeNowString}
    2. 接下来会初始化一个 RebootBlocker 类型的切片,这个切片会包含基于 Prometheus 或者基于 Kubernetes 的阻塞检查器,用于检查是否有条件阻止节点重启,当出现警报等信息的时候,会根据规则取消或者推迟重启任务
    3. 此时代码会检查节点是否已经持有锁,或者尝试获取锁。锁被用来确保在高并发环境下操作不会冲突,如果不能持有也不能获取锁,那么会启用 PreferNoSchedule 污点且跳过本次循环
    if !holding(lock, &nodeMeta, concurrency > 1) && !acquire(lock, &nodeMeta, TTL, concurrency) {
        // Prefer to not schedule pods onto this node to avoid draing the same pod multiple times.
        preferNoScheduleTaint.Enable()
        continue
    }
    
    1. 而最后的 booter.Reboot() 为 reboot.Reboot 类型的接口实现,传入 CommandRebootMethod 或者 SignalRebootMethod 实现可以根据需要实现不同的重启方式

    Booter

细节实现

锁的实现

  1. DaemonSetLock 是一个经过结构体封装的锁(下面简称 DSLock),用于表示 DaemonSet 的锁定状态,它的内部封装了几个主要属性:
    • name:DaemonSet 的名称
    • namespace:DaemonSet 所在的命名空间
    • annotation:用于标记锁定状态的注解键
    • nodeID:标识请求锁定的节点的ID
    • client:Kubernetes 客户端,用于与API服务器通信
  2. GetDaemonSet 这个方法在锁的获取中是一个很重要的环节,无论是单节点锁还是多节点锁都能在方法中看到它的身影,它会不断轮询试图在指定的超时时间内获取一个 DaemonSet 资源,在原本的代码中使用了 wait.PollImmediate 方法,其实可以修改为 wait.PollUntilContextTimeout 会更优雅一点,同时也避免了后续的不兼容,传参 Sleep 和 Timeout 分别代表了轮询间隔和超时时间
  3. 当拿到 DaemonSet 的节点资源后,代码检查 DaemonSet 的元数据注解中是否存在锁定信息。如果存在并且锁还未过期,检查锁是否被当前节点持有。反之如果不存在锁定信息,或者锁已经过期,则尝试创建新的锁定信息(包括节点ID、元数据、创建时间和TTL),并将其序列化为字符串后附加为注解
  4. 所以其实这个锁就是一个结构体的字符串序列化,通过注解的方式就可以简单实现所有权获取等操作