containerd 源码分析:kubelet 和 containerd 交互

containerd,kubelet · 浏览次数 : 0

小编点评

**代码分析** ** instrumentService.gofunc RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (res *runtime.RunPodSandboxResponse, err error) { // 创建沙箱服务 res, err := in.c.RunPodSandbox(ctrdutil.WithNamespace(ctx), r) if err != nil { return nil, fmt.Errorf("failed to create sandbox %q: %w", id, err) } // 启动沙箱服务 ctrl, err := c.sandboxService.StartSandbox(ctx, sandbox.Sandboxer, id) if err != nil { ... } return res, nil } ``` **运行流程图** ```mermaid graph LR A[kubelet] --> B[注册 instrumentatedService 到 grpc] B --> C[从 kubelet 到 containerd 的交互流程] C --> D[创建 pod 流程] D --> E[kubelet 到 containerd 的交互流程] E --> F[创建 pod ] ``` **总结** 代码展示了 Kubernetes 核心组件 `kubelet` 和 `containerd` 之间交互流程。在 `RunPodSandbox` 方法中,`kubelet` 创建了一个沙箱服务并启动它,并通过 `containerd` 的接口与 `criGRPCServer` 进行交互。最终,代码创建一个新 pod 并将其添加到 Kubernetes 中。

正文


0. 前言

Kubernetes:kubelet 源码分析之创建 pod 流程 介绍了 kubelet 创建 pod 的流程,其中介绍了 kubelet 调用 runtime cri 接口创建 pod。containerd 源码分析:启动注册流程 介绍了 containerd 作为一种行业标准的高级运行时的启动注册流程。那么,kubelet 是怎么和 containerd 交互的呢? 本文会带着这个问题分析 kubeletcontainerd 的交互。

1. kubelet 和 containerd 交互

1.1 kubelet

Kubernetes:kubelet 源码分析之创建 pod 流程 分析,kubelet 调用 runtime cri 接口 /runtime.v1.RuntimeService/RunPodSandbox 创建 pod:

// kubernetes/vendor/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {
	out := new(RunPodSandboxResponse)
	err := c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/RunPodSandbox", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

kubelet 是 runtime cri 接口调用的客户端,那么容器运行时作为服务端是怎么提供服务的呢?

1.2 kubelet 和 containerd 交互流程

在介绍容器运行时提供的服务之前先看下 cri 架构图

image

从图中可以看出,containerd 的 CRI 插件提供 image serviceruntime service,负责对接 kubelet runtime cri 的接口调用,并将调用转发给 containerd

继续,查看 containerd 的处理流程。

1.3 containerd

1.3.1 CRI Plugin

根据 cri 架构图, 从 CRI 插件入手查看 id 为 io.containerd.grpc.v1.criCRI 插件。

// containerd/plugins/cri/cri.go
func initCRIService(ic *plugin.InitContext) (interface{}, error) {
	...
    // Get runtime service.
	criRuntimePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "runtime")
	if err != nil {
		return nil, fmt.Errorf("unable to load CRI runtime service plugin dependency: %w", err)
	}

    // Get image service.
	criImagePlugin, err := ic.GetByID(plugins.CRIServicePlugin, "images")
	if err != nil {
		return nil, fmt.Errorf("unable to load CRI image service plugin dependency: %w", err)
	}
    ...
    service := &criGRPCServer{
		RuntimeServiceServer: rs,
		ImageServiceServer:   is,
		Closer:               s, // TODO: Where is close run?
		initializer:          s,
	}

	if config.DisableTCPService {
		return service, nil
	}

	return criGRPCServerWithTCP{service}, nil
}

插件返回的是 criGRPCServerWithTCP 对象。其中,包括 criGRPCServer 对象。criGRPCServer 对象实现了 grpcService 接口,将调用接口的 Register 注册对象到 grpc server。

// containerd/plugins/cri/cri.go
// Register registers all required services onto a specific grpc server.
// This is used by containerd cri plugin.
func (c *criGRPCServer) Register(s *grpc.Server) error {
	return c.register(s)
}

func (c *criGRPCServer) register(s *grpc.Server) error {
	instrumented := instrument.NewService(c)
	runtime.RegisterRuntimeServiceServer(s, instrumented)
	runtime.RegisterImageServiceServer(s, instrumented)
	return nil
}

