[转帖]Kafka 性能优化与问题深究

kafka,性能,优化,问题,深究 · 浏览次数 : 0

小编点评

**2.4 Kafka 启动失败分析** **2.4.1 Fatal error during KafkaServerStartable startup** * 错误日志意思为:Kafka-logs2日志文件在另一个进程中已被占用 * 解决方法: ps -ef|grep kafka,杀掉使用该目录的进程即可 **2.4.2 Message size over limit** * Exception in thread \"main\" org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {test-peng-0=51285860} whose size is larger than the fetch size 1048576 and hence cannot be ever returned * 解决方法: 修改脚本kafka-server-start.sh中的最小启动内存,设置为较小值 **2.4.3 Kafka启动时内存异常** * java.lang.OutOfMemoryError: Java heap space * 解决方法:修改脚本kafka-server-start.sh中的最小启动内存,设置为较大值 **2.4.3.2.启动时出现oom** * java.lang.OutOfMemoryError: Java heap space * 解决方法:同样脚本kafka-server-start.sh中的最小启动内存,设置为较大的值 **2.4.3.3.kafka 启动时报java.io.IOException: No space left on device** * 发现Kafka所在磁盘已满,清理磁盘 Linux 查看磁盘空间: df df 以磁盘分区为单位查看文件系统,可以获取硬盘被占用了多少空间,目前还剩下多少空间等信息 * 解决方法: 清理数据,释放空间: linux删除某个文件下的所有文件: 进入这个文件夹 然后用命令 rm -rf * * **2.4.3.3. Kafka 启动时报java.io.IOException: No space left on device** * 发现Kafka所在磁盘已满,清理磁盘 Linux 查看磁盘空间: df df 以磁盘分区为单位查看文件系统,可以获取硬盘被占用了多少空间,目前还剩余多少空间等信息

正文

Kafka 性能优化与问题深究

一.Kafka深入探究

1.1 kafka整体介绍

1. 1.1 Kafka 如何做到高吞吐、低延迟的呢?

Kafka是一个分布式高吞吐量的消息系统,这里提下 Kafka 写数据的大致方式:先写操作系统的页缓存(Page Cache),然后由操作系统自行决定何时刷到磁盘。

因此 Kafka 达到高吞吐、低延迟的原因主要有以下 4 点:

  • 页缓存是在内存中分配的,所以消息写入的速度很快。
  • Kafka 不必和底层的文件系统进行交互,所有繁琐的 I/O 操作都由操作系统来处理。
  • Kafka 采用追加写的方式,避免了磁盘随机写操作。
  • 使用以 sendfile 为代表的零拷贝技术提高了读取数据的效率。

PS: 使用页缓存而非堆内存还有一个好处

Linux 总会把系统中还没被应用使用的内存挪来给 Page Cache,在命令行输入free,或者 cat /proc/meminfo ,“Cached”的部分就是 Page Cache。Page Cache 中每个文件是一棵 Radix 树(又称 PAT 位树, 一种多叉搜索树),节点由 4k 大小的 Page 组成,可以通过文件的偏移量(如 0x1110001)快速定位到某个Page。

当写操作发生时,它只是将数据写入 Page Cache 中,并将该页置上 dirty 标志。

当读操作发生时,它会首先在 Page Cache 中查找,如果有就直接返回,没有的话就会从磁盘读取文件写入 Page Cache 再读取。

可见,只要生产者与消费者的速度相差不大,消费者会直接读取之前生产者写入Page Cache的数据,大家在内存里完成接力,根本没有磁盘访问

而比起在内存中维护一份消息数据的传统做法,这既不会重复浪费一倍的内存,Page Cache 又不需要 GC (可以放心使用60G内存了),而且即使 Kafka 重启了,Page Cache 还依然在。

 

就是当 Kafka broker 的进程崩溃时,堆内存的数据会丢失,但是页缓存的数据依然存在,重启 Kafka broker 后可以继续提供服务。

  1. 2. kafka和传统企业消息系统相比有哪些独特之处?

Kafka作为一个消息系统kafka的consumer group概念引申出两个概念。作为队列时,consumer group支持多个进程的集合同时分担数据处理。作为发布-订阅模式时,kafka支持发布消息到多个consumer groups。

kafka模式的优点在于每个topic都有这两个特征-既可以扩展处理能力又可以支持多个订阅者---不需要选择这个却不得不舍弃那个。

kafka要比传统消息系统有更强的次序保证。通过并行处理机制-partition-在topic内部,kafka可以提供次序保证以及consumer 进程组之间负载均衡。这是通过分配topic的每个partition给consumer group中某个consumer,这样可以保证一个partition只会被一个consumer消费。这样就保证了consumer是partition唯一的消费者,即可以获得有序的消息。由于有多个partitions存在,也就需要多个consumer实例来保证负载均衡。注意,同一个consumer group中consumers实例的个数不能多于partitions的数目

1.1.3.Kafka整体架构

 

 Fig1.Kafka整体架构图

 

Fig2. kafka Cluster 图

