[转帖]Java 获取 Kafka 指定 topic 的消息总量

java,获取,kafka,指定,topic,消息,总量 · 浏览次数 : 0

小编点评

```java // SimpleConsumer the SimpleConsumer API获取 Kafka 指定 topic 的消息总量 SimpleConsumer consumer = null; try { // String[] hostAndPort = seed.split("::"); // SimpleConsumer consumer = new SimpleConsumer(hostAndPort[0], Integer.valueOf(hostAndPort[1]), 100000, 64 * 1024, "leaderLookup" + new Date().getTime()); List topics = Collections.singletonList(a_topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); TopicMetadataResponse resp = consumer.send(req); List metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { map.put(part.partitionId(), part); } } } catch (Throwable e) { logger.error("Broker ["" + seed + " to find Leader for ["" + a_topic + ""] Reason: " + e.getMessage(), e); } finally { if (consumer != null) { consumer.close(); } } ```

正文

 

Kafka Consumer API

Kafka 提供了两套 API 给 Consumer

  • The high-level Consumer API
  • The SimpleConsumer API

第一种高度抽象的 Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的 API。

SimpleConsumer 优势

那么第二种 The SimpleConsumer API 能够帮助我们做哪些事情?

  • 一个消息读取多次
  • 在一个处理过程中只消费 Partition 其中的一部分消息
  • 添加事务管理机制以保证消息被处理且仅被处理一次

SimpleConsumer 弊端

使用 SimpleConsumer 有哪些弊端呢?

  • 必须在程序中跟踪 offset 值
  • 必须找出指定 Topic Partition 中的 lead broker
  • 必须处理 broker 的变动

SimpleConsumer 步骤

使用 SimpleConsumer 的步骤

  1. 从所有活跃的 broker 中找出哪个是指定 Topic Partition 中的 leader broker
  2. 找出指定 Topic Partition 中的所有备份 broker
  3. 构造请求
  4. 发送请求查询数据
  5. 处理 leader broker 变更

命令行获取 topic 信息总量

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list XXX1:9092 --topic topicName1 --time -1
topicName1:2:73454
topicName1:5:73006
topicName1:4:73511
topicName1:1:73493
topicName1:3:73019
topicName1:0:72983

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list XXX1:9092 --topic topicName1 --time -2
topicName1:2:0
topicName1:5:0
topicName1:4:0
topicName1:1:0
topicName1:3:0
topicName1:0:0
 

--time -1 表示要获取指定 topic 所有分区当前的最大位移,**--time -2** 表示获取当前最早位移。

两个命令的输出结果相减便可得到所有分区当前的消息总数。

分区当前的消息总数 = [--time-1] - [--time-2]

相减是因为随着 kafka 的运行,topic 中有的消息可能会被删除,因此 --time -1 的结果其实表示的是历史上该 topic 生产的最大消息数,如果用户要统计当前的消息总数就必须减去 --time -2 的结果。

本例中没有任何消息被删除,故 --time -2 的结果全是 0,表示最早位移都是 0,消息总数等于历史上发送的消息总数。

Java 获取 topic 消息总量

high-level Consumer

The high-level Consumer API 获取 Kafka 指定 topic 的消息总量:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOffsetTools {
	private final static Logger logger = LoggerFactory.getLogger(KafkaOffsetTools.class);

	public static final String KAFKA_BOOTSTRAP_SERVERS = "XXX1:9092,XXX2:9092,XXX3:9092";
	public static final List<String> TOPIC_LIST = Arrays.asList("topicName1","topicName2");
	
	public static void main(String[] args) {
		for(String topic: TOPIC_LIST) {
			long totolNum = totalMessageCount(topic, KAFKA_BOOTSTRAP_SERVERS);
			System.out.println(topic+":"+totolNum);
		}
	}
	
    public static long totalMessageCount(String topic, String brokerList) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> tps = Optional.ofNullable(consumer.partitionsFor(topic))
                    .orElse(Collections.emptyList())
                    .stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());
            Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(tps);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
 
            return tps.stream().mapToLong(tp -> endOffsets.get(tp) - beginOffsets.get(tp)).sum();
        }
    }
}
 

输出结果:

topicName1:5301171
topicName2:439466
 

SimpleConsumer

The SimpleConsumer API 获取 Kafka 指定 topic 的消息总量:

