📜  如何检查 kafka 主题中的消息数量 - Java (1)

📅  最后修改于: 2023-12-03 15:24:50.017000             🧑  作者: Mango

如何检查 Kafka 主题中的消息数量 - Java

在使用 Kafka 时,有时需要检查每个主题中的消息数量。本文将介绍几种方法来检查 Kafka 主题中的消息数量。

方法一:使用 Kafka命令行工具

Kafka 提供了一个命令行工具 kafka-run-class.sh ,可以使用该工具查看主题中的消息数量。

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic my_topic --broker-list localhost:9092 --time -1 --offsets 1 | awk -F : '{sum += $3} END {print sum}'

其中 --topic 参数指定要查询的主题名称,--broker-list 参数指定 Kafka 集群中的 broker 地址,--time 参数指定消息的时间戳,这里使用 -1 表示获取最新的消息数量,--offsets 参数表示返回的结果是当前已消费的消息偏移量。

方法二:使用 KafkaConsumer API

通过使用 KafkaConsumer API,可以在 Java 中检查该主题中的消息数量。

import java.util.*;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        Map<String, List<PartitionInfo>> topics = consumer.listTopics();
        long count = 0;

        for (String topic : topics.keySet()) {
            List<PartitionInfo> partitions = topics.get(topic);
            for (PartitionInfo partition : partitions) {
                TopicPartition tp = new TopicPartition(partition.topic(), partition.partition());
                consumer.assign(Arrays.asList(tp));
                consumer.seekToEnd(Collections.singleton(tp));
                count += consumer.position(tp);
            }
        }

        System.out.println("Total message count: " + count);
        consumer.close();
    }
}

在上述 Java 代码中,我们使用 KafkaConsumer API 来获取所有主题并计算每个主题中的消息数量。

总结

本文介绍了两种方法来检查 Kafka 主题中的消息数量,每种方法均有其优缺点。对于大型集群,使用命令行工具可能比 Java 代码更有效率。相反,对于小型集群,Java 代码可能是更好的选择。