[转帖]Kafka可靠性之HW与Leader Epoch

kafka,可靠性,hw,leader,epoch · 浏览次数 : 0

小编点评

**数据不一致场景** 1. 当 leader 收到了 fetch 请求时,如果没有消息要同步,返回 leader_HW=2 follower接到 response,再更新follower_HW = 2就OK了。 2. 由于有新的消息,旧的 leader 必须重新开始,所以它只能从 offset = 100 开始写入,当前还没死0 01 100 leader epoch 的作用在日志截断之前,加一层判断是否有必要截断。 3. 如果处于同一天的场景, leader 只能取 follower_max_version+1 这个版本对应的start_offset。 4. 如果处于不同一天的场景, leader 只能取 follower_max_version对应的start_offset。

正文

《深入理解Kafka:核心设计与实现原理》是基于2.0.0版本的书

在这本书中,终于看懂了笔者之前提过的几个问题

准备知识

1、leader里存着4个数据:leader_LEO、leader_HW、remote_LEO集合、remote_HW集合

2、follower里只保存自身的:follower_LEO、follower_HW

HW和LEO更新过程

假设:

1个leader 和 2个follower

此时:leader_LEO = 5、leader_HW = 0、所有 follower 的 follower_LEO = 0、follower_HW = 0

同步开始

1、follower向leader发送fetch请求,来拉取消息,这个fetch请求会带上各自的follower_LEO

(先省略leader处理fetch请求的过程)

2、leader接收fetch请求后,处理并返回,这个fetch请求的响应会带上 leader_HW

3、follower拿到了fetch请求的response,同步消息数据,更新自己的 follower_LEO

(假设此时 这2个 follower_HW 分别为3和4)

4、接下来计算一下 follower_HW 并更新

公式:follower_HW = min(leader_HW,follower_LEO) 

所以 两个follower此时的 follower_HW 都是 min(0,0) = 0

5、follower再次发送fetch请求,并带上各自的 follower_LEO

6、leader处理fetch请求过程如下

把这2个 follower_LEO 加上自己的 leader_LEO,取最小值作为新的 leader_HW

公式:leader_HW = min(leader_LEO,RemoteIsrLEO)

(RemoteIsrLEO:ISR中的所有follower的follower_LEO)

leader_HW = min(l5,3,4) = 3

7、leader_HW = 3 被 fetch请求的response带上,返回

8、2个follower收到response后,依然是,同步消息数据,更新 follower_LEO,更新 follower_HW

(假设此时 这2个 follower_HW 分别更新至 6和8)

follower_HW1 =  min(3,6) = 3

follower_HW2 =  min(3,8) = 3

(leader在上面几张图分别等于5、10、15、20的意思是,在上述过程中,leader一直接收着producer的消息数据)、

为什么follower_HW = min(leader_HW,follower_LEO) 

因为这个follower没说是哪里的follower。

当然对于ISR中的follower根本不必多此一举,其实直接 follower_HW = leader_HW 就可以

但是对于OSR中正在拼命追赶leader的follower,它的follower_LEO可能小于leader_HW

所以公式为什么是公式,原因就在这。

如图为笔者的理解。

至于很多博客,说什么谁大谁小,谁一定小于谁,谁一定小于等于谁。

不加限定条件你说NM呢?HW和LEO多了去了,谁知道你说的是哪个和哪个比?

HW更新的问题

大家仔细观察上图

这幅图展示的是follower接收到了fetch请求的response,更新了各自的follower_LEO和follower_HW

问题在哪?

leader 中有数据 offset 0-9 

follower1 中有数据 offset 0-2

follower2 中有数据 offset 0-3

现在是不是offset 0-2这3个消息是ISR中所有replica中都有了,这是不是完全同步了。

既然是,为什么HW不更新成3?offset 0-2这3条数据此时,按理说就应该可以被consumer消费了。

但是并没有,HW更新为3是下一次fetch请求响应都结束后,leader_HW和follower_HW才更新成3的。

remote_LEO、leader_HW、follower_HW的更新发生于follower_LEO更新后的第二轮fetch请求。

说人话就是:

消息数据已经完全同步了,但是HW的更新比消息晚了一次请求。

也可以理解为HW的更新是异步的。

因为按照我们的认知,既然消息已经在ISR完全同步成功后,应该立马可以消费,或者说,消息和HW应该同步更新。

也正因为不是这样的,所以才会出现数据丢失和数据一致性的问题。

HW的截断

假设此时的状态是 HW= 5,即:offset = 0~4的消息对于consumer可见

此时leader宕机,follower1选举为leader,leader重启成为follower。

原则就是:一切以leader为准,新follower此时会根据之前 HW 位置进行日志截断,并重新发起fetch请求追赶。