一个典型的 Kafka Cluster(集群)中包含:

  • 若干 Producer(消息生产者): 将 record(记录,相当于消息) Publish(发布,Push方式) 至 Broker,Producer 可以是前端页面、服务器日志、系统CPU、内存等;
  • 若干 Broker(用来存储消息的主服务器): 支持水平扩展(数量越多,集群吞吐越好),消息的存储是按 Topic(主题,消息的分类)+Partition(主题分区) 划分; 特定的 Topic/Partition 内各消息的 offset(偏移) 与消息的时间戳一起保存,当消息存储至过期时间(服务器中可配置)后,将自动删除以释放空间(无论是否已被消费);
  • 若干 Consumer(消息的消费者): Subscribe(订阅) Topic 并从某个 Partition 中拉取消息(Pull); 每个主题针对每个消费者都保存了其当前消费位置(offset,该值可人为移动),下次消费时会从该位置拉取,然后 offset 向后移动; 每个 Consumer 属于一个特定的 Consumer Group(可明确指定,也可不指定默认为 group);
  • 一个 Zookeeper 集群: 

1.管理 broker 与 consumer 的动态加入与离开。(Producer 不需要管理,随便一台计算机都可以作为Producer 向 Kakfa Broker 发消息)

2.触发负载均衡,当 broker 或 consumer 加入或离开时会触发负载均衡算法,使得一个 consumer group 内的多个 consumer 的消费负载平衡。

3.维护消费关系及每个 partition 的消费信息。

1.2.Producer 端配置优化

1.2.1 producer 工作流程

 

 

Fig3. Producer发送消息流程

Kafka的客户端发送数据到服务器,一般都是要经过缓冲的,也就是说,你通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的。

Producer的默认配置:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("buffer.memory", 33554432); //缓存区内存32M
props.put("batch.size", 16384); //批量发送消息的大小为16KB
props.put("linger.ms", 0); //batch发送时间
props.put("max.request.size", 1048576); //最大的请求大小为1M
props.put("acks", "1"); //持久化机制
props.put("retries", 0); //重试机制
props.put("max.block.ms", 6000);//阻塞时长为60s

 

重要参数解释:

  1. "batch.size"决定了你的每个Batch要存放多少数据就可以发送出去了理论上来说,提升batch的大小,可以允许更多的数据缓冲在里面,那么一次Request发送出去的数据量就更多了,这样吞吐量可能会有所提升

避免一个Batch迟迟凑不满,导致消息一直积压在内存里发送不出去的情况。

 

2."linger.ms"避免一个Batch迟迟凑不满,导致消息一直积压在内存里发送不出去的情况比如正常情况下13ms凑够一个batch,那么设置就可以为20ms哪怕遇到低峰时期,13ms凑不满一个Batch,还是会在20ms之后强制Batch发送出去。

 

  1. 3.“buffer.memory”的本质就是用来约束KafkaProducer能够使用的内存缓冲的大小的,默认值是32MB如果要是内存设置的太小,可能导致一个问题:消息快速的写入内存缓冲里面,但是Sender线程来不及把Request发送到Kafka服务器。一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了。将会出现producer无法产生发送数据,停止工作,并在客户端显示每60S请求一次。(props.put("max.block.ms", 6000);
    “max.request.size”这个参数决定了每次发送给Kafka服务器请求的最大大小,同时也会限制你一条消息的最大大小也不能超过这个参数设置的值,所以根据我们的实际生产情况设置为1M。
  2. "retries"设置重试次数可以在发送失败时进行重试,提高发送的可靠性,可改为1.

 

  1. "acks"参数决定了发送出去的消息要采用什么样的持久化策略

acks = 0 : KafkaProducer在客户端,只要把消息发送出去,不管那条数据有没有在哪怕Partition Leader上落到磁盘,直接就认为这个消息发送成功了。

acks = all 或 -1: Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。必须跟ISR列表里至少有2个以上的副本配合使用)。

acks = 1: 写入本地日志即可,Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。是上述二者的折衷方案,也是默认值。

1.2.2 如何保证宕机的时候数据不丢失?

Kafka的高可用架构:多副本冗余几乎是现在任何一个优秀的分布式系统都一般要具备的功能。

Broker是整个Kafka集群的核心引擎,负责消息的存储转发,并对外提供服务。我们可以看到,Kafka集群可以非常简单的通过增删Broker,实现整个集群的扩缩容。Kafka对外提供服务的基本单位是Topic,那么实现Topic级别的平行扩展能力,也就实现了应用级的平行扩展能力。Kafka的Broker集群中,每台机器上都存储了一些Partition,也就存放了Topic的一部分数据,每个Partition都有多个副本,其中一个副本叫做leader,其他的副本叫做follower这样就实现了Topic的数据分布式存储在一个Broker集群上。

   

 


如上图所示,假设一个Topic拆分为了3个Partition,分别是Partition0,Partiton1,Partition2,此时每个Partition都有2个副本。这样的多副本冗余机制,可以保证任何一台机器挂掉,都不会导致数据彻底丢失,因为起码还是有副本在别的机器上的。

 

 

 

1.2.3多副本之间数据如何同步?

任何一个Partition,只有Leader是对外提供读写服务的

也就是说,如果有一个客户端往一个Partition写入数据,此时一般就是写入这个Partition的Leader副本。

然后Leader副本接收到数据之后,Follower副本会不停的给他发送请求尝试去拉取最新的数据,拉取到自己本地后,写入磁盘中。如下图所示:


 

1.2.4如何判断Follower 与Leader是否保持同步状态?

如果说某个Follower所在的Broker因为JVM FullGC之类的问题,导致自己卡顿了,无法及时从Leader拉取同步数据,那么是不是会导致Follower的数据比Leader要落后很多?

 

ISR全称是“In-Sync Replicas”,也就是保持同步的副本,他的含义就是,跟Leader始终保持同步的Follower有哪些。只要Follower一直及时从Leader同步数据,就可以保证他们是处于同步的关系的。

