📜  弹簧靴 |如何在 Apache Kafka 上发布 JSON 消息

📅  最后修改于: 2021-10-28 02:55:44             🧑  作者: Mango

Apache Kafka 是一个发布订阅消息系统。消息队列允许您在进程、应用程序和服务器之间发送消息。在本文中,我们将看到如何在 Spring Boot 应用程序中向 Apache Kafka 发送 JSON 消息。

为了学习如何创建一个spring boot项目,可以参考这篇文章。

JSON 的完整形式是 JavaScript Object Notation。 JSON 是一种用于数据交换的轻量级数据格式,易于人类读写,易于机器解析和生成。虽然它源自 JavaScript 的一个子集,但它是独立于语言的。它是一种完全独立于语言的文本格式。可以按照以下步骤将 JSON 消息发布到 Apache Kafka:

  1. 转到 spring initializr 并创建一个具有以下依赖项的入门项目:
    • 春网
    • Apache Kafka 的 Spring
  2. 在 IDE 中打开项目并同步依赖项。在本文中,我们将创建一个学生模型,我们将在其中发布学生详细信息。因此,创建一个模型类Student 。添加数据成员并创建构造函数并创建 getter 和 setter。下面是学生类的实现:
    // Java program to implement a
    // student class
      
    // Creating a student class
    public class Student {
      
        // Data members of the
        // student class
        int id;
        String firstName;
        String lastName;
      
        // Constructor of the student
        // class
        public Student(int id, String firstName,
                       String lastName)
        {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
        }
      
        // Implementing the getters
        // and setters
        public int getId()
        {
            return id;
        }
      
        public void setId(int id)
        {
            this.id = id;
        }
      
        public String getFirstName()
        {
            return firstName;
        }
      
        public void setFirstName(String firstName)
        {
            this.firstName = firstName;
        }
      
        public String getLastName()
        {
            return lastName;
        }
      
        public void setLastName(String lastName)
        {
            this.lastName = lastName;
        }
    }
    
  3. 现在,创建一个带有注释@RestController的新类Controller 。创建一个 GET API 并使用参数作为字符串和模型类对象初始化KafkaTemplate 。下面是控制器的实现:
    // Java program to implement a
    // controller
      
    @RestController
    @RequestMapping("gfg")
    public class UserResource {
      
        @Autowired
        private KafkaTemplate
            kafkaTemplate;
      
        private static final String TOPIC
            = "StudentExample";
      
        @GetMapping("/publish/{id}/"
                    + "{firstName}/{lastName}")
      
        public String post(
            @PathVariable("id") final int id,
            @PathVariable("firstName") final
                String firstName,
            @PathVariable("lastName") final
                String lastName)
        {
      
            kafkaTemplate.send(
                TOPIC,
                new Student(
                    id, firstName,
                    lastName));
      
            return "Published successfully";
        }
    }
    
  4. 创建一个带有@Configuration注释的StudentConfig类。在这个类中,我们将序列化模型类的对象。
    // Java program to serialize the
    // object of the model class
      
    @Configuration
    public class StudentConfig {
      
        @Bean
        public ProducerFactory
        producerFactory()
        {
            // Create a map of a string
            // and object
            Map config
                = new HashMap<>();
      
            config.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "127.0.0.1:9092");
      
            config.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
      
            config.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);
      
            return new DefaultKafkaProducerFactory<>(config);
        }
      
        @Bean
        public KafkaTemplate
        kafkaTemplate()
        {
            return new KafkaTemplate<>(
                producerFactory());
        }
    }
    
  5. 现在,启动 zookeeper 和 Kafka 服务器。我们需要创建一个名为StudentExample的新主题。为此,请打开一个新的命令提示符窗口并将目录更改为 Kafka 文件夹。
  6. 现在,使用下面给出的命令创建一个新主题:
  7. 现在要实时查看 Kafka 服务器上的消息,请使用以下命令:
  8. 运行应用程序并将 API 调用为:

    注意:如果使用了不同的端口,则将该端口替换为 8080。

输出:

  • 调用API:
  • 实时查看消息: