📜  用Java创建Kafka使用者

📅  最后修改于: 2021-01-05 02:54:38             🧑  作者: Mango

用Java创建Kafka Consumer

在上一节中,我们学习了用Java创建生产者。在本节中,我们将学习在Java中实现Kafka使用者。

要创建使用者,需要执行以下步骤:

  • 创建记录器
  • 创建消费者属性。
  • 创建消费者。
  • 为消费者订阅特定主题。
  • 轮询一些新数据

让我们讨论学习Java使用者实现的每个步骤。

创建记录器

记录器被实现为在程序执行期间写入日志消息。用户需要创建一个Logger对象,这将需要导入“ org.slf4j class ”。下面的快照显示了Logger的实现:

创建消费者属性

与生产者属性类似,Apache Kafka还提供了用于创建消费者的各种不同属性。要了解每个消费者的财产,请访问Apache Kafa的官方网站>文档>配置>用户配置。在这里,我们将列出使用者的必需属性,例如:

key.deserializer:它是密钥的反序列化器类,用于实现org.apache.kafka.common.serialization.Deserializer接口。

value.deserializer:值的反序列化器类,实现了org.apache.kafka.common.serialization.Desrializer接口。

bootstrap.servers:这是主机/端口对的列表,用于建立与Kafka集群的初始连接。它不包含客户端所需的全套服务器。仅需要引导所需的服务器。

group.id:这是一个唯一字符串,用于标识使用者组的使用者。当消费者通过订阅主题使用基于Kafka的偏移管理策略或组管理功能时,需要此属性。

auto.offset.reset:当不存在初始偏移量或服务器上不再存在当前偏移量时,此属性是必需的。有以下值可用于重置偏移值:

最早:此偏移量变量会自动将值重置为其最早的偏移量。

最新:此偏移量变量将偏移值重置为其最新偏移量。

none:如果未找到上一组的先前偏移量,则会向使用方抛出异常。

其他:引发消费者异常。

注意:在我们的代码中,我们使用了“最早的”变量将值重置为最早的值。

这些是实现使用者所必需的一些基本属性。让我们使用IntelliJ IDEA来实现。

步骤1)定义一个新的java类为“ consumer1.java ”。

步骤2)在类中描述使用者属性,如以下快照所示:

在快照中,描述了所有必要的属性。

创造消费者

创建一个KafkaConsumer对象以创建使用者,如下所示:

在创建使用者时传递上述属性。

订阅消费者

要从主题读取消息,我们需要将使用者连接到指定的主题。消费者可以通过各种订阅API进行订阅。在这里,我们使用Arrays.asList()是因为用户可能想订阅一个或多个主题。因此,Arrays.asList()允许订阅方订阅多个主题。

下面的代码显示了使用者订阅的实现:

用户需要直接或通过字符串变量指定主题名称以阅读消息。可以有多个主题,也用逗号分隔。

轮询新数据

消费者通过轮询方法从Kafka读取数据。

poll方法返回从当前分区的偏移量获取的数据。指定等待数据的持续时间,否则将空的ConsumerRecord返回给使用者。同样,记录器将获取记录键,分区,记录偏移量及其值。

下面列出了影响Java使用者的完整代码:

package com.firstgroupapp.aktutorial;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class consumer1 {
    public static void main(String[] args) {
        Logger logger= LoggerFactory.getLogger(consumer1.class.getName());
        String bootstrapServers="127.0.0.1:9092";
        String grp_id="third_app";
        String topic="my_first";
        //Creating consumer properties
        Properties properties=new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,grp_id);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        //creating consumer
        KafkaConsumer consumer= new KafkaConsumer(properties);
        //Subscribing
                consumer.subscribe(Arrays.asList(topic));
        //polling
        while(true){
            ConsumerRecords records=consumer.poll(Duration.ofMillis(100));
            for(ConsumerRecord record: records){
                logger.info("Key: "+ record.key() + ", Value:" +record.value());
                logger.info("Partition:" + record.partition()+",Offset:"+record.offset());
            }


        }
    }
}

这样,消费者可以通过依次执行每个步骤来阅读消息。

使用者实现的输出可在以下快照中看到:

键值为空。这是因为我们之前没有指定任何密钥。由于“最早”,从头开始显示所有消息。

读取消费者组中的数据

用户可以让一个以上的消费者总共读取数据。这可以通过消费群体来完成。在消费者组中,一个或多个消费者将能够从Kafka中读取数据。如果用户想从头开始阅读消息,请重置group_id或更改group_id。这将重置用户的应用程序,并从头开始显示消息。