所以每个Partition都有一个ISR,这个ISR里一定会有Leader自己,因为Leader肯定数据是最新的,然后就是那些跟Leader保持同步的Follower,也会在ISR里。

1.2.5 Producer端 模拟实测

根据对实际生产环境进行过车数据的测试,模拟producer端以1000nMesg/s的数据量进行生产发送,其中一条过程数据的所占内存空间大小大概为250Byte。、

VehiclePass{deviceIndexCode='1', ip='10.194.224.111', port=0, crossingIndexCode='4', crossingId='0', directionIndex=1, laneNo=8, reciveTime='20190827183327', passTime='20190827183327', city='上海', quantity=1234567, ttlNum=9999},过车数据占用内存大小:230Byte
综上计算:

当producer创建20个topic时,则1s生产1000*20*250/1024=5M,batch.size =16K时,则一个batch中包含65条数据,1s要发送305个batch。当出现网络延迟或者producer端。

所以要适当的提高batch.size的大小,可以适当的调整为32K,,这样相当于一个batch包含130条数据,每13ms发送一次batch;同时buffer.memory 相应调整为64M。

分析结论

  1.   buffer.memory 调的过小对性能会影响很大,因为内存不足就会导致发送被blockblock.on.buffer.fullTRUE
  2. 设置过大的buffer.memory对性能提高帮助不是很大,只要够用就好。 可以根据batch size比默认值提高多少倍,而相应提高buffer.memory多少倍即可。

 

1.3.Broker端的配置优化

  1. 3.1   参数描述

参数

默认值

推荐值

说明

num.network.threads

3

默认值

server用来处理网络请求的网络线程数目;一般你不需要更改这个属性。

num.io.threads

8

默认值

server用来处理请求的I/O线程的数目;这个线程数目至少要等于硬盘的个数。

queued.max.requests

500

默认值

在网络线程停止读取新请求之前,可以排队等待I/O线程处理的最大请求个数

  1. 3.2  原理分析

 

Fig.4. Broker 网络流程

其中num.network.threads是控制上图中的Processor Thread的个数, num.io.threads是控制API Thread的个数,queued.max_requests是控制Request Channel队列的容量。

  1. 调大num.network.threads能够增加处理网络io请求,但不读写具体数据,基本没有io等待。但如果过大,会带来线程切换的开销。
  2. 增大queued.max.requests能够缓存更多的请求,以撑过业务峰值。如果过大,会造成内存的浪费。
  3. 增大num.io.threads能提升线程处理能力,如果过大会代理线程切换开销影响处理能力。同时至少要等于硬盘的个数。

1.3.3 实测分析

num.network.threadsnum.io.threadsqueued.max.requests 这三个参数是kafka网络模型的相关参数,所以这里一起测试。 模拟一定的压力,使得API threads线程处理不过来,request channel队列阻塞,性能开始下降。此时增大queue.max.requests或者增加API threads,查看性能情况 

1G网卡下,10分区,2k数据,发送1000000, ack都为1, 3Producer3Consumer一起跑, Consumer每次从队列开始读取数据。

 

 

 

 

 

 

测试数据如下:

num.network.threads

num.io.threads

queued.max.requests

Producer

nMsg.sec

Consumer

nMsg.sec

3

8

1

48768.5443

48985.088

3

8

250

50942.3841

52804.523

3

8

500

51303.0474

55146.401

3

8

1000

51956.0971

54461.271

3

1

1

50699.6045

48399.7216

 

queued.max.requests值由1增加到1000,可以看出通过调大queue个数,性能可以稍稍提高6%左右。

 

修改num.io.threads个数,性能影响不大。

 

10G网卡 下,100分区,2k数据,发送1000000, ack都为1, 10Producer10Consumer一起跑, Consumer每次从队列开始读取数据,Producer BatchSize 512M    

num.network.threads

num.io.threads

queued.max.requests

Producer

nMsg.sec

Consumer

nMsg.sec

3

8

1

203500.2035

139385.4383

3

8

1000

228206.2985

139567.3412

50

8

1

187441.4246

137869.9555

50

8

1000

229042.6019

151199.0081

queued.max.requests的极端情况分别为11000,在不同的压力下, Producer性能能提升10% 如果把num.network.threads调大,这两者的差距就更大达到22%

从数据得到的结论:

1.从上面的测试来看,3个线程的压力下,就算queued.max.requests设置为1broker也能很快处理,不会造成性能剧烈下降。

2.10G 网卡下, queued.max.requests设置为1 与设置为1000比较,能提升22%        

3.按照目前的压力来看,用默认值就可以满足业务要求,发现性能瓶颈可以调大这三个参数就可以。

queued.max.requests的极端情况分别为11000,在不同的压力下, Producer性能能提升10% 如果把num.network.threads调大,这两者的差距就更大达到22%

从数据得到的结论:

1.从上面的测试来看,3个线程的压力下,就算queued.max.requests设置为1broker也能很快处理,不会造成性能剧烈下降。

2.10G 网卡下, queued.max.requests设置为1 与设置为1000比较,能提升22%        

3.按照目前的压力来看,用默认值就可以满足业务要求,发现性能瓶颈可以调大这三个参数就可以。

1.4 Consumer 端实现原理

1.4.1 Consumer Java实现

public static void main(String[] args){

        String topicName = "test-topic";

        String groupId = "test-group";

  

        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092"); //必须指定

        props.put("group.id", groupId);

        props.put("enable.auto.commit", "true");//自动提交offset

        props.put("auto.commit.interval.ms", "1000");//自动提交时间间隔

        props.put("auto.offset.reset", "earliest");//重新读取offset位置

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList(topicName));

        try {

            while (true) {

                ConsumerRecords<String, String> records = consumer.poll(1000);//轮询1000次

                for(ConsumerRecord<String, String> record : records) {

                    System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(),record.key(),record.value());

                }

            }

        } finally {

            consumer.close();

        }

    }

 

