📅  最后修改于: 2023-12-03 15:40:52.615000             🧑  作者: Mango
Apache Kafka是一个分布式流处理平台,其架构基于发布-订阅模式。Kafka使用者(Consumer)是用于接收Kafka消息的客户端。在本文中,我们将探讨如何使用Java创建Kafka使用者。
在开始之前,需要确保已经安装并启动了Kafka。Kafka的安装和启动可以参考官方文档。
此外,还需要引入Kafka客户端依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
创建Kafka使用者需要指定Kafka集群的地址和相关的配置信息。下面是一个简单的Kafka使用者示例:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerDemo {
private final static String TOPIC_NAME = "test_topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "test_group";
private final static int POLL_TIMEOUT = 1000;
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
在上述示例中,首先创建一个Properties
实例,将Kafka集群地址、消费者组ID以及序列化和反序列化类的全限定名添加进去。然后创建一个Kafka使用者实例,并通过subscribe
方法订阅一个主题。最后使用poll
方法进行消息接收,并对接收到的消息进行处理。
需要注意的是,Kafka使用者的接收是阻塞的。因此,需要在while循环中进行轮询。
在使用完Kafka使用者之后,需要进行关闭操作,以释放资源。下面是一个简单的关闭Kafka使用者的示例:
consumer.close();
Kafka使用者是接收Kafka消息的客户端。借助Kafka客户端依赖和相关配置,我们可以很容易地创建一个Kafka使用者。在接收到消息后,我们可以对消息进行处理。最后,不要忘记在使用完Kafka使用者后进行关闭操作。