一个小思路 - 我是如何实现K8s资源操作备份的

September 24, 2024 · 0 min · Sxueck

MindSearch 本地部署和简单体验

简介 最近发现到了 MindSearch 这个新兴项目。根据官方的介绍,MindSearch 是一个开源的AI搜索引擎框架。截至 2024 年 9 月 19 日,它仍处于初创阶段,仅具备 Demo 级别的功能,但其背后的技术和设计理念却引起了我的兴趣,它试图通过模拟人类的思维过程,从而提供更为深入和精准的信息检索体验。但是目前网络极少关于这个项目部署和介绍的文档,这也算变相推广介绍了。 PS: 项目目前迭代非常快,因此本文以参考为主 原理 MindSearch 是一款开源的AI搜索引擎,旨在通过模拟人类思维过程来提供深度的信息检索能力。其技术原理主要基于一个多智能体框架(Multi-Agent),结合了大语言模型和复杂的搜索策略,以实现高效的信息获取和处理。受限于项目的快速发展,这里主要以介绍逻辑而不是具体的代码实现(同时也是基于项目论文进行总结) 参考论文:《MindSearch: Mimicking Human Minds Elicits Deep AI Searcher》 核心组件(逻辑) WebPlanner WebPlanner 是 MindSearch 中的高级规划器,负责将用户的查询分解为多个子问题。其工作流程包括: 查询分解:当用户输入问题时,WebPlanner 会分析该问题,并将其拆分为多个可以独立解决的子问题。这种分解使得复杂问题能够被逐步处理。 动态图构建:WebPlanner 利用有向无环图(DAG)来动态构建问题解决的逻辑图。每个子问题作为图中的节点,随着信息的获取,图会不断扩展和细化。这种方法模拟了人类在解决问题时的思维过程,使得搜索结果更加符合逻辑。 构建完成的 DAG 包含了多个节点和边,其中节点表示独立的搜索任务,边表示节点之间的推理关系。每个节点代表一个独立的搜索任务,包括一个 START 节点(用户的原始问题)和一个 END 节点(最终的回答) WebSearcher WebSearcher 是负责实际信息检索的组件,其功能包括: 并行搜索:对于每个从 WebPlanner 接收到的子问题,WebSearcher 会进行并行搜索。这意味着多个子问题可以同时被处理,从而显著提高信息检索的速度。 层次化信息检索:WebSearcher 采用从粗到细的检索策略,首先广泛收集相关信息,然后针对最有价值的页面进行深入分析。这种策略确保了回答的全面性和准确性。 信息提取:在获取到相关网页后,WebSearcher 会提取与子问题相关的关键信息,并将其汇总回WebPlanner,以便生成最终答案。 相较于市面上目前的商业相似品,例如 Perplexity.ai 或者 GPT Research,它更多强调去中心化和用户控制,同时并不是简单的将索引到的网页当成上下文传入到 LLM 进行辅助检索,缺点也正是因为每个步骤都会逐步分解,导致不具备立刻即时响应的能力,以及如果使用商业大模型会非常消耗 Token(如果是 GPT-4 恐怕分分钟烧光余额),一些简单的问题就不适应使用这种方式进行检索了 部署 推荐使用 2C2G 的机器进行部署,磁盘推荐大于 20G 以留出空间给 Python 依赖,同时出于搜索速度也推荐使用境外服务器进行部署,这里优先选择直接安装,或者自己手动封装成镜像,整个项目分为前后端分离 ...

September 24, 2024 · 1 min · Sxueck

一个Pod建立的背后 - 从Kubernetes源码角度剖析

