改造 Kubernetes 自定义调度器

kubernetes · 浏览次数 : 0

小编点评

**Go 代码** ```go import ( "context" "kubescheduler.config.k8s.io/v1beta1kind/Deploymentmetadata" "kubescheduler.config.k8s.io/v1beta1kind/MemoryTrafficArgs" "kubescheduler.config.k8s.io/v1beta1kind/NetworkTrafficArgs" ) // PrometheusHandle handles the configuration and execution of the Prometheus scheduler. func PrometheusHandle(ctx context.Context) error { // Load the Kubernetes configuration. config, err := k8s.LoadConfig() if err != nil { return fmt.Errorf("failed to load Kubernetes configuration: %w", err) } // Get the scheduler configuration. schedulerConfig := config.Scheduler if schedulerConfig == nil { return fmt.Errorf("no scheduler configuration found in Kubernetes config") } // Create a new Prometheus client. client, err := k8s.NewClient(config) if err != nil { return fmt.Errorf("failed to create client: %w", err) } // Create a new Prometheus client for the local server. localClient, err := k8s.NewClient(&k8s.Config{ ListOptions: k8s.ListOptions{ Unschedulable: true, }, }) if err != nil { return fmt.Errorf("failed to create local client: %w", err) } // Create a new Prometheus client for the cluster. clusterClient, err := k8s.NewClient(config) if err != nil { return fmt.Errorf("failed to create cluster client: %w", err) } // Create a new deployment object. deployment := &Deploymentmetadata.Deployment{ Metadata: k8s.Metadata{ Name: "gin", }, Spec: k8s.DeploymentSpec{ Template: &k8s.PodTemplate{ Spec: k8s.PodSpec{ Containers: []k8s.Container{ { Name: "gin", Image: "jaydenchang/k8s_test:latest", ImagePullPolicy: k8s.PullPolicyAlways, Command: []string{"./app"}, Port: 9999, }, }, }, }, }, } // Create a new memory traffic object. memoryTraffic := &MemoryTrafficArgs{ Node: "node1", Scopes: []string{"memory"}, } // Create a new network traffic object. networkTraffic := &NetworkTrafficArgs{ Node: "node1", } // Create the Prometheus server. server, err := client.NewServer(context.Background()) if err != nil { return fmt.Errorf("failed to create Prometheus server: %w", err) } // Send the deployment and memory traffic to the Prometheus server. if err := server.Send(context.Background(), deployment); err != nil { return fmt.Errorf("failed to send deployment to Prometheus server: %w", err) } if err := server.Send(context.Background(), memoryTraffic); err != nil { return fmt.Errorf("failed to send memory traffic to Prometheus server: %w", err) } // Start the Prometheus server. server.Start() // Wait for the server to stop. <-server.Shutdown() return nil } ``` **使用说明** 1. 将 `scheduler.yaml` 中的 Kubernetes 配置文件保存到您本地目录下。 2. 将 `scheduler.yaml` 中的 `kubeconfig` 变量设置您的 Kubernetes 服务器的 IP 和端口。 3. 运行 `./prometheus` 命令。 4. 访问 Prometheus 集群,您可以查看您创建的 pod 的日志。

正文

原文出处:改造 Kubernetes 自定义调度器 | Jayden's Blog (jaydenchang.top)

Overview

Kubernetes 默认调度器在调度 Pod 时并不关心特殊资源例如磁盘、GPU 等,因此突发奇想来改造调度器,在翻阅官方调度器框架[1]、调度器配置[2]和参考大佬的文章[3]后,自己也来尝试改写一下。

环境配置

相关软件版本:

  • Kubernetes 版本:v1.19.0
  • Docker 版本:v26.1.2
  • Prometheus 版本:v2.49
  • Node Exporter 版本:v1.7.0

集群内有 1 个 master 和 3 个 node。

实验部分

项目总览

项目结构如下:

.
├── Dockerfile
├── deployment.yaml
├── go.mod
├── go.sum
├── main.go
├── pkg
│   ├── cpu
│   │   └── cputraffic.go
│   ├── disk
│   │   └── disktraffic.go
│   ├── diskspace
│   │   └── diskspacetraffic.go
│   ├── memory
│   │   └── memorytraffic.go
│   ├── network
│   │   └── networktraffic.go
│   └── prometheus.go
├── scheduler
├── scheduler.conf
└── scheduler.yaml

插件部分

下面以构建内存插件为例。

定义插件名称、变量和结构体

const MemoryPlugin = "MemoryTraffic"
var _ = framework.ScorePlugin(&MemoryTraffic{})

