聊聊Kafka的生产者消费者确认机制

聊聊,kafka,生产者,消费者,确认,机制 · 浏览次数 : 677

小编点评

**生产者确认机制消息** 生产者确认机制消息从生产者客户端发送至broker服务端topic,需要ack确认。acks与min.insync.replicas是两个配置参数,对于生产者不丢失数据起到了很大的作用ISRIn-sync replica(ISR)称之为同步副本,ISR中的副本都是与Leader进行同步的副本,所以不在该列表的follower会被认为与Leader是不同步的。 **acks参数** acks参数指定了必须有多少个分区副本收到消息,生产者才认为该消息是写入成功的。acks=0,表示生产者在成功写入消息之前不会等待任何来自服务器的响应。 **min.insync.replicas参数** min.insync.replicas参数指定了生产者需要同步副本的最小副本数量。如果配置的replicas值设置为0,生产者就不会同步副本。 **ISR** ISR (in-sync replica) 是一个同步副本的列表,它包含与 Leader 进行同步的副本的 IP 地址和端口号。ISR 是一个动态列表,根据副本与 Leader 同步的状态动态增删。 **生产者确认机制的工作机制** 1. 生产者发送消息到 topic。 2. 客户端发送acks确认消息给 broker。 3. Broker 记录acks 信息并将其保存在 topic 中。 4. 消费者从 topic 中读取acks 信息,并从其中获取副本列表。 5. 消费者从副本列表中选择一个副本,并从其获取该副本的IP地址和端口号。 6. 消费者连接到指定的 broker,并将副本地址和端口号传递给 Leader。 7. Leader 从副本中获取消息,并将消息写入其自己的副本。 8. Leader 发送ack 消息给生产者,表示消息已写入成功。 9. 生产者收到ack 消息后,认为消息已写入成功。

正文

生产者确认机制

消息从生产者客户端发送至broker服务端topic,需要ack确认。acksmin.insync.replicas是两个配置参数.其中acks是producer的配置参数,min.insync.replicas是Broker端的配置参数,这两个参数对于生产者不丢失数据起到了很大的作用

ISR

In-sync replica(ISR)称之为同步副本,ISR中的副本都是与Leader进行同步的副本,所以不在该列表的follower会被认为与Leader是不同步的。该同步副本的列表是一个动态的,根据副本与leader同步的情况动态增删。

acks确认机制

acks参数指定了必须要有多少个分区副本收到消息,生产者才认为该消息是写入成功的。

  • acks=0,表示生产者在成功写入消息之前不会等待任何来自服务器的响应. 换句话说,一旦出现了问题导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了.

  • acks=1,表示只要集群的leader分区副本接收到了消息,就会向生产者发送一个成功响应的ack,此时生产者接收到ack之后就可以认为该消息是写入成功的. 一旦消息无法写入leader分区副本(比如网络原因、leader节点崩溃),生产者会收到一个错误响应。

  • acks =all,表示只有所有参与复制的节点(ISR列表的副本)全部收到消息时,生产者才会接收到来自服务器的响应. 这种模式是最高级别的,也是最安全的,可以确保不止一个Broker接收到了消息. 该模式的延迟会很高.

对于消息的发送,支持同步阻塞、异步回调两种方式,一般建议是使用后者,提高应用的吞吐量。

消费者确认机制

在Kafka中,消费者确认是通过消费者位移的提交实现的。类似RabbitMQ的ACK机制。

消费者位移

每个 consumer 实例都会为它消费的分区维护属于自己的位置信息来记录当前消费了多少条消息。这在 Kafka 中有一个特有的术语:位移(offset)。

相比较将offset保存在服务器端(broker),这样虽然简单,但是有如下的问题:

  1. broker变成了有状态的,增加了同步成本,影响伸缩性。

  2. 需要引入应答机制来确定消费成功。

  3. 由于需要保存众多consumer的offset,可能需要引入复杂的数据结构,对资源有一定的浪费。

在Kafka中,消费者组(Consumer Group)负责管理分发消费消息,因此将offset保存在消费者组中是比较合适的选择。其数据格式只需要是特定格式的整形数据即可。

offset 对于 consumer 非常重要,因为它是实现消息交付语义保证(message delivery semantic)的基石。

消息交付语义即最多一次、最少一次、精确一次。

位移提交

consumer客户端需要定期地向Kafka集群汇报自己消费数据的进度,这一过程被称为位移提交(offset commit)。位移提交这件事情对于 consumer 而言非常重要,它不仅表征了consumer 端的消费进度,同时也直接决定了 consumer 端的消费语义保证。

新版的Kafka由topic管理提交的位移,该topic是__consumer_offsets。默认是有50个分区,编号从0到49。

每个位移提交请求都会往__consumer_offsets 对应分区上追加写入一条消息。消息的 key 是group.id、topic和分区的元组,而 value就是位移值。