前言 Pod 是 Kubernetes 集群中的基础单元,它们可以组织和运行多个容器。从用户的角度,Pod 可以很方便地被创建和管理,但是它们的实际工作原理却鲜有人关注或者一知半解。在这篇文章中,我们将从 Kubernetes 源码的角度,详细剖析一个 Pod 建立的背后。同时会分析各个组件的工作流程,以及它们如何协同工作,以实现创建和管理 Pod 的目标。同时我们还将在代码解读中,讲解每个组件的职责和实现原理,以更好地理解 Pod 工作的核心机制。 Pod基础知识 Pod 由一个或多个 Container 组成,或者可以说是容器的抽象,所有容器共享一个网络命名空间、存储卷等,可以很方便地共享数据和通信。 首先我们需要通过声明式配置文件来创建 Pod,描述容器的配置和所需的资源,并通过 kubectl 命令提交给 Kubernetes 集群中,而后集群的调度器也就是 Scheduler 组件会根据预先配置的亲和性与对硬件资源的考量,将 Pod 放到一个合适的节点,一旦调度成功,Kubernetes 会启动其中的容器并为其分配网络命名空间、IP 地址和存储卷。 在 Pod 启动成功后,Kubernetes 会不断监视其状态,并提供了更新、扩展和故障转移等编排功能,而在这篇文章中,我将会假设您已经有了对于 Kubernetes 已经具备了一定程度的水平和经验。同时,为了保证学习的流畅性,文章会摒弃例如 PriorityClass 之类的对于主流程关联性没有那么重要的特性 创建Pod的过程 我们在上面基础复习的时候,已经简述了一个 Pod 从组件上是如何被传递并且被创建了,但是如果想要阅读源码,不仅需要查看代码实现,还需要理解 Kubernetes 的架构设计和组件间的通信机制。 过程简述 我们首先将 YAML 声明使用 kubectl 进行提交,kubectl 会通过 Kubeconfig 文件与 kube-apiserver 进行通信。kubeconfig 中的 token 会与 kube-apiserver 进行校验,这个 token 通常被配置在集群中某一个 Secrets 中,与某个具备一定权限的 ServiceAccount 做了绑定,在通过认证后 kube-apiserver 将对提交的 YAML 声明进行语法和逻辑检查,以确保其符合 Kubernetes 的要求。这些检查包括验证声明中的资源是否存在、依赖关系是否正确等,如果没有问题则将其写到 Etcd 中。 ...

May 11, 2024 · 1 min · Sxueck

通过RBAC权限最小化创建云效AppStack流水线用户

云效官方文档对于 Pipeline 的权限要求 权限要求 权限类型 非分批发布 分批发布 Namespace 读权限 读权限 ControllerRevision (apps/v1/ControllerRevision) 全部权限 全部权限 待部署的 Kubernetes 对象 全部权限 全部权限 Rollout (standard.oam.dev/v1alpha1) - 全部权限 待部署的工作负载对象 全部权限 全部权限 CRD (apiextensions.k8s.io/v1/CustomResourceDefinition) - 全部权限(用于安装 Rollout CRD) ServiceAccount (core/v1/ServiceAccount) - 全部权限(用于维持 Rollout 控制器的运行) ClusterRole (rbac.authorization.k8s.io/v1/ClusterRole) - 全部权限(用于维持 Rollout 控制器的运行) ClusterRoleBinding (rbac.authorization.k8s.io/v1/ClusterRoleBinding) - 全部权限(用于维持 Rollout 控制器的运行) Role (rbac.authorization.k8s.io/v1/Role) - 全部权限(用于维持 Rollout 控制器的运行) RoleBinding (rbac.authorization.k8s.io/v1/RoleBinding) - 全部权限(用于维持 Rollout 控制器的运行) Deployment (apps/v1/Deployment) - 全部权限(用于维持 Rollout 控制器的运行) Pod (core/v1/Pod) - 全部权限(用于 Rollout 控制器的安装后 E2E 测试) 注:"-" 表示在该场景下不需要相应的权限。 ...

April 15, 2024 · 5 min · Sxueck

Ruby实现一个简单的DNS查询工具

