Crane 推荐组件源码笔记

选自我的笔记,项目非常庞大,希望给各位同好一点启发,这里主要还是推荐部分模块的源码走读 架构 The project structure is organized into several directories and files, each serving a specific purpose. Here’s a brief overview of the main components cmd/ metric-adapter/: Contains code related to the metric adapter component craned/: Contains the main application code for the craned command app/: Likely contains application-specific logic and configurations main.go: The entry point for the craned application crane-agent/: Contains code related to the crane agent component deploy/: Contains deployment scripts and configurations for deploying the application docs/: Documentation files for the project examples/: Example configurations or usage examples for the project hack/: Scripts and utilities for development and maintenance tasks overrides/: Possibly contains configuration overrides or custom settings pkg/: Contains the core library code, which can be used by other applications or commands site/: Could be related to the project’s website or documentation site tools/: Additional tools or scripts that support the project Files: go.mod and go.sum: Go module files that manage dependencies README.md and README_zh.md: Documentation files providing an overview and instructions for the project Makefile: Contains build instructions and tasks for the project Dockerfile: Instructions for building a Docker image of the application .gitignore: Specifies files and directories to be ignored by Git .golangci.yaml: Configuration for GolangCI, a Go linter CONTRIBUTING.md and CODE_OF_CONDUCT.md: Guidelines for contributing to the project and expected behavior Manager 部分 Command Initialization: NewManagerCommand(ctx context.Context) *cobra.Command: Creates a new Cobra command for craned Initializes options using options.NewOptions() Sets up command flags and feature gates Command Execution: The Run function within the Cobra command is executed: Calls opts.Complete() to complete option setup Validates options with opts.Validate() Executes Run(ctx, opts) to start the manager Run Function: Run(ctx context.Context, opts *options.Options) error: Configures the controller manager with ctrl.GetConfigOrDie() Sets up controller options, including leader election and metrics binding Creates a new manager with ctrl.NewManager(config, ctrlOptions) Health Checks: Adds health and readiness checks using mgr.AddHealthzCheck and mgr.AddReadyzCheck Data Sources and Predictor Initialization: initDataSources(mgr, opts): Initializes real-time and historical data sources initPredictorManager(opts, realtimeDataSources, historyDataSources): Sets up the predictor manager using the initialized data sources Scheme and Indexer Initialization: initScheme(): Registers API types with the runtime scheme initFieldIndexer(mgr): Sets up field indexers for efficient querying Webhooks Initialization: initWebhooks(mgr, opts): Configures webhooks if necessary Pod OOM Recorder: Initializes oom.PodOOMRecorder to track out-of-memory events Sets up the recorder with the manager and runs it in a separate goroutine Controller Initialization: initControllers(ctx, podOOMRecorder, mgr, opts, predictorMgr, historyDataSources[providers.PrometheusDataSource]): Initializes various controllers, including analytics and recommendation controllers Metric Collector Initialization: initMetricCollector(mgr): Sets up custom metrics collection Running All Components: runAll(ctx, mgr, predictorMgr, dataSourceProviders[providers.PrometheusDataSource], opts): Starts all components, ensuring that the manager and predictor manager are running Controllers 部分 PodOOMRecorder SetupWithManager: SetupWithManager(mgr ctrl.Manager) error: This function is called to register the PodOOMRecorder with the controller manager It sets up the reconciler to watch for pod events and trigger the Reconcile function Reconcile Function: Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error): Triggered by the controller manager for each pod event Retrieves the pod specified in the request Checks if the pod was terminated due to an OOM event using IsOOMKilled(pod) If OOMKilled, iterates over container statuses to find the terminated container For each OOMKilled container, checks if memory requests are set Increments the OOM count metric and adds an OOMRecord to the queue Run Function: Run(stopCh <-chan struct{}) error: Continuously processes the OOM records from the queue Uses a ticker to periodically clean old OOM records to prevent exceeding ConfigMap storage limits Retrieves OOM records using GetOOMRecord Updates OOM records with updateOOMRecord GetOOMRecord Function: GetOOMRecord() ([]OOMRecord, error): Retrieves OOM records from the cache or ConfigMap If the cache is nil, it fetches the ConfigMap and unmarshals the OOM records cleanOOMRecords Function: cleanOOMRecords(oomRecords []OOMRecord) []OOMRecord: Cleans up old OOM records to maintain the maximum number specified by OOMRecordMaxNumber updateOOMRecord Function: updateOOMRecord(oomRecord OOMRecord, saved []OOMRecord) error: Updates the OOM records in the cache or storage with new records IsOOMKilled Function: IsOOMKilled(pod *v1.Pod) bool: Helper function to determine if a pod was terminated due to an OOM event Detailed Flow Initialization: The PodOOMRecorder is set up with the manager using SetupWithManager, which registers it to watch for pod events Event Handling: For each pod event, Reconcile is called to check for OOM events and record them Record Management: The Run function processes the queue of OOM records, periodically cleans old records, and updates the storage Data Retrieval: GetOOMRecord fetches the current OOM records, either from the cache or the ConfigMap This detailed explanation provides a comprehensive view of the PodOOMRecorder’s functionality and its role in tracking OOM events in a Kubernetes cluster ...