(这个值会存入本地的复制点文件 replication-offset-checkpoint )

那么新follower最多能追到offset=6,这是不是说明offset=7的消息就丢了。

不是的,这种情况就是正常的情况,因为offset=7这个消息并未“完全同步”,这个消息根本没有写入成功,何谈丢消息。

如图,如果offset=5的消息丢了,那才是真正的丢消息。

HW机制之丢失消息

在 0.11.0.0 版本之前, Kafka使用的是基于HW的同步机制,

但这样有可能出现数据丢失或leader副本和follower副本数据不一致的问题。

官方已经自己给出例子说明了

https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation

简言之,既然消息数据和HW是异步更新的,

那么你一定能找出某种场景:让已经实现“完全同步”但HW没来得及更新的消息丢失。

1、此时leader收到fetch请求,没有消息要同步了,返回leader_HW=2

follower接到response,再更新follower_HW = 2就OK了。

删除线部分并没有发生,而是follower宕机了,此时再次重启follower,重启之后会根据之前 HW 位置进行日志截断 。

好的,此时follower上的offset=1的消息没了。

2、follower截断日志后,应该去追赶leader,先一次fetch,从新把offset=1的消息拿过来,再一次fetch,把HW更新为2

删除线部分依然没发生,而是leader宕机了,follower成了新leader,重启旧leader成为了follower。

3、follower_HW不能比leader_HW 大,必须以leader为准,所以还会做一次日志截断,以此将follower_HW调整为 1

至此,offset=1的消息就被玩没了。

HW机制之数据不一致

场景:leader有 2 条消息,且 HW 和 LEO 都为 2, follower有 1 条消息,且 HW 和 LEO 都为 1

1、leader和follower同时宕机,follower率先复苏,成为新leader,并接收一个消息m

并将LEO和HW更新至 2 (假设所有场景中的 min.insync.replicas 参数配置为 1)

2、旧leader重启,成为follower,需要根据 HW 截断日志及发送 FetchRequest 至新leader ,

不过此时HW相等,那么就可以不做任何调整了。

至此,数据就不一致了。

leader epoch 方案

Kafka 从 0.11.0.0 开始引入了 leader epoch 的概念,

笔者总结的核心思路

1、在需要截断数据的时候使用 leader epoch 作为参考依据而不是原本的 HW

2、leader epoch 把他比喻成 时代、朝代 都可以,就是为了比较,follower和leader是不是处于一个朝代的

方唐镜:包大人你好大的官威啊,拿前朝的剑斩本朝的官!

3、如果处于同时代,follower的LEO对比leader的LEO,follower落后就不用截断,超过就自行了断。

4、如果处于不同时代,// TODO

存储位置和segment是一起的,文件名:leader-epoch-checkpoint

文件内容分两列

第一列:leader的版本号,每当leader出现变更就会追加一条记录,版本号+1

第二列:这个leader上任后,写入的第一条数据的offset

  1. # 说明leader的0代目,写下了offset 099 = 100 条消息,之后当场去世
  2. # 1代目,从offset = 100 开始写入,目前还没死
  3. 0 0
  4. 1 100

leader epoch 的作用

在日志截断之前,加一层判断是否有必要截断。而这个判断的依据就是leader epoch

当replica成为leader时

收到生产者发来的第一条消息时,会将新的epoch和当前LEO添加到leader-epoch-checkpoint文件中。

当replica成为follower时

(epoch太别扭了,以下都用version代替)

0、先不急着截断

1、向leader发送请求,带上follower当前本地的最新epoch,也就是文件中的那个最大版本号 follower_max_version

2、leader接收请求,需要进行一下计算,并返回一个last_offset值

如果 follower_max_version = leader_max_version,last_offset = leader_LEO

如果不相等,leader就取 follower_max_version+1 这个版本对应的start_offset 当作 last_offset

3、follower拿到response里的 last_offset 进行判断

如果 last_offset  < follower_LEO,需要进行截断

否则不需要

4、开始fetch请求

笔者的疑问

很奇怪,说到last_offset的判断时,书里也只说了相等或不相等,很多看似华丽的博客完全都是copy书上的,一点没改。

按理来说,不应该区分以下谁大谁小么?

不相等,为什么能保证一定能取到  follower_max_version+1 的数据呢?

会不会有的follower的follower_max_version比leader中的还大,这个不得而知。

【如果不相等,leader就取 follower_max_version+1 这个版本对应的start_offset 】

你怎么就知道 follower_max_version 一定比leader的小呢?

———————————————以下是笔者的假设———————————————————

假设,一个leader,目前为止还没写入过消息,所有replica的leader epoch 都是一样的,等于2。

此时,这个leader收到了producer的一条数据,记录了leader epoch,假设为3。