消费者 & 消费者组

 

kafka中有一个概念叫做consumer group,每个group 去订阅对应的topic,topic的每条消息只能发送到订阅它的消费者组的其中一个实例上,并且每个消费者至多使用一个消费者组来标示自己。

 

Fig.5. 单Comsumer Group多consumer

而当某个topic 被多个消费者组订阅,而每个组仅有一个消费者时,每条消息就会被广播到每个消费者上。

 

Fig.6. 多Comsumer Group

这里需要注意下,还有个叫做独立消费者(standalone consumer)的概念,对于consumer group 是以group 为单位进行消息消费的,而standalone 会单独的执行消费,以consumer 实例为单位进行消费的。

1.4.2 group 状态机 & group管理协议

coordinator 实现组的管理,依赖的主要是consumer group的状态,仅有 Empty(组内没有任何active consumer)、PreparingRebalance(group 正在准备进行rebalance)、AwaitingSync(所有组员已经加入组并等待leader consumer发送分区的分配方案)、Stable(group开始正常消费)、Dead(该group 已经被废弃)这五个状态,那他们是如何轮转的可以简单的看一下状态机.

 

 

Fig.7. group 状态机

就整个过程来说,可以大致分为加入组阶段状态同步阶段。

加入组阶段:当明确group的coordinator之后,组内成员需要显式的发送JoinGroup请求(主要包括 订阅信息、成员id等元数据信息)给对应的coordinator,然后coordinator选择对应的consumer 作为leader,然后再给其他成员产生响应(一个空数组)。

1.4.3 offset & broker 中的offset

在Kafka 里面存在两个offset的概念,一个指的是consumer 中的offset,一个是broker中的offset.

concumer offset 用来记录当前消费了多少条消息,这个offset的状态是由consumer group来维护的,通过检查点机制对于offset的值进行持久化(内部就是一个map)

broker offset 消息在broker 端的位移值,根据之前说过的几个概念可以大致的理解为一个<topic,partition,offset>可以唯一的标示到一条消息。

1.4.4 _consumer_offset topic & zookeeper 位移管理

因为新版本和旧版本Kafka 所采用的offset保存策略是不同的,旧版本中主要依赖于Zookeeper,kafka 在数量很大的消费发生时,zookeeper读写会异常的频繁,导致很容易成为整个Kafka系统的瓶颈。所以新版本(0.9版本之后)对这种方式作出了重大更新,不再依赖于Zookeeper 来进行状态的保存,而是在broker 端直接开一个内部使用的topic,也就是_consumer_offsets topic,并且kafka 为了兼容老版本的consumer 还提供了 offsets.storage=kafka这样一个适配参数。

1.4.5 Rebalance & 场景剖析

最后要说的一点就是consumer 端的Rebalance 过程(rebalance是针对consumer group来说的,如果是standalone consumer 则没有这个概念),rebalance也就是如何达成一致来分配订阅topic的所有分区。这个rebalance的代价还是不小的,我们是需要避免高频的rebalance的。

常见的rebalance 场景有:新成员加入组、组内成员崩溃(这种场景无法主动通知,需要被动的检测才行,并且需要一个session.timeout 才检测到)、成员主动离组。

consumer 是可以执行任意次rebalance的,为了区分两次rebalance上的数据(防止无效或者延迟的offset提交),consumer 设计了一个叫做rebalance generation的标示。

对应常见的rebalance请求有:

JoinGroup:consumer 请求加入组

SyncGroup:group leader把分配方案同步给组内所有成员

Heartbeat:consumer 定期向coordination汇报心跳表示自己还存活

LeaveGroup:consumer 主动通知coordinator该consumer即将离组

DescribeGroup:查看组的所有信息。

  • Kafka 常见问题分析与解决方案

2.1 丢失数据的场景

2.1.1 producer 端

场景:I/O 线程发送消息之前,producer 崩溃, 则 producer 的内存缓冲区的数据将丢失。

解决方案:

  1. 同步发送,性能差,不推荐。
  2. 仍然异步发送,通过“无消息丢失配置”(来自胡夕的《Apache Kafka 实战》)极大降低丢失的可能性:
  1. block.on.buffer.full = true 尽管该参数在0.9.0.0已经被标记为“deprecated”,但鉴于它的含义非常直观,所以这里还是显式设置它为true,使得producer将一直等待缓冲区直至其变为可用。否则如果producer生产速度过快耗尽了缓冲区,producer将抛出异常
  2. acks=all 所有follower都响应了才认为消息提交成功,即"committed"
  3. retries = MAX 无限重试,直到你意识到出现了问题:)
  4. max.in.flight.requests.per.connection = 1 限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。注意:设置此参数是为了避免消息乱序
  5. 使用KafkaProducer.send(record, callback)而不是send(record)方法 自定义回调逻辑处理消息发送失败
  6. callback逻辑中最好显式关闭producer:close(0) 注意:设置此参数是为了避免消息乱序
  7. unclean.leader.election.enable=false 关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失
  8. replication.factor >= 3 这
  9. min.insync.replicas > 1 消息至少要被写入到这么多副本才算成功,也是提升数据持久性的一个参数。与acks配合使用
  10. 保证replication.factor > min.insync.replicas 如果两者相等,当一个副本挂掉了分区也就没法正常工作了。通常设置replication.factor = min.insync.replicas + 1即可

