📜  Apache Kafka使用者和使用者组(1)

📅  最后修改于: 2023-12-03 15:13:26.041000             🧑  作者: Mango

Apache Kafka使用者和使用者组

在Apache Kafka中,使用者(consumer)是指从主题(topic)读取数据并处理数据的客户端应用程序。使用者组(consumer group)通常由一组使用者组成,它们共同消费主题中的消息。

使用者

使用者从主题订阅数据,并按其自己的节奏处理数据。下面是使用者的一些常见用途:

  • 实时数据处理:使用者可以将数据实时处理,例如将数据写入到数据库或将其转发到另一个主题中。
  • 流数据处理:使用者还可以将数据流式传输到外部系统中,例如Spark或Flink,以进行更高级的分析。

在Kafka中,使用者使用轮询(polling)机制从主题中读取数据。使用者可以按照自己的时间表配置轮询间隔。例如,如果使用者需要进行实时处理,则可以使用较短的轮询间隔。

使用者可以从多个分区(partition)中读取数据。使用者必须指定要读取的分区。如果分区未指定,则读取所有分区。

使用者组

使用者组是多个使用者的逻辑组合。使用者组可以协调多个使用者,这些使用者共同消费主题中的消息。

每个分区都只能由使用者组中的一个使用者进行处理。这意味着使用者组可以水平扩展以增加处理能力。

使用者组有两个重要的属性:group.id和offset。组ID用于将使用者分组。在组内,使用者将共享分配的分区。偏移量用于跟踪使用者消费消息的位置。每个使用者将跟踪自己的偏移量。

使用者组中的使用者可以动态添加或删除,并且使用者组可以自动重新平衡以重新分配分区。重新平衡时,Kafka将停止正在处理的使用者,并将其分配给其他使用者。

示例代码

以下代码示例演示如何创建一个使用者和一个使用者组:

// 创建配置对象
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group"); // 使用者组ID

// 创建使用者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());

// 订阅主题和分区
consumer.subscribe(Arrays.asList("my-topic"));

// 读取数据
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
            record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}

以上示例创建了一个使用者组my-group,使用者将订阅主题my-topic,并每秒轮询一次。每当使用者读取到一条消息时,它将打印消息的元数据和内容。