然后他马上宕机,此时其他的follower的 leader epoch还都是2(这就是问题所在,这个epoch到底是怎么同步的)

有一个follower选举为leader,且一直没有producer发消息,这个新leader的 leader epoch 也是2,

这是旧的leader重启成为follower,那么此时是不是follower的leader epoch比新leader的大了呢?

——————————————————————————————————————————

笔者仔细看了官方的文档,借助谷歌翻译,大概能知道为了保持leader epoch的同步,Kafka一定做了某些处理方案,

但是具体的看不懂。

而对于,两本书中都提到的“不等于”、“否则”,哎,不等于到底是大于还是小于啊?

如果在leader中一定能找到 follower_max_version+1 的 epoch,那为什么用不等于这种模棱两可的表达呢?

 

 

 

 

 

 

文章知识点与官方知识档案匹配,可进一步学习相关知识
云原生入门技能树首页概览13727 人正在系统学习中

与[转帖]Kafka可靠性之HW与Leader Epoch相似的内容:

[转帖]Kafka可靠性之HW与Leader Epoch

《深入理解Kafka:核心设计与实现原理》是基于2.0.0版本的书 在这本书中,终于看懂了笔者之前提过的几个问题 准备知识 1、leader里存着4个数据:leader_LEO、leader_HW、remote_LEO集合、remote_HW集合 2、follower里只保存自身的:follower

[转帖]Kafka之ISR机制的理解

Kafka对于producer发来的消息怎么保证可靠性? 每个partition都给配上副本,做数据同步,保证数据不丢失。 副本数据同步策略 和zookeeper不同的是,Kafka选择的是全部完成同步,才发送ack。但是又有所区别。 所以,你们才会在各种博客看到这句话【kafka不是完全同步,也不

[转帖]Kafka常见使用场景与Kafka高性能之道

https://juejin.cn/post/6958997115012186119 消息队列使用场景 队列,在数据结构中是一种先进先出的结构,消息队列可以看成是一个盛放消息的容器,这些消息等待着各种业务来处理。 消息队列是分布式系统中重要的组件,kafka就可以看做是一种消息队列,其大致使用场景:

[转帖]Kafka 与RocketMQ 落盘机制比较

https://www.jianshu.com/p/fd50befccfdd 引言 前几期的评测中,我们对比了Kafka和RocketMQ的吞吐量和稳定性,本期我们要引入一个新的评测标准——软件可靠性。 何为“可靠性”? 先看下面这种情况:有A,B两辆越野汽车,在城市的周边地区均能很好应对泥泞的路况

[转帖]Kafka高可用 — KRaft集群搭建

Apache Kafka Raft 是一种共识协议,它的引入是为了消除 Kafka 对 ZooKeeper 的元数据管理的依赖,被社区称之为 Kafka Raft metadata mode,简称 KRaft 模式。本文介绍了KRaft模式及三节点的 KRaft 集群搭建。 1 KRaft介绍 KR

[转帖]Kafka高可用 — KRaft集群搭建

Apache Kafka Raft 是一种共识协议,它的引入是为了消除 Kafka 对 ZooKeeper 的元数据管理的依赖,被社区称之为 Kafka Raft metadata mode,简称 KRaft 模式。本文介绍了KRaft模式及三节点的 KRaft 集群搭建。 1 KRaft介绍 KR

[转帖]Kafka中offsets.retention.minutes和log.retention.minutes之间的区别

https://www.cnblogs.com/lestatzhang/p/10771115.html 前言 在Kafka中,我们可能会发现两个与retention相关的配置: log.retention.minutes offsets.retention.minutes 那么它们之前的差别是什么呢

[转帖]kafka_export 部署实战

https://zhuanlan.zhihu.com/p/57704357 Kafka Exporter 监控 Kafka 实时数据 需要安装的组件 Prometheus:时序数据库,按时间保存监控历史数据。语言:Go Grafana:metrics 可视化系统 Kafka Exporter:一个用

[转帖]kafka漏洞升级记录,基于SASL JAAS 配置和 SASL 协议,涉及版本3.4以下

攻击者可以使用基于 SASL JAAS 配置和 SASL 协议的任意 Kafka 客户端,在对 Kafka Connect worker 创建或修改连接器时,通过构造特殊的配置,进行 JNDI 注入。 影响范围:2.3.0 <= Apache Kafka <= 3.3.2 解决办法:升级到3.4版本

[转帖]关于kafka压力测试(使用官方自带脚本测试)

文章目录 kafka官方自带压测脚本文件Producer生产者环境测试测试命令返回测试结果 Consumer消费者环境测试测试命令测试结果说明 提升kafka的吞吐量可通过以下的方式来提升kafka生产者的吞吐量buffer.memorycompression.typebatch.sizelinge