kafka事务流程

kafka · 浏览次数 : 0

小编点评

以下是Kafka事务中使用到的5个API的归纳总结: **1. 初始化事务** * **功能**:初始化事务,为后续操作做好准备。 * **API**:`void initTransactions()` **2. 开启事务** * **功能**:正式开始一个新的事务。 * **API**:`void beginTransaction() throws ProducerFencedException` **3. 在事务内提交已经消费的偏移量** * **功能**:将消费者已消费的偏移量提交到事务中。 * **API**:`void sendOffsetsToTransaction(Map offsets, String consumerGroupId) throws ProducerFencedException` **4. 提交事务** * **功能**:提交事务,确认事务中的所有操作都已成功执行。 * **API**:`void commitTransaction() throws ProducerFencedException` **5. 放弃事务** * **功能**:放弃事务,撤销事务中的所有操作。 * **API**:`void abortTransaction() throws ProducerFencedException` 以上是Kafka事务中使用到的5个API的简要总结。这些API共同确保了事务的原子性、一致性和持久性,从而保证了数据的一致性和可靠性。

正文

流程

image
kafka事务使用的5个API

// 1. 初始化事务
void initTransactions();
// 2. 开启事务
void beginTransaction() throws ProducerFencedException;
// 3. 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;
// 4. 提交事务
void commitTransaction() throws ProducerFencedException;
// 5. 放弃事务(类似回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

(1)初始化事务
生产者带着自定义的transactional.id发起initTransactions(),事务协调器会分配一个Pid给生产者,同幂等性一样;

(2)开启事务
生产者调用beginTransaction()开启事务,事务协调器会记录事务状态并保存在事务日志中,事务日志是存储与内部topic __transaction_state里的,对消费者不可见;

(3)提交位移Offset
生产者发送OffsetCommitRequest 请求到消费组协调者,消费组协调者会把 offset 存储到 Kafka 内部主题 __consumer_offsets 中。

(4)提交事务or回滚事务
生产者在完成消息发送后,决定提交事务
调用commitTransaction()或者abortTransaction()
事务协调器会向每个分区Leader写入一个控制消息(abort或者commit),会先发送EndTransaction请求给事务协调器
TC会向内部主题写入prepare_commit或者prepare_abort消息,后会发送给主题分区Leader一个控制消息commit或者abort,告知消息是接受还是丢弃;
如果事务成功提交,事务的消息将对消费者可见,而中止的消息会不可见,消费者在read_committed隔离级别下对于中止消息是不可见的。




与kafka事务流程相似的内容:

kafka事务流程

流程 kafka事务使用的5个API // 1. 初始化事务 void initTransactions(); // 2. 开启事务 void beginTransaction() throws ProducerFencedException; // 3. 在事务内提交已经消费的偏移量(主要用于消费

从kafka与Flink的事务原理来看二阶段提交与事务日志的结合使用

两阶段提交的成立要基于以下假设: - 该分布式系统中,存在一个节点作为协调者,其他节点作为参与者,且节点之间可以进行网络通信。 - 所有节点都采用预写式日志,且日志被写入后即被保存在可靠的存储设备上,即使节点损坏也不会导致日志数据的丢失。 - 所有节点不会永久性损坏,即使损坏后也可以恢复。 ###

【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤

问题描述 Azure Event Hubs -- Kafka 生产者发送消息存在延迟接收和丢失问题, 在客户端的日志中发现如下异常: 2023-06-05 02:00:20.467 [kafka-producer-thread | producer-1] ERROR com.deloitte.com

【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能

问题描述 Azure Event Hub支持 kafka,所以为了测试消息生产者所在环境与Azure Event Hub之间发送消息的性能如何,特别使用 kafka 官方测试生产者,消费者的性能工具 : kafka-producer-perf-test.bat kafka-consumer-perf

【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls

问题描述 参考Github上 Event Hub的示例代码(Using Apache Flink with Event Hubs for Apache Kafka Ecosystems : https://github.com/Azure/azure-event-hubs-for-kafka/tre

如何借助Kafka持久化存储K8S事件数据?

大家应该对 Kubernetes Events 并不陌生,特别是当你使用 kubectl describe 命令或 Event API 资源来了解集群中的故障时。 ``` $ kubectl get events 15m Warning FailedCreate replicaset/ml-pipe

【Azure事件中心】使用Python SDK(Confluent)相关方法获取offset或lag时提示SSL相关错误

问题描述 使用Python SDK(Confluent)相关方法获取offset或lag时, 提示SSL相关错误, 是否有更清晰的实例以便参考呢? 问题解决 执行代码,因为一直连接不成功,所以检查 confluent_kafka 的连接配置,最后定位是 sasl.password 值设置有误。此处,

大数据 - DWD&DIM 业务数据

业务数据的变化,我们可以通过 FlinkCDC 采集到,但是 FlinkCDC 是把全部数据统一写入一个 Topic 中, 这些数据包括事实数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从 Kafka 的业务数据 ODS 层读取数据,经过处理后,将维度数据保存到 HBase,将事

剖析 Kafka 消息丢失的原因

Kafka消息丢失的原因通常涉及多个方面,包括生产者、消费者和Kafka服务端(Broker)的配置和行为。下面将围绕这三个关键点,详细探讨Kafka消息丢失的常见原因,并提供相应的解决方案和最佳实践。总的来说,Kafka消息丢失是一个涉及多个环节的问题,需要从生产者、Broker和消费者三个层面综...

Kafka多维度调优

优化金字塔 应用程序层面 框架层面(Broker层面) JVM层面 操作系统层面 应用程序层面:应当优化业务代码合理使用kafka,合理规划主题,合理规划分区,合理设计数据结构; 框架层面:在不改动源码的情况下,从kafka参数配置入手,结合业务体量和运行数据进行调优 JVM层面:在出现明显缓慢和可