📜  弹簧靴 |如何使用 Apache Kafka 使用 JSON 消息

📅  最后修改于: 2022-05-13 01:55:08.343000             🧑  作者: Mango

弹簧靴 |如何使用 Apache Kafka 使用 JSON 消息

Apache Kafka 是一个流处理系统,可让您在进程、应用程序和服务器之间发送消息。在本文中,我们将了解如何使用 Apache Kafka 在 Spring Boot 应用程序的控制台上发布 JSON 消息。

要了解如何创建 Spring Boot 项目,请参阅本文。

工作步骤:

  1. 转到 Spring 初始化程序并创建一个具有以下依赖项的启动项目:
    • Apache Kafka 的春天
  2. 在 IDE 中打开项目并同步依赖项。在本文中,我们将创建一个学生模型,我们将在其中发布学生详细信息。因此,创建一个模型类Student 。添加数据成员并创建构造函数并覆盖toString方法以查看 JSON 格式的消息。以下是学生类的实现:
Student Model
// Java program to implement a
// student class
 
// Creating a student class
public class Student {
 
    // Data members of the class
    int id;
    String firstName;
    String lastName;
 
    // Constructor of the student
    // class
    public Student()
    {
    }
 
    // Parameterized constructor of
    // the student class
    public Student(int id, String firstName,
                   String lastName)
    {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
    }
 
    @Override
    public String toString()
    {
        return "Student{"
            + "id = " + id
            + ", firstName = '" + firstName + "'"
            + ", lastName = '" + lastName + "'"
            + "}";
    }
}


Config clas
@EnableKafka
@Configuration
public class Config {
 
    // Function to establish a connection
    // between Spring application
    // and Kafka server
    @Bean
    public ConsumerFactory
    studentConsumer()
    {
 
        // HashMap to store the configurations
        Map map
            = new HashMap<>();
 
        // put the host IP in the map
        map.put(ConsumerConfig
                    .BOOTSTRAP_SERVERS_CONFIG,
                "127.0.0.1:9092");
 
        // put the group ID of consumer in the map
        map.put(ConsumerConfig
                    .GROUP_ID_CONFIG,
                "id");
        map.put(ConsumerConfig
                    .KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        map.put(ConsumerConfig
                    .VALUE_DESERIALIZER_CLASS_CONFIG,
                JsonDeserializer.class);
 
        // return message in JSON formate
        return new DefaultKafkaConsumerFactory<>(
            map, new StringDeserializer(),
            new JsonDeserializer<>(Student.class));
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory
    studentListner()
    {
        ConcurrentKafkaListenerContainerFactory
            factory
            = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(studentConsumer());
        return factory;
    }
}


KafkaService Class
@Service
public class KafkaService {
 
    // Annotation required to listen
    // the message from Kafka server
    @KafkaListener(topics = "JsonTopic",
                   groupId = "id", containerFactory
                                   = "studentListner")
    public void
    publish(Student student)
    {
        System.out.println("New Entry: "
                           + student);
    }
}


  1. 创建一个新类Config并添加注释@Configuration@EnableKafka 。现在使用 Student 类对象创建 beans ConsumerFactoryConcurrentKafkaListenerContainerFactory

配置类

@EnableKafka
@Configuration
public class Config {
 
    // Function to establish a connection
    // between Spring application
    // and Kafka server
    @Bean
    public ConsumerFactory
    studentConsumer()
    {
 
        // HashMap to store the configurations
        Map map
            = new HashMap<>();
 
        // put the host IP in the map
        map.put(ConsumerConfig
                    .BOOTSTRAP_SERVERS_CONFIG,
                "127.0.0.1:9092");
 
        // put the group ID of consumer in the map
        map.put(ConsumerConfig
                    .GROUP_ID_CONFIG,
                "id");
        map.put(ConsumerConfig
                    .KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        map.put(ConsumerConfig
                    .VALUE_DESERIALIZER_CLASS_CONFIG,
                JsonDeserializer.class);
 
        // return message in JSON formate
        return new DefaultKafkaConsumerFactory<>(
            map, new StringDeserializer(),
            new JsonDeserializer<>(Student.class));
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory
    studentListner()
    {
        ConcurrentKafkaListenerContainerFactory
            factory
            = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(studentConsumer());
        return factory;
    }
}
  1. 使用@Service注解创建一个类KafkaService 。此类将包含在控制台上发布消息的侦听器方法。

KafkaService 类

@Service
public class KafkaService {
 
    // Annotation required to listen
    // the message from Kafka server
    @KafkaListener(topics = "JsonTopic",
                   groupId = "id", containerFactory
                                   = "studentListner")
    public void
    publish(Student student)
    {
        System.out.println("New Entry: "
                           + student);
    }
}
  1. 启动 zookeeper 和 Kafka 服务器。现在我们需要创建一个名为JsonTopic的新主题。为此,请打开一个新的命令提示符窗口并将目录更改为 Kafka 目录。
  2. 现在使用下面给出的命令创建一个新主题:
  1. 现在要运行 Kafka 生产者控制台,请使用以下命令:
  1. 运行应用程序并在 Kafka 生产者上输入消息,然后按 Enter。