February 12, 2025 · 12 min · Sxueck

一个小思路 - 我是如何实现K8s资源操作备份的

September 24, 2024 · 0 min · Sxueck

MindSearch 本地部署和简单体验

简介 最近发现到了 MindSearch 这个新兴项目。根据官方的介绍,MindSearch 是一个开源的AI搜索引擎框架。截至 2024 年 9 月 19 日,它仍处于初创阶段,仅具备 Demo 级别的功能,但其背后的技术和设计理念却引起了我的兴趣,它试图通过模拟人类的思维过程,从而提供更为深入和精准的信息检索体验。但是目前网络极少关于这个项目部署和介绍的文档,这也算变相推广介绍了。 PS: 项目目前迭代非常快,因此本文以参考为主 原理 MindSearch 是一款开源的AI搜索引擎,旨在通过模拟人类思维过程来提供深度的信息检索能力。其技术原理主要基于一个多智能体框架(Multi-Agent),结合了大语言模型和复杂的搜索策略,以实现高效的信息获取和处理。受限于项目的快速发展,这里主要以介绍逻辑而不是具体的代码实现(同时也是基于项目论文进行总结) 参考论文:《MindSearch: Mimicking Human Minds Elicits Deep AI Searcher》 核心组件(逻辑) WebPlanner WebPlanner 是 MindSearch 中的高级规划器,负责将用户的查询分解为多个子问题。其工作流程包括: 查询分解:当用户输入问题时,WebPlanner 会分析该问题,并将其拆分为多个可以独立解决的子问题。这种分解使得复杂问题能够被逐步处理。 动态图构建:WebPlanner 利用有向无环图(DAG)来动态构建问题解决的逻辑图。每个子问题作为图中的节点,随着信息的获取,图会不断扩展和细化。这种方法模拟了人类在解决问题时的思维过程,使得搜索结果更加符合逻辑。 构建完成的 DAG 包含了多个节点和边,其中节点表示独立的搜索任务,边表示节点之间的推理关系。每个节点代表一个独立的搜索任务,包括一个 START 节点(用户的原始问题)和一个 END 节点(最终的回答) WebSearcher WebSearcher 是负责实际信息检索的组件,其功能包括: 并行搜索:对于每个从 WebPlanner 接收到的子问题,WebSearcher 会进行并行搜索。这意味着多个子问题可以同时被处理,从而显著提高信息检索的速度。 层次化信息检索:WebSearcher 采用从粗到细的检索策略,首先广泛收集相关信息,然后针对最有价值的页面进行深入分析。这种策略确保了回答的全面性和准确性。 信息提取:在获取到相关网页后,WebSearcher 会提取与子问题相关的关键信息,并将其汇总回WebPlanner,以便生成最终答案。 相较于市面上目前的商业相似品,例如 Perplexity.ai 或者 GPT Research,它更多强调去中心化和用户控制,同时并不是简单的将索引到的网页当成上下文传入到 LLM 进行辅助检索,缺点也正是因为每个步骤都会逐步分解,导致不具备立刻即时响应的能力,以及如果使用商业大模型会非常消耗 Token(如果是 GPT-4 恐怕分分钟烧光余额),一些简单的问题就不适应使用这种方式进行检索了 部署 推荐使用 2C2G 的机器进行部署,磁盘推荐大于 20G 以留出空间给 Python 依赖,同时出于搜索速度也推荐使用境外服务器进行部署,这里优先选择直接安装,或者自己手动封装成镜像,整个项目分为前后端分离 ...

September 24, 2024 · 1 min · Sxueck

尝试使用手动部署 Kubernetes 集群

本文更新于 2024-12-26,Kubernetes 更新非常快,请注意可能有过时信息 前言 最近公司业务场景要一个高可用的集群节点用于工业环境,而且数据量非常大,刚刚好手上有一台 IDC 淘来的机架服务器,借机复习一下 K8s 的手动部署。 系统资源 虚拟机配置 $ free -h total used free shared buff/cache available Mem: 31Gi 901Mi 25Gi 4.0Mi 5.4Gi 30Gi Swap: 0B 0B 0B $ cat /proc/cpuinfo | grep -c processor 32 集群的安装 由于只安装一个 Control Panel,我们只需要配置好以下几个组件即可 ETCD Kube-ApiServer Kube-Controller-Manager Kube-Scheduler Kube-Proxy Kubelet 基础的配置 安装基础软件 $ sudo apt install bash-completion git net-tools sudo build-essential golang conntrack $ go version # 这里的 Go 版本需要大于 1.12 go version go1.22.2 linux/amd64 进行组件的编译 ...