提交方式

默认情况下,consumer是自动提交位移的,自动提交间隔是5秒。这就是说若不做特定的设置,consumer程序在后台自动提交位移。通过设置auto.commit.interval.ms参数可以控制自动提交的间隔。

手动位移提交就是用户自行确定消息何时被真正处理完并可以提交位移。在一个典型的 consumer 应用场景中,用户需要对 poll 方法返回的消息集合中的消息执行业务级的处理。用户想要确保只有消息被真正处理完成后再提交位移。如果使用自动位移提交则无法保证这种时序性,因此在这种情况下必须使用手动提交位移。设置使用手动提交位移非常简单,仅仅需要在构建 KafkaConsumer 时设置enable.auto.commit=false,然后调用 commitSync 或commitAsync方法即可。

两者的区别与优劣如下:

image

参考

书籍:<<Apache Kafka实战>>

与聊聊Kafka的生产者消费者确认机制相似的内容:

聊聊Kafka的生产者消费者确认机制

## 生产者确认机制 消息从生产者客户端发送至broker服务端topic,需要ack确认。`acks`与`min.insync.replicas`是两个配置参数.其中`acks`是producer的配置参数,`min.insync.replicas`是Broker端的配置参数,这两个参数对于生产者

说说RabbitMQ延迟队列实现原理?

使用 RabbitMQ 和 RocketMQ 的人是幸运的,因为这两个 MQ 自身提供了延迟队列的实现,不像用 Kafka 的同学那么苦逼,还要自己实现延迟队列。当然,这都是题外话,今天咱们重点来聊聊 RabbitMQ 延迟队列的实现原理,以及 RabbitMQ 实现延迟队列的优缺点有哪些? 很多人

聊聊我认为的分布式、集群实现关键点

基于常见的中间件(Mysql、ElasticSearch、Zookeeper、Kafka、Redis)等分布式集群设计的机制,自己总结了在在集群设计过程中需要考虑的通用问题。 ### 节点通信机制 主节点的增加、删除、通信机制。 ### 路由算法 即数据路由到哪个节点的策略机制。在集群内有多个节点,

聊聊GLM-4-9B开源模型的微调loss计算

概述 Github官方地址:GLM-4 网上已经有很多关于微调的文章,介绍各种方式下的使用,这里不会赘述。我个人比较关心的是微调时的loss计算逻辑,这点在很多的文章都不会有相关的描述,因为大多数人都是关心如何使用之类的应用层,而不是其具体的底层逻辑,当然咱也说不清太底层的计算。 可了解其它loss

聊聊一个差点被放弃的项目以及近期的开源计划

前言 自从 StarBlog 和 SiteDirectory 之后,我还没写新的关于开源项目的系列,最近又积累了很多想法,正好写一篇博客来总结一下。 关于差点被放弃的项目,就是最近一直在做的单点认证(IdentityServerLite) IdentityServerLite 开发这个项目的起因,是

聊聊 JSON Web Token (JWT) 和 jwcrypto 的使用

哈喽大家好,我是咸鱼。 最近写的一个 Python 项目用到了 jwcrypto 这个库,这个库是专门用来处理 JWT 的,JWT 全称是 JSON Web Token ,JSON 格式的 Token。 今天就来简单入门一下 JWT。 官方介绍:https://jwt.io/introduction

聊聊MySQL是如何处理排序的

在MySQL的查询中常常会用到 order by 和 group by 这两个关键字,它们的相同点是都会对字段进行排序,那查询语句中的排序是如何实现的呢?

聊聊 Linux iowait

哈喽大家好,我是咸鱼。 我们在使用 top 命令来查看 Linux 系统整体 CPU 使用情况的时候,往往看的是下面这一列: %Cpu(s): 0.0 us, 0.0 sy, 0.0 ni,100.0 id, 68.0 wa, 0.0 hi, 0.0 si, 0.0 st 其中,man 手册解释 w

聊聊Mybatis框架原理

好久没有写博客了。最近工作中封装了一个类似ORM框架的东西。大概的原理就是将Excel数据初始化到本地sqlite数据库后,通过json配置文件,对数据库的数据做增删改查等操作。 其实大概的思考了下,就是半ORM框架mybatis的逻辑,只是我们自己封装的简陋蛮多。想想有现成的轮子没用,反而是自己写

聊聊Spring的工厂方法与FactoryBean

概述 工厂方法是比较常见,常用的一种设计模式。FactoryBean是Spring提供的一种Bean注入IOC容器的方式。 工厂方法 在做日常开发时,一般都会避免直接new对象,而且将new的操作丢给IOC容器,但对于第三方系统的集成,我们不太好直接丢给IOC容器,此时可以通过工厂模式, 提供一个工