📜  用Java创建Kafka使用者(1)

📅  最后修改于: 2023-12-03 15:40:52.615000             🧑  作者: Mango

用Java创建Kafka使用者

Apache Kafka是一个分布式流处理平台,其架构基于发布-订阅模式。Kafka使用者(Consumer)是用于接收Kafka消息的客户端。在本文中,我们将探讨如何使用Java创建Kafka使用者。

准备工作

在开始之前,需要确保已经安装并启动了Kafka。Kafka的安装和启动可以参考官方文档

此外,还需要引入Kafka客户端依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
创建Kafka使用者

创建Kafka使用者需要指定Kafka集群的地址和相关的配置信息。下面是一个简单的Kafka使用者示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {
    private final static String TOPIC_NAME = "test_topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String GROUP_ID = "test_group";
    private final static int POLL_TIMEOUT = 1000;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

在上述示例中,首先创建一个Properties实例,将Kafka集群地址、消费者组ID以及序列化和反序列化类的全限定名添加进去。然后创建一个Kafka使用者实例,并通过subscribe方法订阅一个主题。最后使用poll方法进行消息接收,并对接收到的消息进行处理。

需要注意的是,Kafka使用者的接收是阻塞的。因此,需要在while循环中进行轮询。

关闭Kafka使用者

在使用完Kafka使用者之后,需要进行关闭操作,以释放资源。下面是一个简单的关闭Kafka使用者的示例:

consumer.close();
总结

Kafka使用者是接收Kafka消息的客户端。借助Kafka客户端依赖和相关配置,我们可以很容易地创建一个Kafka使用者。在接收到消息后,我们可以对消息进行处理。最后,不要忘记在使用完Kafka使用者后进行关闭操作。