一个小思路 - 我是如何实现K8s资源操作备份的
本篇文章使用了类似 “回收站” 的方式,对于被删除的资源进行了一个备份操作,使得出现误操作或者想找回的时候,有效减少恢复时间
相关的项目代码已经开源至 Github,当前支持 K8s 1.24+ 版本
思路
整体手段其实非常简单,我们使用了 Informer 的机制,通过 Watch 到相关资源的方式,当触发 “Delete” 操作的时候,将 YAML 正确抓取出来并上传到存储空间即可,具体流程可以查看下面的流程图
+------------------+ +------------------+ +------------------+
| | | | | |
| 配置加载 (init) |---->| 创建 K8s 客户端 |---->| 创建 S3 客户端 |
| | | | | |
+------------------+ +------------------+ +------------------+
|
v
+------------------+ +------------------+ +------------------+
| | | | | |
| 监听终止信号 |<----| 启动资源监控器 |---->| 创建工作队列处理 |
| | | | | |
+------------------+ +------------------+ +------------------+
| | |
| v v
| +------------------+ +------------------+
| | | | |
| | 监听资源删除事件 |---->| 将资源放入队列 |
| | | | |
| +------------------+ +------------------+
| |
| v
| +------------------+
| | |
| | 从队列获取资源 |
| | |
| +------------------+
| |
| v
| +------------------+
| | |
| | 上传到 S3 存储 |
| | |
| +------------------+
|
v
+------------------+
| |
| 程序退出 |
| |
+------------------+
- 程序启动时加载配置
- 创建 Kubernetes 客户端连接
- 创建 S3 存储客户端连接
- 启动资源监控器,监听资源删除事件
- 当资源被删除时,将资源信息放入工作队列
- 工作队列处理器从队列中获取资源信息
- 将资源 YAML 上传到 S3 存储
- 程序接收到终止信号时优雅退出
理论存在,实践特别容易,就不需要展开赘述,这里分享一个小技巧,在进行 K8s 编程的时候也能经常用到
如何获取非标准资源的 “句柄”
我们都知道,Client-Go 对于标准的 K8s 资源都提供了一套 CURD 操作方法,利用其提供的接口我们能非常轻松地进行进阶资源处理,那么,对于 CRD 这种无法被预先定义的资源,应该怎么办呢?
这里就需要使用到 Dynamic Client,它是 Kubernetes 提供的一种通用客户端机制,其具备 类型无关性
和 运行时发现
的核心特点,不需要预定义资源的 Go 结构体、可以处理任何 Kubernetes 资源等表现使得其特别适合处理 CRD 和动态资源
在架构层面,Dynamic Client 构建在 Kubernetes 的发现接口和 RESTMapper 系统之上。当客户端初始化时,会通过 API Server 的 discovery API 动态获取当前集群支持的资源类型信息,建立 GroupVersionResource 到具体 API 路径的映射关系。这种延迟绑定机制使得客户端能够自动适配集群环境,即使后续有新的 CRD 被创建,也无需重新编译客户端代码
具体到操作层面,Dynamic Client 使用 unstructured.Unstructured
类型作为数据载体(在下面的代码中也能找到对应使用)。这种特殊结构体采用嵌套的 map[string]interface{}
结构来存储资源数据,配合 JSON 标签系统实现与 API Server 的序列化/反序列化交互
在发起 API 请求时,客户端会根据传入的 GroupVersionResource 动态构造对应的 HTTP 请求路径,这种路径构造逻辑通过 APIPathResolverFunc
实现,能够正确处理核心 API 组(如 /api/v1)和扩展 API 组(如 /apis/batch/v1)的不同路径规则
命名空间处理机制采用双重接口设计,NamespaceableResourceInterface
通过组合 ResourceInterface 并添加 Namespace() 方法,实现了作用域的动态切换。这种设计模式使得客户端既能处理命名空间作用域的资源(如 Deployment),也能处理集群作用域的资源(如 Node),在运行时根据资源类型描述自动选择正确的 API 路径
底层通信机制上,Dynamic Client 复用 Kubernetes 标准客户端的 RESTClient 实现,但增加了动态类型适配层。在发起具体操作时,会通过动态生成的 RESTMapper 将 GroupVersionResource
转换为具体的 API 端点路径,同时处理 API 版本协商和资源状态转换(意味着需要正确处理提供 GVK )。对于 Watch 等长连接操作,客户端实现了基于 chunk 的流式处理机制,确保能够高效处理大规模资源变更事件
这种动态客户端架构虽然牺牲了编译时类型安全检查,但为 Kubernetes 生态系统的扩展性提供了重要支撑。它使得开发通用控制器、运维工具等场景无需绑定具体资源类型,特别适合需要处理未知或动态注册资源类型的应用场景。这里有一点可以放心,在实际性能表现上,由于避免了反射操作且直接使用底层 REST 传输,其效率与强类型客户端基本持平,仅在反序列化阶段存在微小差异
Examples
/ runInformer Runs informer for the specified resource
// Here we need to pass in a queue for asynchronous processing, to avoid the process of blocking the cluster due to i/o problems
func runInformer(ctx context.Context, dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, asyncQueue workqueue.RateLimitingInterface) error {
var informer cache.SharedIndexInformer
err := retry.OnError(retry.DefaultRetry, func(err error) bool {
_, isNotFound := err.(*errors.StatusError)
return isNotFound
}, func() error {
informer = createDynamicInformer(dynamicClient, gvr)
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
u := ExtractAboutKeyInformation(obj)
log.Infof("Record Delete Events - %s %s %s\n", u.Namespace, u.Kind, u.Name)
asyncQueue.Add(u)
},
})
return err
})
if err != nil {
return fmt.Errorf("failed to create informer for %s after retries: %w", gvr.String(), err)
}
go informer.Run(ctx.Done())
// Add a timeout for cache synchronization
syncCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
if !cache.WaitForCacheSync(syncCtx.Done(), informer.HasSynced) {
log.Warnf("Failed to sync cache for %s, continuing without full synchronization", gvr.String())
log.Warnf("Detailed cache sync failure for %s: %v", gvr.String(), syncCtx.Err())
}
//<-ctx.Done()
//log.Infof("Context done, stopping informer for %s", gvr.String())
return nil
}
// gvkToGVR Convert GroupVersionKind to GroupVersionResource
func gvkToGVR(discoveryClient *discovery.DiscoveryClient, gvk schema.GroupVersionKind) (schema.GroupVersionResource, error) {
apiResources, err := discoveryClient.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
if err != nil {
return schema.GroupVersionResource{}, fmt.Errorf("error getting server resources for group version %s: %w", gvk.GroupVersion().String(), err)
}
for _, apiResource := range apiResources.APIResources {
if apiResource.Kind == gvk.Kind {
return schema.GroupVersionResource{
Group: gvk.Group,
Version: gvk.Version,
Resource: apiResource.Name,
}, nil
}
}
return schema.GroupVersionResource{}, fmt.Errorf("no resource found for GVK %s", gvk)
}
func createDynamicInformer(dynamicClient dynamic.Interface, gvr schema.GroupVersionResource) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
return dynamicClient.Resource(gvr).Namespace(v1.NamespaceAll).List(context.TODO(), options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
return dynamicClient.Resource(gvr).Namespace(v1.NamespaceAll).Watch(context.TODO(), options)
},
},
&unstructured.Unstructured{},
informerReSyncPeriod,
cache.Indexers{},
)
}