本篇文章使用了类似 “回收站” 的方式,对于被删除的资源进行了一个备份操作,使得出现误操作或者想找回的时候,有效减少恢复时间

相关的项目代码已经开源至 Github,当前支持 K8s 1.24+ 版本

思路

整体手段其实非常简单,我们使用了 Informer 的机制,通过 Watch 到相关资源的方式,当触发 “Delete” 操作的时候,将 YAML 正确抓取出来并上传到存储空间即可,具体流程可以查看下面的流程图

+------------------+     +------------------+     +------------------+
|                  |     |                  |     |                  |
|  配置加载 (init) |---->| 创建 K8s 客户端  |---->|  创建 S3 客户端  |
|                  |     |                  |     |                  |
+------------------+     +------------------+     +------------------+
                                  |
                                  v
+------------------+     +------------------+     +------------------+
|                  |     |                  |     |                  |
|   监听终止信号   |<----| 启动资源监控器   |---->| 创建工作队列处理 |
|                  |     |                  |     |                  |
+------------------+     +------------------+     +------------------+
        |                        |                        |
        |                        v                        v
        |               +------------------+     +------------------+
        |               |                  |     |                  |
        |               | 监听资源删除事件 |---->|  将资源放入队列  |
        |               |                  |     |                  |
        |               +------------------+     +------------------+
        |                                                |
        |                                                v
        |                                      +------------------+
        |                                      |                  |
        |                                      | 从队列获取资源   |
        |                                      |                  |
        |                                      +------------------+
        |                                                |
        |                                                v
        |                                      +------------------+
        |                                      |                  |
        |                                      |  上传到 S3 存储  |
        |                                      |                  |
        |                                      +------------------+
        |
        v
+------------------+
|                  |
|    程序退出      |
|                  |
+------------------+
  1. 程序启动时加载配置
  2. 创建 Kubernetes 客户端连接
  3. 创建 S3 存储客户端连接
  4. 启动资源监控器,监听资源删除事件
  5. 当资源被删除时,将资源信息放入工作队列
  6. 工作队列处理器从队列中获取资源信息
  7. 将资源 YAML 上传到 S3 存储
  8. 程序接收到终止信号时优雅退出

理论存在,实践特别容易,就不需要展开赘述,这里分享一个小技巧,在进行 K8s 编程的时候也能经常用到

如何获取非标准资源的 “句柄”

我们都知道,Client-Go 对于标准的 K8s 资源都提供了一套 CURD 操作方法,利用其提供的接口我们能非常轻松地进行进阶资源处理,那么,对于 CRD 这种无法被预先定义的资源,应该怎么办呢?

这里就需要使用到 Dynamic Client,它是 Kubernetes 提供的一种通用客户端机制,其具备 类型无关性运行时发现 的核心特点,不需要预定义资源的 Go 结构体、可以处理任何 Kubernetes 资源等表现使得其特别适合处理 CRD 和动态资源

在架构层面,Dynamic Client 构建在 Kubernetes 的发现接口和 RESTMapper 系统之上。当客户端初始化时,会通过 API Server 的 discovery API 动态获取当前集群支持的资源类型信息,建立 GroupVersionResource 到具体 API 路径的映射关系。这种延迟绑定机制使得客户端能够自动适配集群环境,即使后续有新的 CRD 被创建,也无需重新编译客户端代码

具体到操作层面,Dynamic Client 使用 unstructured.Unstructured 类型作为数据载体(在下面的代码中也能找到对应使用)。这种特殊结构体采用嵌套的 map[string]interface{} 结构来存储资源数据,配合 JSON 标签系统实现与 API Server 的序列化/反序列化交互

在发起 API 请求时,客户端会根据传入的 GroupVersionResource 动态构造对应的 HTTP 请求路径,这种路径构造逻辑通过 APIPathResolverFunc 实现,能够正确处理核心 API 组(如 /api/v1)和扩展 API 组(如 /apis/batch/v1)的不同路径规则