type MemoryTraffic struct {
    prometheus *pkg.PrometheusHandle
    handle framework.FrameworkHandle
}

下面来实现 framework.FrameworkHandle 的接口。

先定义插件初始化入口

func New(plArgs runtime.Object, h framework.FrameworkHandle) (framework.Plugin, error) {
    args := &MemoryTrafficArgs{}
    if err := fruntime.DecodeInto(plArgs, args); err != nil {
        return nil, err
    }

    klog.Infof("[MemoryTraffic] args received. Device: %s; TimeRange: %d, Address: %s", args.DeviceName, args.TimeRange, args.IP)

    return &MemoryTraffic{
        handle:     h,
        prometheus: pkg.NewProme(args.IP, args.DeviceName, time.Minute*time.Duration(args.TimeRange)),
    }, nil
}

实现 Score 接口,Score 进行初步打分

func (n *MemoryTraffic) Score(ctx context.Context, state *framework.CycleState, p *corev1.Pod, nodeName string) (int64, *framework.Status) {
    nodeBandwidth, err := n.prometheus.MemoryGetGauge(nodeName)
    if err != nil {
        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("error getting node bandwidth measure: %s", err))
    }
    bandWidth := int64(nodeBandwidth.Value)
    klog.Infof("[MemoryTraffic] node '%s' bandwidth: %v", nodeName, bandWidth)
    return bandWidth, nil
}

实现 NormalizeScore,对上一步 Score 的打分进行修正

func (n *MemoryTraffic) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, scores framework.NodeScoreList) *framework.Status {
    var higherScore int64
    for _, node := range scores {
        if higherScore < node.Score {
            higherScore = node.Score
        }
    }
    // 计算公式为,满分 - (当前内存使用 / 总内存 * 100)
    // 公式的计算结果为,内存使用率越大的节点,分数越低
    for i, node := range scores {
        scores[i].Score = node.Score * 100 / higherScore
        klog.Infof("[MemoryTraffic] Nodes final score: %v", scores[i].Score)
    }

    klog.Infof("[MemoryTraffic] Nodes final score: %v", scores)
    return nil
}

配置插件名称和返回 ScoreExtension

func (n *MemoryTraffic) Name() string {
    return MemoryPlugin
}

// 如果返回framework.ScoreExtensions 就需要实现framework.ScoreExtensions
func (n *MemoryTraffic) ScoreExtensions() framework.ScoreExtensions {
    return n
}

Prometheus 部分

首先来编写查询内存可用率的 PromQL

const memoryMeasureQueryTemplate = ` (avg_over_time(node_memory_MemAvailable_bytes[30m]) / avg_over_time(node_memory_MemTotal_bytes[30m])) * 100 * on(instance) group_left(nodename) (node_uname_info{nodename="%s"})`

然后来声明 PrometheusHandle

type PrometheusHandle struct {
    deviceName string
    timeRange  time.Duration
    ip         string
    client     v1.API
}

另外在插件部分也要声明查询 Prometheus 的参数结构体

type MemoryTrafficArgs struct {
    IP         string `json:"ip"`
    DeviceName string `json:"deviceName"`
    TimeRange  int    `json:"timeRange"`
}

编写初始化 Prometheus 插件入口

func NewProme(ip, deviceName string, timeRace time.Duration) *PrometheusHandle {
    client, err := api.NewClient(api.Config{Address: ip})
    if err != nil {
        klog.Fatalf("[Prometheus Plugin] FatalError creating prometheus client: %s", err.Error())
    }
    return &PrometheusHandle{
        deviceName: deviceName,
        ip:         ip,
        timeRange:  timeRace,
        client:     v1.NewAPI(client),
    }
}

编写通用查询接口,可供其他类型资源查询

func (p *PrometheusHandle) query(promQL string) (model.Value, error) {
    results, warnings, err := p.client.Query(context.Background(), promQL, time.Now())
    if len(warnings) > 0 {
        klog.Warningf("[Prometheus Query Plugin] Warnings: %v\n", warnings)
    }

    return results, err
}

获取内存可用率接口

func (p *PrometheusHandle) MemoryGetGauge(node string) (*model.Sample, error) {
    value, err := p.query(fmt.Sprintf(memoryMeasureQueryTemplate, node))
    fmt.Println(fmt.Sprintf(memoryMeasureQueryTemplate, node))
    if err != nil {
        return nil, fmt.Errorf("[MemoryTraffic Plugin] Error querying prometheus: %w", err)
    }

    nodeMeasure := value.(model.Vector)
    if len(nodeMeasure) != 1 {
        return nil, fmt.Errorf("[MemoryTraffic Plugin] Invalid response, expected 1 value, got %d", len(nodeMeasure))
    }
    return nodeMeasure[0], nil

}

