📅  最后修改于: 2023-12-03 15:24:50.017000             🧑  作者: Mango
在使用 Kafka 时,有时需要检查每个主题中的消息数量。本文将介绍几种方法来检查 Kafka 主题中的消息数量。
Kafka 提供了一个命令行工具 kafka-run-class.sh
,可以使用该工具查看主题中的消息数量。
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic my_topic --broker-list localhost:9092 --time -1 --offsets 1 | awk -F : '{sum += $3} END {print sum}'
其中 --topic
参数指定要查询的主题名称,--broker-list
参数指定 Kafka 集群中的 broker 地址,--time
参数指定消息的时间戳,这里使用 -1 表示获取最新的消息数量,--offsets
参数表示返回的结果是当前已消费的消息偏移量。
通过使用 KafkaConsumer API,可以在 Java 中检查该主题中的消息数量。
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
public class ConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
long count = 0;
for (String topic : topics.keySet()) {
List<PartitionInfo> partitions = topics.get(topic);
for (PartitionInfo partition : partitions) {
TopicPartition tp = new TopicPartition(partition.topic(), partition.partition());
consumer.assign(Arrays.asList(tp));
consumer.seekToEnd(Collections.singleton(tp));
count += consumer.position(tp);
}
}
System.out.println("Total message count: " + count);
consumer.close();
}
}
在上述 Java 代码中,我们使用 KafkaConsumer API 来获取所有主题并计算每个主题中的消息数量。
本文介绍了两种方法来检查 Kafka 主题中的消息数量,每种方法均有其优缺点。对于大型集群,使用命令行工具可能比 Java 代码更有效率。相反,对于小型集群,Java 代码可能是更好的选择。