前言 最近需要写一个自动轮询生成指定域名的最快 IP 的工具,本身是一个小工具的属性,直接是用 Ruby 调试加上编写花费了三个小时完成了部分功能,过程比我想象地要难一点,这里直接放出来代码给大家参考 同时我也参考了这段 Gist :https://gist.github.com/jvns/1e5838a53520e45969687e2f90199770 效果 代码和部分讲解 # frozen_string_literal: true require 'socket' require 'stringio' DNS_TYPES = { 1 => "A", 2 => "NS", 5 => "CNAME", } class DNSEncapsulationOfWriteOperations # fixed format, just hardcode private def make_question_header(query_id) [query_id, 0x0100, 0x0001, 0x0000, 0x0000, 0x0000].pack('nnnnnn') end public def make_dns_query(domain, query_id) question = domain.split('.').map { |label| [label.length, label].pack('Ca*') }.join + "\x00" question << [0x0001, 0x0001].pack('nn') # query type and class make_question_header(query_id) + question end end class DNSEncapsulationOfReadOperations def read_domain_name(buf) domain = [] loop do label_length = buf.read(1).unpack1('C') break if label_length == 0 if (label_length & 0xc0) == 0xc0 # DNS compression pointer_offset = ((label_length & 0x3f) << 8) + buf.read(1).unpack1('C') old_pos = buf.pos buf.pos = pointer_offset domain << read_domain_name(buf) buf.pos = old_pos break else # Normal label domain << buf.read(label_length) end end domain.join('.') end def read_rdata(buf, length) case DNS_TYPES[@type] || @type when "CNAME", "NS" read_domain_name(buf) when "A" buf.read(4).unpack1('C4') else buf.read(length) end end end class DNSHeader attr_reader :id, :flags, :num_questions, :num_answers, :num_auth, :num_additional def initialize(buf) hdr = buf.read(12) @id, @flags, @num_questions, @num_answers, @num_auth, @num_additional = hdr.unpack('nnnnnn') end end class DNSRecord attr_reader :name, :type, :class, :ttl, :rdlength, :rdata, :parsed_rdata def initialize(buf) @dns_read_ops = DNSEncapsulationOfReadOperations.new @name = @dns_read_ops.read_domain_name(buf) @type, @class, @ttl, @rdlength = buf.read(10).unpack('nnNn') @parsed_rdata = read_rdata(buf, @rdlength) end def read_rdata(buf, length) @type_name = DNS_TYPES[@type] || @type if @type_name == "CNAME" or @type_name == "NS" @dns_read_ops.read_domain_name(buf) elsif @type_name == "A" buf.read(length).unpack('C*').join('.') else buf.read(length) end end def to_s "#{@name}\t\t#{@ttl}\t#{@type_name}\t#{@parsed_rdata}" end end class DNSQuery attr_reader :domain, :type, :cls def initialize(buf) dns_read_ops = DNSEncapsulationOfReadOperations.new @domain = dns_read_ops.read_domain_name(buf) @type, @cls = buf.read(4).unpack('nn') end end class DNSResponse attr_reader :header, :queries, :answers, :authorities, :additionals def initialize(bytes) buf = StringIO.new(bytes) # 将字节数组打包成字符串并创建 StringIO 对象 @header = DNSHeader.new(buf) @queries = (1..@header.num_questions).map { DNSQuery.new(buf) } if @header.num_questions > 0 @answers = (1..@header.num_answers).map { DNSRecord.new(buf) } if @header.num_answers > 0 @authorities = (1..@header.num_auth).map { DNSRecord.new(buf) } if @header.num_auth > 0 @additionals = (1..@header.num_additional).map { DNSRecord.new(buf) } if @header.num_additional > 0 end end def send_dns_query(sock, domain, query_id) dns_write_ops = DNSEncapsulationOfWriteOperations.new sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, [3, 0].pack('l_2')) # timeout - 3s sock.send(dns_write_ops.make_dns_query(domain, query_id), 0) end def receive_dns_response(sock) reply, _ = sock.recvfrom(1024) DNSResponse.new(reply) end module Resolvfast class Error < StandardError; end def self.main(domain) begin sock = UDPSocket.new sock.bind('0.0.0.0', rand(10240..65535)) sock.connect('223.5.5.5', 53) # send query send_dns_query(sock, domain, 1) # receive & parse response response = receive_dns_response(sock) print "Answers for #{domain}:\n" response.answers.each do |record| puts record end ensure sock.close end end end if ARGV.empty? puts "Usage: #{$PROGRAM_NAME} <domain>" exit 1 end Resolvfast.main(ARGV[0]) 测试 ...

