如何借助Kafka持久化存储K8S事件数据?

如何,借助,kafka,持久,存储,k8s,事件,数据 · 浏览次数 : 93

小编点评

** Kubernetes 事件处理链路** **核心概念:** * **EventRouter**:开源的 Kubernetes 事件处理器,将所有集群事件整合到 Kafka 主题中。 * **Strimzi Operator**:用于创建和更新 Kafka broker 和主题的 Kubernetes 对象。 * **EventSource**:将 Kubernetes 事件推送到 Kafka 主题中。 **步骤:** 1. **创建 Kafka 集群:** - 创建一个新的 Kafka 集群,指定主题和副本数。 2. **创建 Kafka 主题:** - 创建一个名为 `cluster-events` 的主题,设置相关配置,例如主题类型和分区。 3. **编写 Golang 消费者:** - 创建一个消费者,监听 `cluster-events` 主题,并根据事件的命名空间将数据分发到多个主题中。 4. **使用 Spark Structured Streaming 的消费者:** - 如果您的事件量很大,可以使用 Spark Structured Streaming 的消费者实现更高性能的处理。 **完整代码:** ```go // main.go import ( "context" "fmt" "os/signal" kafka "github.com/segmentika/kafka-go" ) func main() { // ... // Create Kafka producer and consumer p := kafka.NewProducer(cfg.Endpoint) c, err := kafka.NewConsumer(cfg.Endpoint, cfg.Topic) // ... // Run worker thread go func() { for run := true; run; { data, err := c.Read() if err != nil { fmt.Errorf("read event error: %v", err) time.Sleep(5 * time.Second) continue } // Create and send destination event msg, err := event.CreateDestinationMessage(data) if err != nil { fmt.Errorf("cannot create destination event: %v", err) } p.Write(msg.Topic, msg.Message) } }() // Wait for signals os.Wait() } ``` **结论:** 通过使用 EventRouter、Strimzi Operator 和 Spark Structured Streaming,您可以构建一个可靠且实时地监控 Kubernetes 集群的事件日志。

正文

大家应该对 Kubernetes Events 并不陌生,特别是当你使用 kubectl describe 命令或 Event API 资源来了解集群中的故障时。
 

$ kubectl get events

15m         Warning   FailedCreate                                                                                                      replicaset/ml-pipeline-visualizationserver-865c7865bc    

Error creating: pods "ml-pipeline-visualizationserver-865c7865bc-" is forbidden: error looking up service account default/default-editor: serviceaccount "default-editor" not found

 

尽管这些信息十分有用,但它只是临时的,保留时间最长为30天。如果出于审计或是故障诊断等目的,你可能想要把这些信息保留得更久,比如保存在像 Kafka 这样更持久、高效的存储中。然后你可以借助其他工具(如 Argo Events)或自己的应用程序订阅 Kafka 主题来对某些事件做出响应。
 

构建K8s事件处理链路

我们将构建一整套 Kubernetes 事件处理链路,其主要构成为:

  • Eventrouter,开源的 Kubernetes event 处理器,它可以将所有集群事件整合汇总到某个 Kafka 主题中。
  • Strimzi Operator,在 Kubernetes 中轻松管理 Kafka broker。
  • 自定义 Go 二进制文件以将事件分发到相应的 Kafka 主题中。
     

为什么要把事件分发到不同的主题中?比方说,在集群的每个命名空间中存在与特定客户相关的 Kubernetes 资产,那么在使用这些资产之前你当然希望将相关事件隔离开。
 

本示例中所有的配置、源代码和详细设置指示都已经放在以下代码仓库中:
https://github.com/esys/kube-events-kafka
 

 

创建 Kafka broker 和主题

我选择使用 Strimzi(strimzi.io/) 将 Kafka 部署到 Kubernetes 中。简而言之,它是用于创建和更新 Kafka broker 和主题的。你可以在官方文档中找到如何安装该 Operator 的详细说明:
https://strimzi.io/docs/operators/latest/overview.html
 

首先,创建一个新的 Kafka 集群:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: kube-events
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      default.replication.factor: 3
      log.message.format.version: "2.6"
      offsets.topic.replication.factor: 3
      transaction.state.log.min.isr: 2
      transaction.state.log.replication.factor: 3
    listeners:
    - name: plain
      port: 9092
      tls: false
      type: internal
    - name: tls
      port: 9093
      tls: true
      type: internal
    replicas: 3
    storage:
      type: jbod
      volumes:
      - deleteClaim: false
        id: 0
        size: 10Gi
        type: persistent-claim
    version: 2.6.0
  zookeeper:
    replicas: 3
    storage:
      deleteClaim: false
      size: 10Gi
      type: persistent-claim

 

