从kafka与Flink的事务原理来看二阶段提交与事务日志的结合使用

kafka,flink,事务,原理,来看,阶段,提交,日志,结合,使用 · 浏览次数 : 62

小编点评

**两阶段提交的成立条件:** 1. **节点间网络通信:**节点之间可以进行网络通信。 2. **预写式日志:**所有节点采用预写式日志,日志被写入后即被保存在可靠的存储设备上。 3. **kafka事务kafka:**kafka事务kafka实现了Exactly Once(精确一次)语义,主要基于生产者端幂等以及kafka服务端事务保障。 4. **序列号:**每个分区都有一个顺序的消息日志,序列号帮助确保消息按照正确的顺序添加到分区中。 5. **事务原理:**kafka引入了Transaction Coordinator(类似Seata AT模式中的TC组件)用于协调管理事务。

正文

两阶段提交的成立要基于以下假设:

  • 该分布式系统中,存在一个节点作为协调者,其他节点作为参与者,且节点之间可以进行网络通信。

  • 所有节点都采用预写式日志,且日志被写入后即被保存在可靠的存储设备上,即使节点损坏也不会导致日志数据的丢失。

  • 所有节点不会永久性损坏,即使损坏后也可以恢复。

kafka事务

kafka实现了Exactly Once(精确一次)语义,主要是基于生产者端幂等以及kafka服务端事务保障。

生产者幂等

生产者幂等的实现主要是通过序列号(Sequence Number)标识分区消息顺序:

  1. Kafka的生产者幂等性是一种特性,它确保生产者在发送消息时,无论消息是否成功传递,都不会导致重复消息的发送。
  2. 幂等性是通过分配唯一的序列号(Sequence Number)给每条消息来实现的。这个序列号通常是递增的,每次发送新消息时会增加。
  3. 当生产者发送一条消息时,Kafka会根据消息的主题、分区和序列号来识别该消息,如果消息已经被成功接收并记录,那么即使生产者尝试再次发送具有相同序列号的消息,Kafka也只会视它为一条消息,不会重复添加。

序列号(Sequence Number)的作用:

  • 序列号是为了确保消息的唯一性和有序性。它有助于Kafka在消息传递过程中跟踪消息,防止消息丢失或被重复传递。
  • 序列号还用于保持消息的顺序。在Kafka中,每个分区都有一个顺序的消息日志,序列号帮助确保消息按照正确的顺序添加到分区中。

事务原理

kafka引入了Transaction Coordinator(类似Seata AT模式中的TC组件)用于协调管理事务。

伪代码如下:

// 创建 Producer 实例,并且指定 transaction id
KafkaProducer producer = createKafkaProducer(
  “bootstrap.servers”, “localhost:9092”,
  “transactional.id”, “my-transactional-id”);

// 初始化事务,这里会向 TC 服务申请 producer id
producer.initTransactions();

// 创建 Consumer 实例,并且订阅 topic
KafkaConsumer consumer = createKafkaConsumer(
  “bootstrap.servers”, “localhost:9092”,
  “group.id”, “my-group-id”,
  "isolation.level", "read_committed");
consumer.subscribe(singleton(“inputTopic”));

while (true) {
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
  // 开始新的事务
  producer.beginTransaction();
  for (ConsumerRecord record : records) {
    // 发送消息到分区
    producer.send(producerRecord(“outputTopic_1”, record));
    producer.send(producerRecord(“outputTopic_2”, record));
  }
  // 提交 offset
  producer.sendOffsetsToTransaction(currentOffsets(consumer), "my-group-id");  
  // 提交事务
  producer.commitTransaction();
}

第一阶段
TC 服务收到事务提交请求后,会先将提交信息先持久化到事务 topic 。持久化成功后,服务端就立即发送成功响应给 Producer。然后找到该事务涉及到的所有分区,为每个分区生成提交请求,存到队列里等待发送。此时事务消息状态为事务提交.

第二阶段
后台线程会不停的从队列里,拉取请求并且发送到分区。当一个分区收到事务结果消息后,会将结果保存到分区里,并且返回成功响应到 TC服务。当 TC 服务收到所有分区的成功响应后,会持久化一条事务完成的消息到事务 topic。至此,一个完整的事务流程就完成了。

区别于一般的二阶段提交,协调者需要收到所有参与者的响应后,才能判断此事务是否成功,最后才将结果返回给客户。

kafka的处理逻辑则为:如果 TC 服务在发送响应给 Producer 后,还没来及向分区发送请求就挂掉了。因为每次事务的信息都会持久化,所以 TC 服务挂掉重新启动后,会先从 事务 topic 加载事务信息,如果发现只有事务提交信息,却没有后来的事务完成信息,说明存在事务结果信息没有提交到分区。

这里的事务消息就是事务日志。

image

参考

Kafka 事务实现原理

Exactly Once语义与事务机制原理

Flink将两阶段提交协议中的通用逻辑抽象为了一个类——TwoPhaseCommitSinkFunction。

我们在实现端到端exactly-once的应用程序时,只需实现这个类的4个方法即可:

  • beginTransaction:开始事务时,会在目标文件系统上的临时目录中创建一个临时文件,之后将处理数据写入该文件。

  • preCommit:在预提交时,我们会刷新文件,关闭它并不再写入数据。我们还将为下一个Checkpoint的写操作启动一个新事务。

  • commit:在提交事务时,我们自动将预提交的文件移动到实际的目标目录。

  • abort:中止时,将临时文件删除。