July 3, 2024 · 6 min · Sxueck

一个Pod建立的背后 - 从Kubernetes源码角度剖析

前言 Pod 是 Kubernetes 集群中的基础单元,它们可以组织和运行多个容器。从用户的角度,Pod 可以很方便地被创建和管理,但是它们的实际工作原理却鲜有人关注或者一知半解。在这篇文章中,我们将从 Kubernetes 源码的角度,详细剖析一个 Pod 建立的背后。同时会分析各个组件的工作流程,以及它们如何协同工作,以实现创建和管理 Pod 的目标。同时我们还将在代码解读中,讲解每个组件的职责和实现原理,以更好地理解 Pod 工作的核心机制。 Pod基础知识 Pod 由一个或多个 Container 组成,或者可以说是容器的抽象,所有容器共享一个网络命名空间、存储卷等,可以很方便地共享数据和通信。 首先我们需要通过声明式配置文件来创建 Pod,描述容器的配置和所需的资源,并通过 kubectl 命令提交给 Kubernetes 集群中,而后集群的调度器也就是 Scheduler 组件会根据预先配置的亲和性与对硬件资源的考量,将 Pod 放到一个合适的节点,一旦调度成功,Kubernetes 会启动其中的容器并为其分配网络命名空间、IP 地址和存储卷。 在 Pod 启动成功后,Kubernetes 会不断监视其状态,并提供了更新、扩展和故障转移等编排功能,而在这篇文章中,我将会假设您已经有了对于 Kubernetes 已经具备了一定程度的水平和经验。同时,为了保证学习的流畅性,文章会摒弃例如 PriorityClass 之类的对于主流程关联性没有那么重要的特性 创建Pod的过程 我们在上面基础复习的时候,已经简述了一个 Pod 从组件上是如何被传递并且被创建了,但是如果想要阅读源码,不仅需要查看代码实现,还需要理解 Kubernetes 的架构设计和组件间的通信机制。 过程简述 我们首先将 YAML 声明使用 kubectl 进行提交,kubectl 会通过 Kubeconfig 文件与 kube-apiserver 进行通信。kubeconfig 中的 token 会与 kube-apiserver 进行校验,这个 token 通常被配置在集群中某一个 Secrets 中,与某个具备一定权限的 ServiceAccount 做了绑定,在通过认证后 kube-apiserver 将对提交的 YAML 声明进行语法和逻辑检查,以确保其符合 Kubernetes 的要求。这些检查包括验证声明中的资源是否存在、依赖关系是否正确等,如果没有问题则将其写到 Etcd 中。 ...

May 11, 2024 · 1 min · Sxueck

通过RBAC权限最小化创建云效AppStack流水线用户

云效官方文档对于 Pipeline 的权限要求 权限要求 权限类型 非分批发布 分批发布 Namespace 读权限 读权限 ControllerRevision (apps/v1/ControllerRevision) 全部权限 全部权限 待部署的 Kubernetes 对象 全部权限 全部权限 Rollout (standard.oam.dev/v1alpha1) - 全部权限 待部署的工作负载对象 全部权限 全部权限 CRD (apiextensions.k8s.io/v1/CustomResourceDefinition) - 全部权限(用于安装 Rollout CRD) ServiceAccount (core/v1/ServiceAccount) - 全部权限(用于维持 Rollout 控制器的运行) ClusterRole (rbac.authorization.k8s.io/v1/ClusterRole) - 全部权限(用于维持 Rollout 控制器的运行) ClusterRoleBinding (rbac.authorization.k8s.io/v1/ClusterRoleBinding) - 全部权限(用于维持 Rollout 控制器的运行) Role (rbac.authorization.k8s.io/v1/Role) - 全部权限(用于维持 Rollout 控制器的运行) RoleBinding (rbac.authorization.k8s.io/v1/RoleBinding) - 全部权限(用于维持 Rollout 控制器的运行) Deployment (apps/v1/Deployment) - 全部权限(用于维持 Rollout 控制器的运行) Pod (core/v1/Pod) - 全部权限(用于 Rollout 控制器的安装后 E2E 测试) 注:"-" 表示在该场景下不需要相应的权限。 ...

April 15, 2024 · 5 min · Sxueck

Ruby实现一个简单的DNS查询工具