然后在程序入口里启用插件并执行

func main() {
    rand.Seed(time.Now().UnixNano())
    command := app.NewSchedulerCommand(
        app.WithPlugin(network.NetworkPlugin, network.New),
        app.WithPlugin(disk.DiskPlugin, disk.New),
        app.WithPlugin(diskspace.DiskSpacePlugin, diskspace.New),
        app.WithPlugin(cpu.CPUPlugin, cpu.New),
        app.WithPlugin(memory.MemoryPlugin, memory.New),
    )
    // 对于外部注册一个plugin
    // command := app.NewSchedulerCommand(
    // 	app.WithPlugin("example-plugin1", ExamplePlugin1.New))

    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

配置部分

为方便观察,这里使用二进制方式运行,准备运行时的配置文件

apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration
clientConnection:
  kubeconfig: /etc/kubernetes/scheduler.conf
profiles:
- schedulerName: custom-scheduler
  plugins:
    score:
      enabled:
      - name: "CPUTraffic"
        weight: 3
      - name: "MemoryTraffic"
        weight: 4
      - name: "DiskSpaceTraffic"
        weight: 3
      - name: "NetworkTraffic"
        weight: 2
      disabled:
      - name: "*"
  pluginConfig:
    - name: "NetworkTraffic"
      args:
        ip: "http://172.19.32.140:9090"
        deviceName: "eth0"
        timeRange: 60   
    - name: "CPUTraffic"
      args:
        ip: "http://172.19.32.140:9090"
        deviceName: "eth0"
        timeRange: 0
    - name: "MemoryTraffic"
      args:
        ip: "http://172.19.32.140:9090"
        deviceName: "eth0"
        timeRange: 0
    - name: "DiskSpaceTraffic"
      args:
        ip: "http://172.19.32.140:9090"
        deviceName: "eth0"
        timeRange: 0

kubeconfig 处为 master 节点的 scheduler.conf,以实际路径为准,内包含集群的证书哈希,ip 为部署 Prometheus 节点的 ip,端口为 Promenade 配置中对外暴露的端口。

将二进制文件和 scheduler.yaml 放至 master 同一目录下运行:

./scheduler --logtostderr=true \
	--address=127.0.0.1 \
	--v=6 \
	--config=`pwd`/scheduler.yaml \
	--kubeconfig="/etc/kubernetes/scheduler.conf" \

验证结果

准备一个要部署的 Pod,使用指定的调度器名称

apiVersion: apps/v1
kind: Deployment
metadata:
  name: gin
  namespace: default
  labels:
    app: gin
spec:
  replicas: 2
  selector:
    matchLabels:
      app: gin
  template:
    metadata:
      labels:
        app: gin
    spec:
      schedulerName: my-custom-scheduler  # 使用自定义调度器
      containers:
      - name: gin
        image: jaydenchang/k8s_test:latest
        imagePullPolicy: Always
        command: ["./app"]
        ports:
        - containerPort: 9999
          protocol: TCP

最后的可以查看日志,部分日志如下:

I0808 17:32:35.138289   27131 memorytraffic.go:83] [MemoryTraffic] node 'node1' bandwidth: %!s(int64=2680340)
I0808 17:32:35.138763   27131 memorytraffic.go:70] [MemoryTraffic] Nodes final score: [{node1 2680340} {node2 0}]
I0808 17:32:35.138851   27131 memorytraffic.go:70] [MemoryTraffic] Nodes final score: [{node1 71} {node2 0}]
I0808 17:32:35.138911   27131 memorytraffic.go:73] [MemoryTraffic] Nodes final score: [{node1 71} {node2 0}]
I0808 17:32:35.139565   27131 default_binder.go:51] Attempting to bind default/go-deployment-66878c4885-b4b7k to node1
I0808 17:32:35.141114   27131 eventhandlers.go:225] add event for scheduled pod default/go-deployment-66878c4885-b4b7k
I0808 17:32:35.141714   27131 eventhandlers.go:205] delete event for unscheduled pod default/go-deployment-66878c4885-b4b7k
I0808 17:32:35.143504   27131 scheduler.go:609] "Successfully bound pod to node" pod="default/go-deployment-66878c4885-b4b7k" node="no
de1" evaluatedNodes=2 feasibleNodes=2
I0808 17:32:35.104540   27131 scheduler.go:609] "Successfully bound pod to node" pod="default/go-deployment-66878c4885-b4b7k" node="no
de1" evaluatedNodes=2 feasibleNodes=2