命名空间处理机制采用双重接口设计,NamespaceableResourceInterface 通过组合 ResourceInterface 并添加 Namespace() 方法,实现了作用域的动态切换。这种设计模式使得客户端既能处理命名空间作用域的资源(如 Deployment),也能处理集群作用域的资源(如 Node),在运行时根据资源类型描述自动选择正确的 API 路径

底层通信机制上,Dynamic Client 复用 Kubernetes 标准客户端的 RESTClient 实现,但增加了动态类型适配层。在发起具体操作时,会通过动态生成的 RESTMapper 将 GroupVersionResource 转换为具体的 API 端点路径,同时处理 API 版本协商和资源状态转换(意味着需要正确处理提供 GVK )。对于 Watch 等长连接操作,客户端实现了基于 chunk 的流式处理机制,确保能够高效处理大规模资源变更事件

这种动态客户端架构虽然牺牲了编译时类型安全检查,但为 Kubernetes 生态系统的扩展性提供了重要支撑。它使得开发通用控制器、运维工具等场景无需绑定具体资源类型,特别适合需要处理未知或动态注册资源类型的应用场景。这里有一点可以放心,在实际性能表现上,由于避免了反射操作且直接使用底层 REST 传输,其效率与强类型客户端基本持平,仅在反序列化阶段存在微小差异

Examples

/ runInformer Runs informer for the specified resource
// Here we need to pass in a queue for asynchronous processing, to avoid the process of blocking the cluster due to i/o problems
func runInformer(ctx context.Context, dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, asyncQueue workqueue.RateLimitingInterface) error {
	var informer cache.SharedIndexInformer
	err := retry.OnError(retry.DefaultRetry, func(err error) bool {
		_, isNotFound := err.(*errors.StatusError)
		return isNotFound
	}, func() error {
		informer = createDynamicInformer(dynamicClient, gvr)
		_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			DeleteFunc: func(obj interface{}) {
				u := ExtractAboutKeyInformation(obj)
				log.Infof("Record Delete Events -  %s %s %s\n", u.Namespace, u.Kind, u.Name)
				asyncQueue.Add(u)
			},
		})
		return err
	})

	if err != nil {
		return fmt.Errorf("failed to create informer for %s after retries: %w", gvr.String(), err)
	}

	go informer.Run(ctx.Done())

	// Add a timeout for cache synchronization
	syncCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
	defer cancel()

	if !cache.WaitForCacheSync(syncCtx.Done(), informer.HasSynced) {
		log.Warnf("Failed to sync cache for %s, continuing without full synchronization", gvr.String())
		log.Warnf("Detailed cache sync failure for %s: %v", gvr.String(), syncCtx.Err())
	}

	//<-ctx.Done()
	//log.Infof("Context done, stopping informer for %s", gvr.String())
	return nil
}

// gvkToGVR Convert GroupVersionKind to GroupVersionResource
func gvkToGVR(discoveryClient *discovery.DiscoveryClient, gvk schema.GroupVersionKind) (schema.GroupVersionResource, error) {
	apiResources, err := discoveryClient.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
	if err != nil {
		return schema.GroupVersionResource{}, fmt.Errorf("error getting server resources for group version %s: %w", gvk.GroupVersion().String(), err)
	}

	for _, apiResource := range apiResources.APIResources {
		if apiResource.Kind == gvk.Kind {
			return schema.GroupVersionResource{
				Group:    gvk.Group,
				Version:  gvk.Version,
				Resource: apiResource.Name,
			}, nil
		}
	}

	return schema.GroupVersionResource{}, fmt.Errorf("no resource found for GVK %s", gvk)
}

func createDynamicInformer(dynamicClient dynamic.Interface, gvr schema.GroupVersionResource) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				return dynamicClient.Resource(gvr).Namespace(v1.NamespaceAll).List(context.TODO(), options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				return dynamicClient.Resource(gvr).Namespace(v1.NamespaceAll).Watch(context.TODO(), options)
			},
		},
		&unstructured.Unstructured{},
		informerReSyncPeriod,
		cache.Indexers{},
	)
}