import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class KafkaOffsetTools {
    
	private final static Logger logger = LoggerFactory.getLogger(KafkaOffsetTools.class);

	public static final String KAFKA_BOOTSTRAP_SERVERS = "XXX1:9092,XXX2:9092,XXX3:9092";
	public static final List<String> TOPIC_LIST = Arrays.asList("topicName1","topicName2");
	
	public static void main(String[] args) {
		String[] kafkaHosts = KAFKA_BOOTSTRAP_SERVERS.split(",");
		List<String> seeds = Arrays.asList(kafkaHosts);
		KafkaOffsetTools kot = new KafkaOffsetTools();
		Map<String, Integer> topicNumMap = new HashMap<String, Integer>();
		for (String topicName : TOPIC_LIST) {
			TreeMap<Integer, PartitionMetadata> metadatas = kot.findLeader(seeds, topicName);
			int logSize = 0;
			for (Entry<Integer, PartitionMetadata> entry : metadatas.entrySet()) {
				int partition = entry.getKey();
				String leadBroker = entry.getValue().leader().host();
				String clientName = "Client_" + topicName + "_" + partition;
				SimpleConsumer consumer = new SimpleConsumer(leadBroker, entry.getValue().leader().port(), 100000, 64 * 1024, clientName);
				long readOffset = getLastOffset(consumer, topicName, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
				logSize += readOffset;
				if (consumer != null) {
					consumer.close();
				}
			}
			topicNumMap.put(topicName, logSize);
		}
		System.out.println(topicNumMap.toString());
	}

	private TreeMap<Integer, PartitionMetadata> findLeader(List<String> a_seedBrokers, String a_topic) {
		TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
		for (String seed : a_seedBrokers) {
			SimpleConsumer consumer = null;
			try {
				String[] hostAndPort = seed.split(":");
				consumer = new SimpleConsumer(hostAndPort[0], Integer.valueOf(hostAndPort[1]), 100000, 64 * 1024, "leaderLookup" + new Date().getTime());
				List<String> topics = Collections.singletonList(a_topic);
				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();
				for (TopicMetadata item : metaData) {
					for (PartitionMetadata part : item.partitionsMetadata()) {
						map.put(part.partitionId(), part);
					}
				}
			} catch (Throwable e) {
				logger.error("Broker [" + seed + "] to find Leader for [" + a_topic + "] Reason: " + e.getMessage(), e);
			} finally {
				if (consumer != null) {
					consumer.close();
				}
			}
		}
		return map;
	}
	
	public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime,
			String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
		OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
			return 0;
		}
		long[] offsets = response.offsets(topic, partition);
		return offsets[0];
	}
}
 

输出结果:

{topicName1=5301171, topicName2=439466}
 

与[转帖]Java 获取 Kafka 指定 topic 的消息总量相似的内容:

[转帖]Java 获取 Kafka 指定 topic 的消息总量

发表于 2020-11-29 分类于 Java , Apache , JavaClass , Kafka Valine: 0 Kafka Consumer API Kafka 提供了两套 API 给 Consumer The high-level Consumer API The SimpleCon

[转帖]java获取到heapdump文件后,如何快速分析?

https://www.jianshu.com/p/aaf56385766d 简介 在之前的OOM问题复盘之后,本周,又一Java服务出现了内存问题,这次问题不严重,只会触发堆内存占用高报警,没有触发OOM,但好在之前的复盘中总结了dump脚本,会在堆占用高时自动执行jstack与jmap,使得我们

[转帖]查看docker中运行的JVM参数问题及解决方法

方法一、jcmd命令: 1、jps获取java的线程id 2、jcmd pidVM.flags获取 51152:-XX:CICompilerCount=3 -XX:InitialHeapSize=526385152 -XX:MaxHeapSize=1073741824 -XX:MaxNewSize=

[转帖]jcmd命令详解

1 基本知识 jcmd 是在 JDK1.7 以后,新增了一个命令行工具。 jcmd 是一个多功能的工具,相比 jstat 功能更为全面的工具,可用于获取目标 Java 进程的性能统计、JFR、内存使用、垃圾收集、线程堆栈、JVM 运行时间,也可以手动执行 GC、导出(TODO 能导出线程信息?)线程

[转帖]JVM-工具-jcmd

http://events.jianshu.io/p/011f0e3a39ff 一、jcmd 用法 1.1 基本知识 jcmd 是在 JDK1.7 以后,新增了一个命令行工具。 jcmd 是一个多功能的工具,相比 jstat 功能更为全面的工具,可用于获取目标 Java 进程的性能统计、JFR、内存

[转帖]jcmd命令详解

1 基本知识 jcmd 是在 JDK1.7 以后,新增了一个命令行工具。 jcmd 是一个多功能的工具,相比 jstat 功能更为全面的工具,可用于获取目标 Java 进程的性能统计、JFR、内存使用、垃圾收集、线程堆栈、JVM 运行时间,也可以手动执行 GC、导出(TODO 能导出线程信息?)线程

[转帖]jconsole远程监控认证,java远程监控,jmx监控应用,jmx ssl配置,jconsole ssl连接远程应用

知识普及 jmx JMX(java Management Extensions)是一个Java平台的管理和监控接口。任何程序,只要按JMX规范访问这个接口,就可以获取所有管理与监控信息,jconsole与Java VisualVM等常见监测工具都是基于jmx,JMX不但可以用于管理JVM,还可以管理

[转帖] shell管道咋堵住了

https://www.cnblogs.com/codelogs/p/16060378.html 背景# 起因是这样的,我们想开发一个小脚本,当cpu使用率过高时,使用jstack将java的线程栈保存下来,以便后面分析。 获取cpu使用率# 获取cpu使用率是比较容易的,使用vmstat就可以了,

[转帖]Java方法的JIT编译

https://www.jianshu.com/p/a6275e239eac Java方法执行一般会利用分层编译,先通过c1解释执行。方法执行编译等级逐渐提升,有机会通过JIT编译为特定平台汇编执行,以此获得最好的性能。 方法执行除了达到一定热度外,是否JIT编译也受到以下两个参数影响: -XX:+

[转帖]引人入胜,实战讲解“Java性能调优六大工具”之linux命令行工具

Java性能调优六大工具之Linux命令行工具 为了能准确获得程序的性能信息,需要使用各种辅助工具。本章将着重介绍用于系统性能分析的各种工具。熟练掌握这些工具,对性能瓶颈定位和系统故障排查都很有帮助。 1,Linux命令行工具2, Windows工具3,JDK命令行工具4,JConsole工具5,