criGRPCServer.register 中创建 instrumentedService 对象。

type instrumentedService struct {
	c criService
}

func NewService(c criService) GRPCServices {
	return &instrumentedService{c: c}
}

instrumentedService 包括 criService 对象。实际提供 runtime serviceimage service 的就是 criService 对象。

以注册 runtime service 为例,查看 runtime.RegisterRuntimeServiceServer(s, instrumented) 做了什么。

// containerd/vendor/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
func RegisterRuntimeServiceServer(s *grpc.Server, srv RuntimeServiceServer) {
	s.RegisterService(&_RuntimeService_serviceDesc, srv)
}

var _RuntimeService_serviceDesc = grpc.ServiceDesc{
	ServiceName: "runtime.v1.RuntimeService",
	HandlerType: (*RuntimeServiceServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "Version",
			Handler:    _RuntimeService_Version_Handler,
		},
		{
			MethodName: "RunPodSandbox",
			Handler:    _RuntimeService_RunPodSandbox_Handler,
		},
        ...
    },
    ...
}

可以看到,注册 instrumentedService 到 grpc 中,instrumentedService 提供 runtime.v1.RuntimeService 服务,包括 kubelet 调用的 RunPodSandbox 方法。

继续看,instrumentedServiceRunPodSandbox 做了什么。

// containerd/internal/cri/instrumented_service.go
func (in *instrumentedService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (res *runtime.RunPodSandboxResponse, err error) {
	...
	res, err = in.c.RunPodSandbox(ctrdutil.WithNamespace(ctx), r)
	return res, errdefs.ToGRPC(err)
}

instrumentedService 调用 criGRPCServerRunPodSandbox 方法,实际执行的是 criGRPCServer 中的 criServer 对象:

// containerd/internal/cri/server/sandbox_run.go
func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) {
	...
    if err := c.sandboxService.CreateSandbox(ctx, sandboxInfo, sb.WithOptions(config), sb.WithNetNSPath(sandbox.NetNSPath)); err != nil {
		return nil, fmt.Errorf("failed to create sandbox %q: %w", id, err)
	}

	ctrl, err := c.sandboxService.StartSandbox(ctx, sandbox.Sandboxer, id)
    if err != nil {
        ...
    }
    ...
}

criService.RunPodSandbox 调用的是 sandboxServiceCreateSandboxStartSandbox 方法。

1.3.2 sanbox Plugin

sandboxServicecri.initCRIService 中实例化:

// containerd/plugins/cri/cri.go
func initCRIService(ic *plugin.InitContext) (interface{}, error) {
    ...
    sbControllers, err := getSandboxControllers(ic)
	if err != nil {
		return nil, fmt.Errorf("failed to get sandbox controllers from plugins %v", err)
	}
    ...
    options := &server.CRIServiceOptions{
		RuntimeService:     criRuntimePlugin.(server.RuntimeService),
		ImageService:       criImagePlugin.(server.ImageService),
		StreamingConfig:    streamingConfig,
		NRI:                getNRIAPI(ic),
		Client:             client,
		SandboxControllers: sbControllers,
	}
    ...
    s, rs, err := server.NewCRIService(options)
    ...
    service := &criGRPCServer{
		RuntimeServiceServer: rs,
		ImageServiceServer:   is,
		Closer:               s, // TODO: Where is close run?
		initializer:          s,
	}
}

首先,getSandboxControllers 获得 sandbox controllers:

// containerd/plugins/cri/cri.go
func getSandboxControllers(ic *plugin.InitContext) (map[string]sandbox.Controller, error) {
    // plugins.SandboxControllerPlugin: "io.containerd.sandbox.controller.v1"
	sandboxers, err := ic.GetByType(plugins.SandboxControllerPlugin)
	if err != nil {
		return nil, err
	}
	...
	return sc, nil
}

sandbox.Controller 是类型为 io.containerd.sandbox.controller.v1 的插件对象。将该对象作为 options 赋给 criServer

// containerd/internal/cri/server/service.go
func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServiceServer, error) {
	...
	c := &criService{
		...
		sandboxService:     newCriSandboxService(&config, options.SandboxControllers),
	}
    ...
}

func newCriSandboxService(config *criconfig.Config, sandboxers map[string]sandbox.Controller) *criSandboxService {
	return &criSandboxService{
		sandboxControllers: sandboxers,
		config:             config,
	}
}