2.1.2 Consumer 端

consumer 端:不是严格意义的丢失,其实只是漏消费了。
场景:设置了 auto.commit.enable=true ,当 consumer fetch 了一些数据但还没有完全处理掉的时候,刚好到 commit interval 触发了提交 offset 操作,接着 consumer 挂掉。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。

解决方案:同步和异步组合提交

enable.auto.commit=false 关闭自动提交位移,在消息被完整处理之后再手动提交位移

        //设置手动提交消息偏移

 properties.put("enable.auto.commit","false");

  //一次拉取的最大消息条数

  properties.put("max.poll.records",100);consumer.subscribe(Collections.singletonList("Demo3"));

int count = 0;

        try {

            while (true){

                ConsumerRecords<String,String> records = consumer.poll(10);

                for(ConsumerRecord<String ,String> record : records){

                    count ++;

                    System.out.println(record.topic() + "," + record.partition() + "," + record.offset() + "," + record.key() + "," + record.value());

                }

                consumer.commitAsync();//只管发送提交请求无需等待broker返回

                          }

        } finally {

            try {

                consumer.commitSync();//由poll()方法返回的最新偏移量,提交成功后马上返回,否则跑出异常。每处理一次消息提交一次offset。

            } finally {

                consumer.close();

            }

            //consumer.close();

        }

对于offset的commit,Kafka Consumer Java Client支持两种模式:由KafkaConsumer自动提交,或者是用户通过调用commitSync、commitAsync方法的方式完成offset的提交。

在多partition多consumer的场景下自动提交总会发生一些不可控的情况。所以消费者API也为我们提供了另外一种提交偏移量的方式。开发者可以在程序中自己决定何时提交,而不是基于时间间隔。

1. 自动提交

最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是5s。消费者每次获取新数据时都会先把上一次poll()方法返回的最大偏移量提交上去。

可能造成的问题:数据重复读

假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s,所以在这 3s内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的。

2. 手动提交

(1) 同步提交

// 把auto.commit.offset设为false,让应用程序决定何时提交偏移量

props.put("auto.commit.offset", false);

try{

    while(true) {

        ConsumerRecords<String, String> records = consumer.poll(1000);

        for(ConsumerRecord<String, String> record : records) {

            // 假设把记录内容打印出来就算处理完毕

            System.out.println("value = " + record.value() + ", topic = " + record.topic() +

                    ", partition = " + record.partition() + ", offset = " + record.offset());

        }

        

        try{

            // 只要没有发生不可恢复的错误,commitSync() 方法会一直尝试直至提交成功

            // 如果提交失败,我们也只能把异常记录到错误日志里

            consumer.commitSync();

        }catch(CommitFailedException e) {

            System.err.println("commit  failed!" + e.getMessage());

        }

    }}finally {

    consumer.close();}

(2) 异步提交

手动提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡,会增加重复消息的数量。

这个时候可以使用异步提交,只管发送提交请求,无需等待 broker 的响应。

// 把auto.commit.offset设为false,让应用程序决定何时提交偏移量

props.put("auto.commit.offset", false);

try{

    while(true) {

        ConsumerRecords<String, String> records = consumer.poll(1000);

        for(ConsumerRecord<String, String> record : records) {

            System.out.println("value = " + record.value() + ", topic = " + record.topic() +

                    ", partition = " + record.partition() + ", offset = " + record.offset());

        }

        // 提交最后一个偏移量,然后继续做其他事情。

        consumer.commitAsync();

    }}finally {

    consumer.close();}

在成功提交或碰到无法恢复的错误之前,commitSync()会一直重试,但是commitAsync()不会,这也是commitAsync()不好的一个地方。它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。

假设我们发出一个请求用于提交偏移量2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量3000。如果commitAsync()重新尝试提交偏移量2000,它有可能在偏移量3000之后提交成功。这个时候如果发生再均衡,就会出现重复消息。

commitAsync()也支持回调,在broker作出响应时会执行回调:

// 把auto.commit.offset设为false,让应用程序决定何时提交偏移量

props.put("auto.commit.offset", false);

try {

    while (true) {

        ConsumerRecords<String, String> records = consumer.poll(1000);

        for (ConsumerRecord<String, String> record : records) {

            System.out.println("value = " + record.value() + ", topic = " + record.topic() + ", partition = "

                    + record.partition() + ", offset = " + record.offset());

        }

        consumer.commitAsync(new OffsetCommitCallback() {

            @Override

            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {

                if(offsets != null) {

                    System.out.println("commit offset successful!");

                }

                if(exception != null) {

                    System.out.println("commit offset fail!" + exception.getMessage());

                }

            }

        });

    }} finally {

    consumer.close();}

可以在回调中重试失败的提交,以下为思路: 使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试。如果序列号比较大,说明有一个新的提交已经发送出去了,应该停止重试。

(3) 同步和异步组合提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。

try {

    while (true) {

        ConsumerRecords<String, String> records = consumer.poll(1000);

        for (ConsumerRecord<String, String> record : records) {

            System.out.println("value = " + record.value() + ", topic = " + record.topic() + ", partition = "

                    + record.partition() + ", offset = " + record.offset());

        }

        // 如果一切正常,我们使用 commitAsync() 方法来提交

        // 这样速度更快,而且即使这次提交失败,下一次提交很可能会成功

        consumer.commitAsync();

    }}catch (Exception e) {

    e.printStackTrace();}finally {

    try {

        // 使用 commitSync() 方法会一直重试,直到提交成功或发生无法恢复的错误

        // 确保关闭消费者之前成功提交了偏移量

        consumer.commitSync();

    }finally {

        consumer.close();

    }}