第一阶段

Checkpoint的开始表示两阶段提交协议的"pre-commit"阶段,当触发Checkpoint时,Flink JobManager会向数据流注入一个barrier(它将数据流中的记录划分为进入当前Checkpoint的部分和进入下一个Checkpoint的部分)。Barrier会随着数据流在operator之间传递,对于每一个operator,都会触发它的状态后端来保存其状态数据。

预提交阶段在Checkpoint成功完成之后结束。在第一个阶段结束时,数据会被写入到外部存储。

第二阶段

当所有的实例做快照完成,并且都执行完 preCommit 时,会把快照完成的消息发送给 JobManager,JobManager(TC协调器)收到后会认为本次 Checkpoint 完成了,会向所有的实例发送 Checkpoint 完成的通知(Notify Checkpoint Completed),当 Sink 算子收到这个通知之后,就会执行 commit 方法正式提交。

这里的状态后端/外部存储对应的是事务日志。用于持久化日志信息。

image

Flink Checkpoint机制也是基于二阶段提交与事务日志来实现的。
可参考 <<Flink 内核原理与实现>>一书的第13章,见详细描述

参考

Flink——Flink CheckPoint之两阶段提交协议

剖析 Flink 端到端的一致性

与从kafka与Flink的事务原理来看二阶段提交与事务日志的结合使用相似的内容:

从kafka与Flink的事务原理来看二阶段提交与事务日志的结合使用

两阶段提交的成立要基于以下假设: - 该分布式系统中,存在一个节点作为协调者,其他节点作为参与者,且节点之间可以进行网络通信。 - 所有节点都采用预写式日志,且日志被写入后即被保存在可靠的存储设备上,即使节点损坏也不会导致日志数据的丢失。 - 所有节点不会永久性损坏,即使损坏后也可以恢复。 ###

聊聊Kafka的生产者消费者确认机制

## 生产者确认机制 消息从生产者客户端发送至broker服务端topic,需要ack确认。`acks`与`min.insync.replicas`是两个配置参数.其中`acks`是producer的配置参数,`min.insync.replicas`是Broker端的配置参数,这两个参数对于生产者

从Kafka中学习高性能系统如何设计

相信各位小伙伴之前或多或少接触过消息队列,比较知名的包含Rocket MQ和Kafka,在京东内部使用的是自研的消息中间件JMQ,从JMQ2升级到JMQ4的也是带来了性能上的明显提升,并且JMQ4的底层也是参考Kafka去做的设计。在这里我会给大家展示Kafka它的高性能是如何设计的,大家也可以学习相关方法论将其利用在实际项目中,也许下一个顶级项目就在各位的代码中产生了。

可重入锁思想,设计MQ迁移方案

如果你的MQ消息要从Kafka切换到RocketMQ且不停机,怎么做?在让这个MQ消息调用第三方发奖接口,但无幂等字段又怎么处理?今天小傅哥就给大家分享一个关于MQ消息在这样的场景中的处理手段。 这是一种比较特例的场景,需要保证切换的MQ消息不被两端同时消费,并且还需要在一段消费失败后的MQ还可以继

Kafka多维度调优

优化金字塔 应用程序层面 框架层面(Broker层面) JVM层面 操作系统层面 应用程序层面:应当优化业务代码合理使用kafka,合理规划主题,合理规划分区,合理设计数据结构; 框架层面:在不改动源码的情况下,从kafka参数配置入手,结合业务体量和运行数据进行调优 JVM层面:在出现明显缓慢和可

大数据 - DWD&DIM 行为数据

我们前面采集的日志数据已经保存到 Kafka 中,作为日志数据的 ODS 层,从 Kafka 的ODS 层读取的日志数据分为 3 类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回 Kafka 不同主题中,作为日

[转帖]kafka-console-ui v1.0.6发布

前言 kafka-console-ui 是一款web版的kafka管理平台,从第一次发布到现在已经两年了,断断续续也更新了7个版本了(v1.0.0~v1.0.6)。 一些常用的功能也陆续完善了不少,相对最新的kafka版本,某些功能上还是有所欠缺,当前支持的功能如下: 源码 github: http

专为小白打造—Kafka一篇文章从入门到入土

Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。

kettle从入门到精通 第六十九课 ETL之kettle kettle cdc mysql,轻松实现实时增量同步

1、之前kettle cdc mysql的时候使用的方案是canal+kafka+kettle,今天我们一起学习下使用kettle的插件Debezium直接cdc mysql。 注:CDC (Change Data Capture) 是一种技术,用于捕获和同步数据库中的更改。 1)Debezium步

Kafka 线上性能调优

Kafka 线上性能调优是一项综合工程,不仅仅是 Kafka 本身,还应该从硬件(存储、网络、CPU)以及操作系统方面来整体考量,首先我们要有一套生产部署方案,基于这套方案再进行调优,这样就有了可靠的底层保证,才能保证 Kafka 集群整体的稳定性。 1. 线上部署方案 1.1 操作系统 我们知道