criService.sandboxService.CreateSandbox 调用的是插件对象 sanbox controllersCreateSandbox 方法,该方法最终调用的是 sandboxClientCreateSandbox

func (c *sandboxClient) CreateSandbox(ctx context.Context, in *CreateSandboxRequest, opts ...grpc.CallOption) (*CreateSandboxResponse, error) {
	out := new(CreateSandboxResponse)
	err := c.cc.Invoke(ctx, "/containerd.runtime.sandbox.v1.Sandbox/CreateSandbox", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

可以看到,在 sandboxClient.CreateSandbox 中调用 containerd 提供的 containerd cri 接口 /containerd.runtime.sandbox.v1.Sandbox/CreateSandbox,该接口用来创建 sandbox,即 pod。

1.4 创建 pod 流程

根据上述分析,这里画出 kubeletcontainerd 的交互流程图如下:
image

2. 小结

本文在前文 Kubernetes:kubelet 源码分析之创建 pod 流程containerd 源码分析:启动注册流程 的基础上,进一步分析从 kubeletcontainerd 的交互流程,打通了 kubeletcontainerd 这一步。


与containerd 源码分析:kubelet 和 containerd 交互相似的内容:

containerd 源码分析:kubelet 和 containerd 交互

0. 前言 Kubernetes:kubelet 源码分析之创建 pod 流程 介绍了 kubelet 创建 pod 的流程,其中介绍了 kubelet 调用 runtime cri 接口创建 pod。containerd 源码分析:启动注册流程 介绍了 containerd 作为一种行业标准的高级

containerd 源码分析:创建 container(一)

0. 前言 Kubernetes:kubelet 源码分析之 pod 创建流程 介绍了 kubelet 创建 pod 的流程,containerd 源码分析:kubelet 和 containerd 交互 介绍了 kubelet 通过 cri 接口和 containerd 交互的过程,contain

containerd 源码分析:创建 container(三)

文接 containerd 源码分析:创建 container(二) 1.2.2.2 启动 task 上节介绍了创建 task,task 创建之后将返回 response 给 ctr。接着,ctr 调用 task.Start 启动容器。 // containerd/client/task.go fu

containerd 源码分析:启动注册流程

0. 前言 containerd 是一个行业标准的容器运行时,其强调简单性、健壮性和可移植性。本文将从 containerd 的代码结构入手,查看 containerd 的启动注册流程。 1. 启动注册流程 1.1 containerd 首先以调试模式运行 containerd: // contai

Tomcat处理http请求之源码分析

本文将从请求获取与包装处理、请求传递给Container、Container处理请求流程,这3部分来讲述一次http穿梭之旅。

[转帖]【k8s】二、containerd的安装

目录 前言 安装containerd 解压安装 配置成systemd任务 安装runc ​编辑 安装cni 配置containerd镜像源 containerd基本使用 拓展阅读 nerdctl工具安装及使用 整体脚本 总结 写在后面 前言 上一篇文章,我们介绍了虚拟机的基础环境以及基础的网络配置,

Dubbo架构设计与源码解析(一) 架构设计

作者:黄金 一、架构演变 单应用架构 > 垂直架构 > 分布式架构 > 微服务架构 > 云原生架构   二、Dubbo总体架构   1、角色职能 • Container:服务容器 (tomcat、jetty、weblogic) • Provider:服务提供者 •Consumer:服务消

opengrok源代码在线阅读平台搭建及字体修改

服务搭建 我所编写的docker-compose.yml如下,成功运行后将源码目录移动至 /data/opengrok/src ,重启容器使得opengrok快速更新索引 services: opengrok: container_name: opengrok # 1.6版本在使用中还算稳定 ima

[转帖]containerd_v1.6.0+nerdctl+buildkit 二进制安装,支持多CPU并发构建

一、安装containerd # yum install libseccomp -y #下载containerd curl -L https://github.com/containerd/containerd/releases/download/v1.6.0/cri-containerd-cni-

[转帖]Containerd安装与使用

https://www.cnblogs.com/punchlinux/p/16496094.html Containerd 的技术方向和目标 简洁的基于 gRPC 的 API 和 client library 完整的 OCI 支持(runtime 和 image spec) 同时具备稳定性和高性能的