前言 最近需要写一个自动轮询生成指定域名的最快 IP 的工具,本身是一个小工具的属性,直接是用 Ruby 调试加上编写花费了三个小时完成了部分功能,过程比我想象地要难一点,这里直接放出来代码给大家参考 同时我也参考了这段 Gist :https://gist.github.com/jvns/1e5838a53520e45969687e2f90199770 效果 代码和部分讲解 # frozen_string_literal: true require 'socket' require 'stringio' DNS_TYPES = { 1 => "A", 2 => "NS", 5 => "CNAME", } class DNSEncapsulationOfWriteOperations # fixed format, just hardcode private def make_question_header(query_id) [query_id, 0x0100, 0x0001, 0x0000, 0x0000, 0x0000].pack('nnnnnn') end public def make_dns_query(domain, query_id) question = domain.split('.').map { |label| [label.length, label].pack('Ca*') }.join + "\x00" question << [0x0001, 0x0001].pack('nn') # query type and class make_question_header(query_id) + question end end class DNSEncapsulationOfReadOperations def read_domain_name(buf) domain = [] loop do label_length = buf.read(1).unpack1('C') break if label_length == 0 if (label_length & 0xc0) == 0xc0 # DNS compression pointer_offset = ((label_length & 0x3f) << 8) + buf.read(1).unpack1('C') old_pos = buf.pos buf.pos = pointer_offset domain << read_domain_name(buf) buf.pos = old_pos break else # Normal label domain << buf.read(label_length) end end domain.join('.') end def read_rdata(buf, length) case DNS_TYPES[@type] || @type when "CNAME", "NS" read_domain_name(buf) when "A" buf.read(4).unpack1('C4') else buf.read(length) end end end class DNSHeader attr_reader :id, :flags, :num_questions, :num_answers, :num_auth, :num_additional def initialize(buf) hdr = buf.read(12) @id, @flags, @num_questions, @num_answers, @num_auth, @num_additional = hdr.unpack('nnnnnn') end end class DNSRecord attr_reader :name, :type, :class, :ttl, :rdlength, :rdata, :parsed_rdata def initialize(buf) @dns_read_ops = DNSEncapsulationOfReadOperations.new @name = @dns_read_ops.read_domain_name(buf) @type, @class, @ttl, @rdlength = buf.read(10).unpack('nnNn') @parsed_rdata = read_rdata(buf, @rdlength) end def read_rdata(buf, length) @type_name = DNS_TYPES[@type] || @type if @type_name == "CNAME" or @type_name == "NS" @dns_read_ops.read_domain_name(buf) elsif @type_name == "A" buf.read(length).unpack('C*').join('.') else buf.read(length) end end def to_s "#{@name}\t\t#{@ttl}\t#{@type_name}\t#{@parsed_rdata}" end end class DNSQuery attr_reader :domain, :type, :cls def initialize(buf) dns_read_ops = DNSEncapsulationOfReadOperations.new @domain = dns_read_ops.read_domain_name(buf) @type, @cls = buf.read(4).unpack('nn') end end class DNSResponse attr_reader :header, :queries, :answers, :authorities, :additionals def initialize(bytes) buf = StringIO.new(bytes) # 将字节数组打包成字符串并创建 StringIO 对象 @header = DNSHeader.new(buf) @queries = (1..@header.num_questions).map { DNSQuery.new(buf) } if @header.num_questions > 0 @answers = (1..@header.num_answers).map { DNSRecord.new(buf) } if @header.num_answers > 0 @authorities = (1..@header.num_auth).map { DNSRecord.new(buf) } if @header.num_auth > 0 @additionals = (1..@header.num_additional).map { DNSRecord.new(buf) } if @header.num_additional > 0 end end def send_dns_query(sock, domain, query_id) dns_write_ops = DNSEncapsulationOfWriteOperations.new sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, [3, 0].pack('l_2')) # timeout - 3s sock.send(dns_write_ops.make_dns_query(domain, query_id), 0) end def receive_dns_response(sock) reply, _ = sock.recvfrom(1024) DNSResponse.new(reply) end module Resolvfast class Error < StandardError; end def self.main(domain) begin sock = UDPSocket.new sock.bind('0.0.0.0', rand(10240..65535)) sock.connect('223.5.5.5', 53) # send query send_dns_query(sock, domain, 1) # receive & parse response response = receive_dns_response(sock) print "Answers for #{domain}:\n" response.answers.each do |record| puts record end ensure sock.close end end end if ARGV.empty? puts "Usage: #{$PROGRAM_NAME} <domain>" exit 1 end Resolvfast.main(ARGV[0]) 测试 ...

April 2, 2024 · 3 min · Sxueck