April 2, 2024 · 3 min · Sxueck

源码随笔 - Kured插件

前言 Kured(Kubernetes Reboot Daemon)是一个针对 Kubernetes 的守护程序,其功能是在底层操作系统的软件包管理系统指示需要重新启动时,进行安全的自动节点重启 该守护程序通过监视重启标志文件(例如 /var/run/reboot-required)或监测特定的哨兵命令是否成功运行来确定是否需要进行重启操作。它会持续不断地观察这些指示,并在需要时采取相应的行动 为了确保在进行节点重启时的平稳运行,Kured 利用了 Kubernetes 的 APIServer 中的锁机制,以保证一次只有一个节点进行重启。这种锁机制确保了在集群中同一时间只有一个节点被重启,并避免了重启操作之间的冲突 此外,Kured 还提供了一些可选功能,例如在存在活动的 Prometheus 警报或特定的 Pod 时,可以推迟执行重启操作。这样可以避免在系统处于故障状态或关键服务正在运行时进行重启,从而确保整个集群的稳定性和可用性 总体来说,这个项目整体难度不算太高,通过对这个项目的二开可以加深对于集群的理解,具体的代码可以参考 https://github.com/kubereboot/kured ,截止目前,最新版本为 1.15.1 Kured 代码流程简述 主流程 它的 flags 使用了 cobra 库,同时在 Run 形参里面声明了 Root 函数为主要的执行函数,注意不要被 Execute 的调用所疑惑了 window, err := timewindow.New(rebootDays, rebootStart, rebootEnd, timezone) 这里使用了从 flags 中声明的重启时间,其中的 rebootDays 根据上下文来看,是个 []string 类型,推测估计是直接填写日期字符串 buildHostCommand(1, restartCommand) 这个方法实际上是将外部的宿主机的控制权限映射到了内部的容器控制点(当然必须事先声明 hostPID:true 和 privileged:true这两个选项允许容器进程获取额外的权限),如果我们进入函数内部进行查看,可以看到它是使用了 Linux 的 Nsenter 命令,这个命令会进入宿主机的命名空间(特别是在这里指定了 -m 选项,与之和宿主机的 PID 为 1 的进程做挂载)。这样,随后追加的 command 就会在宿主机上执行,而不是在容器内部 func buildHostCommand(pid int, command []string) []string { // From the container, we nsenter into the proper PID to run the hostCommand. // For this, kured daemonset need to be configured with hostPID:true and privileged:true cmd := []string{"/usr/bin/nsenter", fmt.Sprintf("-m/proc/%d/ns/mnt", pid), "--"} cmd = append(cmd, command...) return cmd } 而在 Root 函数中,剩余的代码作用于设置和触发节点重启的逻辑,代码这里展示了两种重启的策略方法 ...

April 1, 2024 · 4 min · Sxueck

从Brutal算法的实现引申出的随想