2.1.3 Broker端

broker failover机制

Follower 还没有来得及同步数据,Leader 挂了,然后选举某个 Follower 成 Leader 之后,这就会丢了一些数据。

解决方法:

1.给 Topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 Partition 必须有至少 2 个副本。

2.在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 Leader 至少感知到有至少一个 Follower 还跟自己保持联系,没掉队,这样才能确保 Leader 挂了还有一个 Follower 吧。

kafka 集群启动后,所有的 broker 都会被 controller 监控,一旦有 broker 宕机,ZK 的监听机制会通知到 controller, controller 拿到挂掉 broker 中所有的 partition,以及它上面的存在的 leader,然后从 partition的 ISR 中选择一个 follower 作为 leader,更改 partition 的 follower 和 leader 状态。

3.在 Producer 端设置 acks=all:要求每条数据,必须是写入所有 Replica 之后,才能认为是写成功了

4.在 Producer 端设置 retries=MAX:一旦写入失败,就无限重试。

2.2 Kafka-Topic 操作常见问题

2.2.1 创建topic出错

Error while executing topic command : Replication factor: 1 larger than available brokers: 0.

解决方法

1.很可能是之前在server.properties配置文件夹里面和执行命令的zookeeper目录不一致。--zookeeper的值需要带上根目录,否则就会报这样的错误。例如配置文件里面写的连接目录是zookeeper.connect=master:2181,slave1:2181,slave3:2181/kafka,但是在执行命令时少写了kafka目录,写成一下

--zookeeper master:2181,slave1:2181,slave3:2181。就会报上述的错误,因此,务必要保证zookeeper的目录一致。

 

当Topic成功创建时,会输出Created topic “mobilePhone”,如上图。

注意:replication-factor不能大于broker数。

2.出现这个问题的原因是kafka没有启动的情况下想去create tpoic,所以应该是先去进到kafka安装目录的bin路径下,执行./kafka-server-start.sh ../config/server.properties 打开kafka即可!

 

2.2.2删除:删除某些不需要的Topic

  1.  

cd /usr/kafka/bin

 

 

./kafka-topics.sh --delete --zookeeper master:2181,slave1:2181,slave3:2181/kafka --topic mobilePhone

 

 

 

step1:

如果需要被删除topic 此时正在被程序 produce和consume,则这些生产和消费程序需要停止。

因为如果有程序正在生产或者消费该topic,则该topic的offset信息一致会在broker更新。调用kafka delete命令则无法删除该topic。

同时,需要设置 auto.create.topics.enable = false,默认设置为true。

step2:

server.properties 设置 delete.topic.enable=true

如果没有设置 delete.topic.enable=true,则调用kafka 的delete命令无法真正将topic删除,而是显示(marked for deletion)

step3:

调用命令删除topic:

./bin/kafka-topics --delete --zookeeper 【zookeeper server:port】 --topic 【topic name】

step4:

删除kafka存储目录(server.properties文件log.dirs配置,默认为"/data/kafka-logs")相关topic的数据目录。

注意:如果kafka 有多个 broker,且每个broker 配置了多个数据盘(比如 /data/kafka-logs,/data1/kafka-logs ...),且topic也有多个分区和replica,则需要对所有broker的所有数据盘进行扫描,删除该topic的所有分区数据。

step5:

找一台部署了zk的服务器,使用命令:

bin/zkCli.sh -server 【zookeeper server:port】

登录到zk shell,然后找到topic所在的目录:ls /brokers/topics,找到要删除的topic,然后执行命令:

rmr /brokers/topics/【topic name】

即可,此时topic被彻底删除。

如果topic 是被标记为 marked for deletion,则通过命令 ls /admin/delete_topics,找到要删除的topic,然后执行命令:

rmr /admin/delete_topics/【topic name】

备注:

网络上很多其它文章还说明,需要删除topic在zk上面的消费节点记录、配置节点记录,比如:

rmr /consumers/【consumer-group】

rmr /config/topics/【topic name】

其实正常情况是不需要进行这两个操作的,

step6:

完成之后,调用命令:

./bin/kafka-topics.sh --list --zookeeper 【zookeeper server:port】

查看现在kafka的topic信息。正常情况下删除的topic就不会再显示。

但是,如果还能够查询到删除的topic,则重启zk和kafka即可。

2.2.3 Producer 发送卡顿60S

出现问题:

kafka 发送消息卡顿 60s:Kafka 在 producer 调用 send 发送数据的时候卡住一分钟,精确的一分钟,然后函数返回,没有抛出异常

解决方案:

 1.Try set the bootstrap.servers to 127.0.0.1:9092

 2.发现是没有配置 host

把 kafka 的 broker list 从主机名改成了 ip

 

 3.设置

props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, “0”);

这可能并不是IP或主机名的问题

 

 4.引入jar包的版本问题,原来引入的kafka的版本是<version>0.10.0.0</version>修改之后的版本是<version>0.10.1.0</version>

 5.因为producer端缓存区满了,,消息放入缓存的速度大于发送的速度,导致阻塞时长,因为默认max.block.ms是60s.

2.3 Consumer 端消费问题

2.3.1 Fetch offset 3012 is out of range for partition  

出现问题:

Kafka offset is out of range错误原因

使用Kafka的过程中,发现consumer在启动的时候常常会抛出“offset is out of range”的错误,然后将offset设置为0,然后正常运行。