参考链接


  1. Scheduling Framework | Kubernetes ↩︎

  2. Scheduler Configuration | Kubernetes ↩︎

  3. 基于Prometheus的Kubernetes网络调度器 | Cylon's Collection (oomkill.com) ↩︎

与改造 Kubernetes 自定义调度器相似的内容:

改造 Kubernetes 自定义调度器

原文出处:改造 Kubernetes 自定义调度器 | Jayden's Blog (jaydenchang.top) Overview Kubernetes 默认调度器在调度 Pod 时并不关心特殊资源例如磁盘、GPU 等,因此突发奇想来改造调度器,在翻阅官方调度器框架[1]、调度器配置[2]和参

K8S 1.20 弃用 Docker 评估之 Docker CLI 的替代产品

背景 2020 年 12 月初,Kubernetes 在其最新的 Changelog 中宣布,自 Kubernetes 1.20 之后将弃用 Docker 作为容器运行时。 弃用 Docker 带来的,可能是一系列的改变,包括不限于: 容器镜像构建工具 容器 CLI 容器镜像仓库 容器运行时 专题文

K8S 1.20 弃用 Docker 评估之 Docker CLI 的替代产品 nerdctl

背景 2020 年 12 月初,Kubernetes 在其最新的 Changelog 中宣布,自 Kubernetes 1.20 之后将弃用 Docker 作为容器运行时。 弃用 Docker 带来的,可能是一系列的改变,包括不限于: 容器镜像构建工具 容器 CLI 容器镜像仓库 容器运行时 专题文

K8S 1.20 弃用 Docker 评估之 Docker 和 OCI 镜像格式的差别

背景 2020 年 12 月初,Kubernetes 在其最新的 Changelog 中宣布,自 Kubernetes 1.20 之后将弃用 Docker 作为容器运行时。 弃用 Docker 带来的,可能是一系列的改变,包括不限于: 容器镜像构建工具 容器 CLI 容器镜像仓库 容器运行时 专题文

使用Kubernetes中的Nginx来改善第三方服务的可靠性和延迟

使用Kubernetes中的Nginx来改善第三方服务的可靠性和延迟 译自:How we improved third-party availability and latency with Nginx in Kubernetes 本文讨论了如何在Kubernetes中通过配置Nginx缓存来提升第

【Azure Kubernetes】通过 kubelogin 进行非交互式登录AKS

问题描述 当对AKS的登录方式(认证和授权)从“Local Account with Kubernetes RBAC ”改变为“Azure AD authentication with Azure RBAC”. 通过 kubectl 连接AKS会要求交互式登录,需要通过浏览器输入认证码后关联azur

Ubuntu22.04 安装单机版kubernetes

# 前言 上期讲到要实现.net 6框架下的EF Core操作数据库基本增删改查,没有及时兑现。没有兑现的原因就是因为安装kubernetes。安装kubernetes的过程是灾难性的,也是十分顺利的。灾难性是因为在安装kubernetes过程中误操作,在/etc下执行了一个重置的命令导致我的工作站

Cilium 系列-7-Cilium 的 NodePort 实现从 SNAT 改为 DSR

## 系列文章 * [Cilium 系列文章](https://ewhisper.cn/tags/Cilium/) ## 前言 将 Kubernetes 的 CNI 从其他组件切换为 Cilium, 已经可以有效地提升网络的性能。但是通过对 Cilium 不同模式的切换/功能的启用,可以进一步提升

一键开启云原生网络安全新视界

本文作者:陈桐乐 李卓嘉 随着云原生的兴起,微服务、容器、kubernetes容器编排正在快速改变着企业软件架构的形态,单体架构、分布式架构、微服务架构,软件架构在持续演进的过程中,变得越来越复杂,管理和维护也越来越困难,不断出现的安全漏洞也在持续挑战着企业的安全运营响应能力,如何准确识别风险点,怎

[转帖]Redis检索性能不足,改造rsbeat解决历史慢日志跟踪

https://www.sohu.com/a/313061840_411876 作者介绍 刘宇,甜橙金融创新中心基础技术架构师,拥有9年IT从业经验、9年数据库开发运维经验、4次大型营销活动经验。目前关注容器、分布式数据库技术等基础技术。 在线上排查redis性能问题时,从redis中找进行优化是一