前篇 关于 TCP 拥塞的控制算法,入门知识可以从 《TCP拥塞控制算法BBR的原理和改进实践》开始过一遍 Brutal 算法的核心代码剖析 static void brutal_update_rate(struct sock *sk) { struct tcp_sock *tp = tcp_sk(sk); struct brutal *brutal = inet_csk_ca(sk); u64 sec = tcp_sock_get_sec(tp); u64 min_sec = sec - PKT_INFO_SLOTS; u32 acked = 0, losses = 0; u32 ack_rate; // Scaled by 100 (100=1.00) as kernel doesn't support float u64 rate = brutal->rate; u32 cwnd; u32 mss = tp->mss_cache; u32 rtt_ms = (tp->srtt_us >> 3) / USEC_PER_MSEC; if (!rtt_ms) rtt_ms = 1; for (int i = 0; i < PKT_INFO_SLOTS; i++) { if (brutal->slots[i].sec >= min_sec) { acked += brutal->slots[i].acked; losses += brutal->slots[i].losses; } } if (acked + losses < MIN_PKT_INFO_SAMPLES) ack_rate = 100; else { ack_rate = acked * 100 / (acked + losses); if (ack_rate < MIN_ACK_RATE_PERCENT) ack_rate = MIN_ACK_RATE_PERCENT; } rate *= 100; rate /= ack_rate; // The order here is chosen carefully to avoid overflow as much as possible cwnd = rate / MSEC_PER_SEC; cwnd *= rtt_ms; cwnd /= mss; cwnd *= brutal->cwnd_gain; cwnd /= 10; cwnd = max_t(u32, cwnd, MIN_CWND); brutal_tcp_snd_cwnd_set(tp, min(cwnd, tp->snd_cwnd_clamp)); WRITE_ONCE(sk->sk_pacing_rate, min_t(u64, rate, READ_ONCE(sk->sk_max_pacing_rate))); } 剖析 从 struct tcp_sock 检索当前时间戳,转换成秒 计算出一个最小的时间戳,该时间戳是当前时间戳减去一个固定大小的窗口(PKT_INFO_SLOTS)。这个窗口用来限定考虑的数据包信息的时间范围 遍历存储数据包信息的数组 brutal->slots,累加在时间窗口内的确认(acked)和丢失(losses)的数据包数量 如果在时间窗口内的样本数量小于定义的最小样本数量 MIN_PKT_INFO_SAMPLES,则将确认率 ack_rate 设为 100%,表示没有丢包 如果样本数量足够,则计算确认率 ack_rate,即在考虑的时间窗口内被确认的数据包与总数据包的比例,并乘以 100 转换为百分比格式 如果确认率低于定义的最小确认率 MIN_ACK_RATE_PERCENT,则使用这个最小值 然后根据确认率调整发送速率 rate。这里使用了一些数学操作来避免使用浮点数,因为内核通常不使用浮点运算 接下来,根据新的发送速率、往返时间(RTT),以及最大分段大小(MSS)计算新的拥塞窗口大小 cwnd 拥塞窗口大小还乘以了窗口增益 cwnd_gain(调整因子),然后除以10(因为增益是以十分之一为单位) 拥塞窗口大小被限制在一个最小值 MIN_CWND 和 tp->snd_cwnd_clamp(拥塞窗口的最大允许值)之间 使用 brutal_tcp_snd_cwnd_set 更新 tcp_sock 结构体中的 snd_cwnd 拥塞窗口大小 最后,更新套接字的发送速率 sk_pacing_rate,这也被限制在 sk_max_pacing_rate(最大发送速率)以下 其实我们能明显看出来,TCP 拥塞算法的核心思想,就是根据当前的网络拥塞情况(在这里依赖于判断丢包率)来计算 cwnd,这个显著指标作为核心值会判定直接影响到发送速率和数据包重传策略的决策,这种 AIMD 原则会在网络状况良好时逐渐增加发送窗口,以增大吞吐量,而在检测到网络拥塞(如丢包事件)时则大幅减小窗口大小,以减轻网络负担,理论上来说遵循这种原则的算法都可以算是一种 “君子算法”,但是 Brutal 算法不是的,在他们的文档中就说明了这一点: ...

December 6, 2023 · 3 min · Sxueck