📅  最后修改于: 2023-12-03 15:13:26.041000             🧑  作者: Mango
在Apache Kafka中,使用者(consumer)是指从主题(topic)读取数据并处理数据的客户端应用程序。使用者组(consumer group)通常由一组使用者组成,它们共同消费主题中的消息。
使用者从主题订阅数据,并按其自己的节奏处理数据。下面是使用者的一些常见用途:
在Kafka中,使用者使用轮询(polling)机制从主题中读取数据。使用者可以按照自己的时间表配置轮询间隔。例如,如果使用者需要进行实时处理,则可以使用较短的轮询间隔。
使用者可以从多个分区(partition)中读取数据。使用者必须指定要读取的分区。如果分区未指定,则读取所有分区。
使用者组是多个使用者的逻辑组合。使用者组可以协调多个使用者,这些使用者共同消费主题中的消息。
每个分区都只能由使用者组中的一个使用者进行处理。这意味着使用者组可以水平扩展以增加处理能力。
使用者组有两个重要的属性:group.id和offset。组ID用于将使用者分组。在组内,使用者将共享分配的分区。偏移量用于跟踪使用者消费消息的位置。每个使用者将跟踪自己的偏移量。
使用者组中的使用者可以动态添加或删除,并且使用者组可以自动重新平衡以重新分配分区。重新平衡时,Kafka将停止正在处理的使用者,并将其分配给其他使用者。
以下代码示例演示如何创建一个使用者和一个使用者组:
// 创建配置对象
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group"); // 使用者组ID
// 创建使用者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
// 订阅主题和分区
consumer.subscribe(Arrays.asList("my-topic"));
// 读取数据
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
以上示例创建了一个使用者组my-group,使用者将订阅主题my-topic,并每秒轮询一次。每当使用者读取到一条消息时,它将打印消息的元数据和内容。