原因如下:

当一个consumer启动的时候,它不知道从哪个offset开始读取数据,于是它使用一个最大的长整数,这样会引发OffsetOutOfRangeException,当接收到这个错误后,consumer会根据配置中“autooffset.reset”的值来重置offset的值。

最终解决方案

(1)调整以下2个参数,减低replica从leader同步数据的速度:

message.max.bytes=10000000

replica.fetch.max.bytes=10737418

num.replica.fetchers=2

2.3.2.Consumer消费能力很低的情况下的处理方案

此类场景为kafka的consumer会从broker里面取出一批数据,�给消费线程进行消费。

由于取出的一批消息数量太大,consumer在session.timeout.ms时间之内没有消费完成consumer coordinator 会由于没有接受到心跳而挂掉。

出现问题:

INFO[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator coordinatorDead 529: kafka-example|NTI|Marking the coordinator 2147483646 dead.

[rhllor]DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator sendGroupMetadataRequest 465: kafka-example|NTI|Issuing group metadata request to broker 1

[rhllor]ERROR[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator handle 550: kafka-example|NTI|Error ILLEGAL_GENERATION occurred while committing offsets for group new-message-1

日志的意思大概是coordinator挂掉了,然后自动提交offset失败,然后重新分配partition给客户端;由于自动提交offset失败,导致重新分配了partition的客户端又重新消费之前的一批数据;接着consumer重新消费,又出现了消费超时,无限循环下去。

解决方案:

1.提高partition的数量,从而提高了consumer的并行能力,从而提高数据的消费能力

2.对于单partition的消费线程,增加了一个固定长度的阻塞队列和工作线程池进一步提高并行消费的能力

3.enable.auto.commit设置成false,使用同步异步组合的方式进行offset提交。

2.3.3 Consumer.commitfailedException:无法完成提交

测试过程中报错:

14:06:29.751[thread-36]错误O.A.K.C.C.I.ConsumerCoordinator-[consumer clientid=consumer-3,group id=group id dynamic feature interval]带偏移量的偏移量提交min5_feature_result-6=offsetandmetadata_offset=128783,metadata='',min5_feature_result-7=offsetandmetadata_offset=242999,元数据='',min5_feature结果-4=offsetandmetadata偏移量=42803,metadata='',min5_feature结果-5=offsetandmetadata偏移量=87464,metadata='',min5_feature结果-8=offsetandmetadata偏移量=0,metadata='',min5_feature结果-9=offsetandmetadata偏移量=0,metadata='',min5_feature结果-2=offsetandmetadata offset=70240,metadata='',min5_feature result-3=offsetandmetadata offset=275868,metadata='',min5_feature result-0=offsetandmetadata offset=401720,metadata='',min5_feature result-1=offsetandmetadata offset=139135,metadata=''失败              org.apache.kafka.clients.consumer.commitfailedException:无法完成提交,

 

问题分析:

 

因为组已重新平衡并将分区分配给另一个成员,这意味着随后调用poll()之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多时间处理消息,这可以通过增加会话超时或减少poll()中使用max.poll.records返回的批的最大大小来实现。

解决方案:

1.增大会话时间:默认:10000,现改为60000

session.timeout.ms:

当使用Kafkagroup管理用法时,这个超时时间用来检测consumer是否失效。consumer通过发送心跳信息给broker,用来表明自己还有效。如果broker在这个超时时间内没有收到来自consumer的心跳信息,则broker会从consumergroup中移除这个consumer,并重新进行负载均衡。注意,这个值必须在broker配置的允许范围之内:即group.min.session.timeout.ms和group.max.session.timeout.ms之间。(6000-300000之间)

 

2.减少max.poll.records返回的批的最大大小:默认500,改为:

max.poll.records:一次单独调用poll()可以返回的消息的最大条数。一次单独调用poll()可以返回的消息的最大条数。

个人认为:

3.应相应增大heartbeat.interval.ms;默认3000,改为 session.timeout.ms=session.timeout.ms/3

当使用Kafka的group管理用法时,consumer协作器两次心跳之间的时间间隔。心跳链接用来保证consumer的会话依然活跃,以及在新consumer加入consumergroup时可以重新进行负载均衡。这个值要比session.timeout.ms小,但是一般要比session.timeout.ms的1/3要大。这个值可以适当的减小,以控制重负载均衡的时间。

2.4 kafka 启动失败

2.4.1FATAL Fatal error during KafkaServerStartable startup.

出现问题:

[2019-08-14 15:50:45,442] FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)

kafka.common.KafkaException: Failed to acquire lock on file .lock in /usr/lib/LOCALCLUSTER/kafka/kafka-logs2. A Kafka instance in another process or thread is using this directory.

 

分析问题:

错误日志意思为:Kafka-logs2日志文件在另一个进程中已被占用

解决方法:

ps -ef|grep kafka,杀掉使用该目录的进程即可;

 

2.4.2 消息的大小超过了设置的message.max.size

分析问题:

message.max.size(默认是1048567及1M)

Exception in thread "main" org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {test-peng-0=51285860} whose size is larger than the fetch size 1048576 and hence cannot be ever returned. Increase the fetch size on the client (using max.partition.fetch.bytes), or decrease the maximum message size the broker will allow (using message.max.bytes).

解决方案:

重新设置message.max.bytes",33554432 ,是为了限制batch在一次request中发送太大的信息量。

2.4.3  kafka启动时内存异常

2.4.3.1启动时内存不足

##There is insufficient memory for the Java Runtime Environment to continue.

#Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory.

#An error report file with more information is saved as:

#//hs_err_pid6500.logOpenJDK

64-BitServer VM warning:INFO: os::commit_memory(0x00000000bad30000,

986513408,0)failed; error='Cannotallocate memory'(errno=12)

原因:kafka启动脚本kafka-server-start.sh中指定了kafka启动时需要的最小内存,默认为1G

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

虚拟机分配的虚拟内存在1G以下时就会出现该错误。

 

解决方法:修改脚本kafka-server-start.sh中的最小启动内存,设置为较小值。

2.4.3.2.启动时出现oom

[ FATAL ] Fatal error during KafkaServerStable startup. Prepare to shutdown

 java.lang.OutOfMemoryError: Java heap space

        at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)

        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)

        at kafka.log.SkimpyOffsetMap.(OffsetMap.scala:42)

