📜  kafka 列表主题 (1)

📅  最后修改于: 2023-12-03 14:43:38.224000             🧑  作者: Mango

Kafka 列表主题

Kafka 是一个分布式的消息中间件,它提供了一个高吞吐量、低延迟的平台来处理实时数据流。Kafka 的列表主题是 Kafka 中的一种重要概念,本文将对 Kafka 列表主题进行详细介绍。

什么是 Kafka 列表主题

Kafka 列表主题是一个逻辑上的概念,它表示了由一个或多个分区组成的消息流。每个分区都是有序的,并且只能被一个消费者组内的一个消费者实例消费。Kafka 提供了一个类似于文件系统的层次结构,可以将不同主题组织在不同的目录中。

如何创建 Kafka 列表主题

可以使用 Kafka 提供的命令行工具来创建 Kafka 列表主题。下面是一个创建名为 test 的主题,有 3 个分区、1 个副本的命令:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test
如何发送消息到 Kafka 列表主题

可以使用 Kafka 的 Producer API 来发送消息到 Kafka 列表主题。下面是一个简单的 Java 程序,向名为 test 的主题发送消息:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
    producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)));
}

producer.close();
如何消费 Kafka 列表主题的消息

可以使用 Kafka 的 Consumer API 来消费 Kafka 列表主题的消息。下面是一个简单的 Java 程序,从名为 test 的主题消费消息:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

consumer.close();
总结

Kafka 列表主题是 Kafka 中的一个重要概念,它表示了由一个或多个分区组成的消息流。可以使用 Kafka 提供的命令行工具来创建 Kafka 列表主题,使用 Kafka 的 Producer API 来发送消息,使用 Kafka 的 Consumer API 来消费消息。