Kafka Consumer API
Kafka 提供了两套 API 给 Consumer
- The high-level Consumer API
- The SimpleConsumer API
第一种高度抽象的 Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的 API。
SimpleConsumer 优势
那么第二种 The SimpleConsumer API
能够帮助我们做哪些事情?
- 一个消息读取多次
- 在一个处理过程中只消费 Partition 其中的一部分消息
- 添加事务管理机制以保证消息被处理且仅被处理一次
SimpleConsumer 弊端
使用 SimpleConsumer 有哪些弊端呢?
- 必须在程序中跟踪 offset 值
- 必须找出指定 Topic Partition 中的 lead broker
- 必须处理 broker 的变动
SimpleConsumer 步骤
使用 SimpleConsumer 的步骤
- 从所有活跃的 broker 中找出哪个是指定 Topic Partition 中的 leader broker
- 找出指定 Topic Partition 中的所有备份 broker
- 构造请求
- 发送请求查询数据
- 处理 leader broker 变更
命令行获取 topic 信息总量
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list XXX1:9092 --topic topicName1 --time -1
topicName1:2:73454
topicName1:5:73006
topicName1:4:73511
topicName1:1:73493
topicName1:3:73019
topicName1:0:72983
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list XXX1:9092 --topic topicName1 --time -2
topicName1:2:0
topicName1:5:0
topicName1:4:0
topicName1:1:0
topicName1:3:0
topicName1:0:0
--time -1 表示要获取指定 topic 所有分区当前的最大位移,**--time -2** 表示获取当前最早位移。
两个命令的输出结果相减便可得到所有分区当前的消息总数。
分区当前的消息总数 = [--time-1] - [--time-2]
相减是因为随着 kafka 的运行,topic 中有的消息可能会被删除,因此 --time -1 的结果其实表示的是历史上该 topic 生产的最大消息数,如果用户要统计当前的消息总数就必须减去 --time -2 的结果。
本例中没有任何消息被删除,故 --time -2 的结果全是 0,表示最早位移都是 0,消息总数等于历史上发送的消息总数。
Java 获取 topic 消息总量
high-level Consumer
The high-level Consumer API 获取 Kafka 指定 topic 的消息总量:
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaOffsetTools {
private final static Logger logger = LoggerFactory.getLogger(KafkaOffsetTools.class);
public static final String KAFKA_BOOTSTRAP_SERVERS = "XXX1:9092,XXX2:9092,XXX3:9092";
public static final List<String> TOPIC_LIST = Arrays.asList("topicName1","topicName2");
public static void main(String[] args) {
for(String topic: TOPIC_LIST) {
long totolNum = totalMessageCount(topic, KAFKA_BOOTSTRAP_SERVERS);
System.out.println(topic+":"+totolNum);
}
}
public static long totalMessageCount(String topic, String brokerList) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
List<TopicPartition> tps = Optional.ofNullable(consumer.partitionsFor(topic))
.orElse(Collections.emptyList(