两阶段提交的成立要基于以下假设:
该分布式系统中,存在一个节点作为协调者,其他节点作为参与者,且节点之间可以进行网络通信。
所有节点都采用预写式日志,且日志被写入后即被保存在可靠的存储设备上,即使节点损坏也不会导致日志数据的丢失。
所有节点不会永久性损坏,即使损坏后也可以恢复。
kafka实现了Exactly Once(精确一次)语义,主要是基于生产者端幂等以及kafka服务端事务保障。
生产者幂等的实现主要是通过序列号(Sequence Number)标识分区消息顺序:
序列号(Sequence Number)的作用:
kafka引入了Transaction Coordinator(类似Seata AT模式中的TC组件)用于协调管理事务。
伪代码如下:
// 创建 Producer 实例,并且指定 transaction id
KafkaProducer producer = createKafkaProducer(
“bootstrap.servers”, “localhost:9092”,
“transactional.id”, “my-transactional-id”);
// 初始化事务,这里会向 TC 服务申请 producer id
producer.initTransactions();
// 创建 Consumer 实例,并且订阅 topic
KafkaConsumer consumer = createKafkaConsumer(
“bootstrap.servers”, “localhost:9092”,
“group.id”, “my-group-id”,
"isolation.level", "read_committed");
consumer.subscribe(singleton(“inputTopic”));
while (true) {
ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
// 开始新的事务
producer.beginTransaction();
for (ConsumerRecord record : records) {
// 发送消息到分区
producer.send(producerRecord(“outputTopic_1”, record));
producer.send(producerRecord(“outputTopic_2”, record));
}
// 提交 offset
producer.sendOffsetsToTransaction(currentOffsets(consumer), "my-group-id");
// 提交事务
producer.commitTransaction();
}
第一阶段
TC 服务收到事务提交请求后,会先将提交信息先持久化到事务 topic 。持久化成功后,服务端就立即发送成功响应给 Producer。然后找到该事务涉及到的所有分区,为每个分区生成提交请求,存到队列里等待发送。此时事务消息状态为事务提交.
第二阶段
后台线程会不停的从队列里,拉取请求并且发送到分区。当一个分区收到事务结果消息后,会将结果保存到分区里,并且返回成功响应到 TC服务。当 TC 服务收到所有分区的成功响应后,会持久化一条事务完成的消息到事务 topic。至此,一个完整的事务流程就完成了。
区别于一般的二阶段提交,协调者需要收到所有参与者的响应后,才能判断此事务是否成功,最后才将结果返回给客户。
kafka的处理逻辑则为:如果 TC 服务在发送响应给 Producer 后,还没来及向分区发送请求就挂掉了。因为每次事务的信息都会持久化,所以 TC 服务挂掉重新启动后,会先从 事务 topic 加载事务信息,如果发现只有事务提交信息,却没有后来的事务完成信息,说明存在事务结果信息没有提交到分区。
这里的事务消息就是事务日志。
参考
Flink将两阶段提交协议中的通用逻辑抽象为了一个类——TwoPhaseCommitSinkFunction。
我们在实现端到端exactly-once的应用程序时,只需实现这个类的4个方法即可:
beginTransaction:开始事务时,会在目标文件系统上的临时目录中创建一个临时文件,之后将处理数据写入该文件。
preCommit:在预提交时,我们会刷新文件,关闭它并不再写入数据。我们还将为下一个Checkpoint的写操作启动一个新事务。
commit:在提交事务时,我们自动将预提交的文件移动到实际的目标目录。
abort:中止时,将临时文件删除。
第一阶段
Checkpoint的开始表示两阶段提交协议的"pre-commit"阶段,当触发Checkpoint时,Flink JobManager会向数据流注入一个barrier(它将数据流中的记录划分为进入当前Checkpoint的部分和进入下一个Checkpoint的部分)。Barrier会随着数据流在operator之间传递,对于每一个operator,都会触发它的状态后端来保存其状态数据。
预提交阶段在Checkpoint成功完成之后结束。在第一个阶段结束时,数据会被写入到外部存储。
第二阶段
当所有的实例做快照完成,并且都执行完 preCommit 时,会把快照完成的消息发送给 JobManager,JobManager(TC协调器)收到后会认为本次 Checkpoint 完成了,会向所有的实例发送 Checkpoint 完成的通知(Notify Checkpoint Completed),当 Sink 算子收到这个通知之后,就会执行 commit 方法正式提交。
这里的状态后端/外部存储对应的是事务日志。用于持久化日志信息。
Flink Checkpoint机制也是基于二阶段提交与事务日志来实现的。
可参考 <<Flink 内核原理与实现>>一书的第13章,见详细描述
参考