然后创建 Kafka 主题来接收我们的事件:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: cluster-events
spec:
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
  partitions: 1
  replicas: 1

 

设置 EventRouter

在本教程中使用 kubectl apply 命令即可,我们需要编辑 router 的配置,以指明我们的 Kafka 端点和要使用的主题:

apiVersion: v1
data:
  config.json: |-
    {
      "sink": "kafka",
      "kafkaBrokers": "kube-events-kafka-bootstrap.kube-events.svc.cluster.local:9092",
      "kafkaTopic": "cluster-events"
    }
kind: ConfigMap
metadata:
  name: eventrouter-cm

 

验证设置是否正常工作

我们的 cluster-events Kafka 的主题现在应该收到所有的事件。最简单的方法是在主题上运行一个 consumer 来检验是否如此。为了方便期间,我们使用我们的一个 Kafka broker pods,它已经有了所有必要的工具,你可以看到事件流:

kubectl -n kube-events exec kube-events-kafka-0 -- bin/kafka-console-consumer.sh \
  --bootstrap-server kube-events-kafka-bootstrap:9092 \
  --topic kube-events \
  --from-beginning
{"verb":"ADDED","event":{...}}
{"verb":"ADDED","event":{...}}
...

 

编写 Golang 消费者

现在我们想将我们的 Kubernetes 事件依据其所在的命名空间分发到多个主题中。我们将编写一个 Golang 消费者和生产者来实现这一逻辑:

  • 消费者部分在 cluster-events 主题上监听传入的集群事件
  • 生产者部分写入与事件的命名空间相匹配的 Kafka 主题中
     

如果为Kafka配置了适当的选项(默认情况),就不需要特地创建新的主题,因为 Kafka 会默认为你创建主题。这是 Kafka 客户端 API 的一个非常酷的功能。

p, err := kafka.NewProducer(cfg.Endpoint)
if err != nil {
        sugar.Fatal("cannot create producer")
}
defer p.Close()

c, err := kafka.NewConsumer(cfg.Endpoint, cfg.Topic)
if err != nil {
        sugar.Fatal("cannot create consumer")
}
defer c.Close()

run := true
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
        sig := <-sigs
        sugar.Infof("signal %s received, terminating", sig)
        run = false
}()

var wg sync.WaitGroup
go func() {
        wg.Add(1)
        for run {
                data, err := c.Read()
                if err != nil {
                        sugar.Errorf("read event error: %v", err)
                        time.Sleep(5 * time.Second)
                        continue
                }
                if data == nil {
                        continue
                }
                msg, err := event.CreateDestinationMessage(data)
                if err != nil {
                        sugar.Errorf("cannot create destination event: %v", err)
                }
                p.Write(msg.Topic, msg.Message)
        }
        sugar.Info("worker thread done")
        wg.Done()
}()

wg.Wait()

 

完整代码在此处:
https://github.com/esys/kube-events-kafka/blob/master/events-fanout/cmd/main.go
 

当然还有更高性能的选择,这取决于预计的事件量和扇出(fanout)逻辑的复杂性。对于一个更强大的实现,使用 Spark Structured Streaming 的消费者将是一个很好的选择。
 

部署消费者

构建并将二进制文件推送到 Docker 镜像之后,我们将它封装为 Kubernetes deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: events-fanout
  name: events-fanout
spec:
  replicas: 1
  selector:
    matchLabels:
      app: events-fanout
  template:
    metadata:
      labels:
        app: events-fanout
    spec:
      containers:
        - image: emmsys/events-fanout:latest
          name: events-fanout
          command: [ "./events-fanout"]
          args:
            - -logLevel=info
          env:
            - name: ENDPOINT
              value: kube-events-kafka-bootstrap:9092
            - name: TOPIC
              value: cluster-events

 

检查目标主题是否创建

现在,新的主题已经创建完成:

kubectl -n kube-events get kafkatopics.kafka.strimzi.io -o name

kafkatopic.kafka.strimzi.io/cluster-events
kafkatopic.kafka.strimzi.io/kube-system
kafkatopic.kafka.strimzi.io/default
kafkatopic.kafka.strimzi.io/kafka
kafkatopic.kafka.strimzi.io/kube-events

 