..........

==========================================================

原因:kafka启动时分配的内存过小导致

解决方法:同样脚本kafka-server-start.sh中的最小启动内存,设置为较大值。

2.4.3.3.kafka 启动时报java.io.IOException: No space left on device

发现Kafka所在磁盘已满,清理磁盘

Linux 查看磁盘空间:

df

df 以磁盘分区为单位查看文件系统,可以获取硬盘被占用了多少空间,目前还剩下多少空间等信息。

例如,我们使用df -h命令来查看磁盘信息, -h 选项为根据大小适当显示:

显示内容参数说明:

Filesystem:文件系统

Size: 分区大小

Used: 已使用容量

Avail: 还可以使用的容量

Use%: 已用百分比

Mounted on: 挂载点

解决方案:

清理数据,释放空间:

linux删除某个文件下的所有文件:

进入这个文件夹

然后用命令 rm -rf *。

文章知识点与官方知识档案匹配,可进一步学习相关知识
云原生入门技能树首页概览13440 人正在系统学习中

与[转帖]Kafka 性能优化与问题深究相似的内容:

[转帖]Kafka 性能优化与问题深究

Kafka 性能优化与问题深究 一.Kafka深入探究 1.1 kafka整体介绍 1. 1.1 Kafka 如何做到高吞吐、低延迟的呢? Kafka是一个分布式高吞吐量的消息系统,这里提下 Kafka 写数据的大致方式:先写操作系统的页缓存(Page Cache),然后由操作系统自行决定何时刷到磁

[转帖]对比测试:Apache Pulsar 与 Kafka 在金融场景下的性能分析

https://baijiahao.baidu.com/s?id=1680081990582501220&wfr=spider&for=pc Apache Pulsar 是下一代分布式消息流平台,采用计算存储分层架构,具备多租户、高一致、高性能、百万 topic、数据平滑迁移等诸多优势。越来越多的企

[转帖]java性能分析之火焰图

http://t.zoukankan.com/lemon-le-p-13820204.html 原由 最近因为kafka、zookeeper、ES和相关的Java应用的内存问题搞的头大,做运维将近4年,对Java调优、性能方面的知识了解的少之又少,是时候下定决心来对他多一个学习了。不能一口吃成一个胖

[转帖]Jmeter笔记:使用Jmeter向kafka发送消息

https://www.cnblogs.com/daydayup-lin/p/14124816.html 日常工作中有时候需要向kafka中发送消息来测试功能或者性能,这时候我们怎么办呢?我之前是自己写个简单的python脚本来模拟发送消息的,其实用Jmeter来实现也比较简单方便。 1、我们必须有

[转帖]Kafka 基本概念大全

https://my.oschina.net/jiagoushi/blog/5600943 下面给出 Kafka 一些重要概念,让大家对 Kafka 有个整体的认识和感知,后面还会详细的解析每一个概念的作用以及更深入的原理 ・Producer:消息生产者,向 Kafka Broker 发消息的客户端

[转帖]Kafka 与RocketMQ 落盘机制比较

https://www.jianshu.com/p/fd50befccfdd 引言 前几期的评测中,我们对比了Kafka和RocketMQ的吞吐量和稳定性,本期我们要引入一个新的评测标准——软件可靠性。 何为“可靠性”? 先看下面这种情况:有A,B两辆越野汽车,在城市的周边地区均能很好应对泥泞的路况

[转帖]Kafka关键参数设置

https://www.cnblogs.com/wwcom123/p/11181680.html 生产环境中使用Kafka,参数调优非常重要,而Kafka参数众多,我们的java的Configuration代码中,经常设置的参数如下: Properties props = new Propertie

[转帖]kafka压测多维度分析实战

设置虚拟机不同的带宽来进行模拟压测 kafka数据压测 1、公司生产kafka集群硬盘:单台500G、共3台、日志保留7天。 1.1 版本:1.1.0 2、压测kafka。 2.1 使用kafka自带压测工具:bin/kafka-producer-perf-test.sh 命令参数解释: --num

[转帖]Kafka—配置SASL/PLAIN认证客户端及常用操作命令

介绍 SASL/PLAIN 是一种简单的 username/password安全认证机制,本文主要总结服务端开启该认证后,命令行客户端进行配置的操作流程。 配置 增加jaas.properties 在kafka的config目录下增加jaas.properties文件指定认证协议为SASL_PLAI

[转帖]kafka 配置认证与授权

https://www.cnblogs.com/yjt1993/p/14739130.html 本例不使用kerberos做认证,使用用户名和密码的方式来进行认证 1、服务端配置 1.0 配置server.properties 添加如下配置 #配置 ACL 入口类 authorizer.class.