你会发现你的事件根据其命名空间整齐地存储在这些主题中。
 

总结

访问 Kubernetes 历史事件日志可以使你对 Kubernetes 系统的状态有了更好的了解,但这单靠 kubectl 比较难做到。更重要的是,它可以通过对事件做出反应来实现集群或应用运维自动化,并以此来构建可靠、反应灵敏的软件。
 

原文链接:
https://hackernoon.com/monitor-your-kubernetes-cluster-events-with-eventrouter-golang-and-kafka-wh2a35l0

与如何借助Kafka持久化存储K8S事件数据?相似的内容:

如何借助Kafka持久化存储K8S事件数据?

大家应该对 Kubernetes Events 并不陌生,特别是当你使用 kubectl describe 命令或 Event API 资源来了解集群中的故障时。 ``` $ kubectl get events 15m Warning FailedCreate replicaset/ml-pipe

ChatGPT之问艺道:如何借助神级算法Prompt,让你轻松get到更高质量答案?

> 摘要:本文由[葡萄城技术团队](https://www.grapecity.com.cn/)编写,文章的内容借鉴于Ibrahim John的《The Art of Asking ChatGPT》(向ChatGPT提问的艺术)。 # 前言 当今,ChatGPT赢得越来越多人的青睐,人们通过它输入问

低代码平台如何借助Nginx实现网关服务

摘要:本文由葡萄城技术团队于博客园原创并首发。转载请注明出处:葡萄城官网,葡萄城为开发者提供专业的开发工具、解决方案和服务,赋能开发者。 前言 在典型的系统部署架构中,应用服务器是一种软件或硬件系统,它承载着应用程序的核心逻辑。它接收客户端的请求并处理相应的业务逻辑、数据操作等任务。应用服务器通常被

[转帖]高性能网络实战:借助 eBPF 来优化负载均衡的性能

https://zhuanlan.zhihu.com/p/592981662 网络性能优化,eBPF 是如何发挥作用的呢? 本篇文章,我就以最常用的负载均衡器为例,带你一起来看看如何借助 eBPF 来优化网络的性能。 1 Nginx 负载均衡器 既然要优化负载均衡器的网络性能,那么首先就需要有一个优

一行命令即可启动 Walrus丨入门教程

应用管理平台 Walrus 已正式开源,本文将介绍如何上手安装 Walrus 以及如何借助 Walrus 进行应用部署。

破局DevOps|8大北极星指标指引研发效能方向

放弃那些动辄就上百个的研发度量指标吧,8大北极星指标指引你的研发效能方向,1个北极星指标公式让你清晰了解​公司研发效能现状。 每当研发效能/DevOps业务做规划的时候,有的人就会毫无头绪,不知道如何下手,不知道方向在哪里,价值怎么衡量。本文将介绍如何借助北极星指标这个工具来帮我们完成这项工作,并以

Serverless冷启动:如何让函数计算更快更强?

摘要:借助Serverless计算,开发者仅需上传业务代码并进行简单的资源配置便可实现服务的快速构建部署,云服务商则按照函数服务调用量和实际资源使用收费,从而帮助用户实现业务的快速交付和低成本运行。 本文分享自华为云社区《Serverless冷启动:如何让函数计算更快更强?》,作者:DevAI 。

如何建设一个用于编译 iOS App 的 macOS 云服务器集群?

现代软件开发一般会借助 CI/CD 来提升代码质量、加快发版速度、自动化重复的事情,iOS App 只能在 mac 机器上编译,CI/CD 工具因此需要有一个 macOS 云服务器集群来执行 iOS App 的编译。今天就来谈谈如何建设 macOS 云服务器集群

Redis如何批量删除指定前缀的key

批量删除指定前缀的Key有两中方法,一种是借助 `redis-cli`,另一种是通过 `SCAN` 命令来遍历所有匹配前缀的 key,并使用 `DEL` 命令逐个删除它们。 ## redis-cli 使用 Redis 自带的 `redis-cli` 命令行工具,你可以通过以下方式批量删除指定前缀的

如何提升百度小程序的收录?百度小程序如何做优化?

如何通过百度小程序获得更多的自然流量?这是做百度小程序肯定要考虑的问题,做百度小程序的目的就是想借助百度生态,做相应的关键词给自己的小程序引流