📜  Spring Boot – 将 JSON 对象传递到 Kafka 主题中

📅  最后修改于: 2022-05-13 01:54:36.569000             🧑  作者: Mango

Spring Boot – 将 JSON 对象传递到 Kafka 主题中

Spring Boot 是Java编程语言中最流行和最常用的框架之一。它是一个基于微服务的框架,使用 Spring Boot 制作可用于生产的应用程序需要的时间非常少。 Spring Boot 可以轻松创建独立的、生产级的基于 Spring 的应用程序,您可以“直接运行”。所以下面列出了 Spring boot 的一些主要特性。

  • 创建独立的 Spring 应用程序
  • 直接嵌入 Tomcat、Jetty 或 Undertow。
  • 提供“入门”依赖项以简化构建配置。
  • 尽可能自动配置 Spring 和 3rd 方库。
  • 提供生产就绪功能,例如运行状况检查、指标和外部化配置。
  • 几乎不需要代码生成,也不需要 XML 配置。

Apache Kafka是一个发布-订阅消息系统。消息传递系统允许您在进程、应用程序和服务器之间发送消息。从广义上讲,Apache Kafka 是可以定义和进一步处理主题(一个主题可能是一个类别)的软件。应用程序可以连接到该系统并将消息传输到主题上。消息可以包含任何类型的信息,来自您个人博客上的任何事件,也可以是非常简单的文本消息,会触发任何其他事件。在文章Spring Boot Kafka Producer Example中阅读有关 Kafka 的更多信息,我们已经讨论了如何使用 Spring Boot 将消息发布到 Kafka 主题。但是在一个复杂的程序中,我们需要将 JSON 对象传递给 Kafka 主题。因此,让我们看看如何在本文中做到这一点。

执行:

第 1 步:转到此链接 https://start.spring.io/ 并创建一个 Spring Boot 项目。将以下依赖项添加到您的 Spring Boot 项目中。

  • 春季网络
  • Apache Kafka 的春天

第 2 步:创建一个名为Book的简单POJO类。

Java
// Java Program to Illustrate Book Class
  
package com.amiya.kafka.apachekafkaproducer;
  
// Class
public class Book {
  
    // Class data members
    private String bookName;
    private String isbn;
  
    // Constructor 1
    public Book() {}
  
    // Constructor 2
    public Book(String bookName, String isbn)
    {
        this.bookName = bookName;
        this.isbn = isbn;
    }
  
    // Getter
    public String getBookName() { return bookName; }
  
    // Setter
    public void setBookName(String bookName)
    {
        this.bookName = bookName;
    }
  
    // Getter
    public String getIsbn() { return isbn; }
  
    // Setter
    public void setIsbn(String isbn) { this.isbn = isbn; }
}


Java
// Java Program Illustrating Kafka Configurations 
  
package com.amiya.kafka.apachekafkaproducer;
  
// Importing required classes 
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
  
// Annotation
@Configuration
  
// Class
public class KafkaConfig {
  
    // Annotation
    @Bean
  
    // Method
    public ProducerFactory producerFactory()
    {
  
        // Creating a Map
        Map config = new HashMap<>();
  
        // Adding Configuration
  
        // 127.0.0.1:9092 is the default port number for
        // kafka
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                   "127.0.0.1:9092");
        config.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
        config.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            JsonSerializer.class);
  
        return new DefaultKafkaProducerFactory<>(config);
    }
  
    // Annotation
    @Bean
  
    // Method
    public KafkaTemplate kafkaTemplate()
    {
  
        return new KafkaTemplate<>(producerFactory());
    }
}


Java
// Java Program to Illustrate DemoController Class
  
package com.amiya.kafka.apachekafkaproducer;
  
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
  
// Annotation
@RestController
  
// Class
public class DemoController {
  
    // Autowiring Kafka Template
    @Autowired KafkaTemplate kafkaTemplate;
  
    private static final String TOPIC = "NewTopic";
  
    // Annotation
    @PostMapping("/publish")
    public String publishMessage(@RequestBody Book book)
    {
        // Sending the message
        kafkaTemplate.send(TOPIC, book);
  
        return "Published Successfully";
    }
}


第 3 步:创建一个名为KafkaConfig的配置文件

Java

// Java Program Illustrating Kafka Configurations 
  
package com.amiya.kafka.apachekafkaproducer;
  
// Importing required classes 
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
  
// Annotation
@Configuration
  
// Class
public class KafkaConfig {
  
    // Annotation
    @Bean
  
    // Method
    public ProducerFactory producerFactory()
    {
  
        // Creating a Map
        Map config = new HashMap<>();
  
        // Adding Configuration
  
        // 127.0.0.1:9092 is the default port number for
        // kafka
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                   "127.0.0.1:9092");
        config.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
        config.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            JsonSerializer.class);
  
        return new DefaultKafkaProducerFactory<>(config);
    }
  
    // Annotation
    @Bean
  
    // Method
    public KafkaTemplate kafkaTemplate()
    {
  
        return new KafkaTemplate<>(producerFactory());
    }
}

第 4 步:现在让我们创建一个名为DemoController 的控制器类。

Java

// Java Program to Illustrate DemoController Class
  
package com.amiya.kafka.apachekafkaproducer;
  
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
  
// Annotation
@RestController
  
// Class
public class DemoController {
  
    // Autowiring Kafka Template
    @Autowired KafkaTemplate kafkaTemplate;
  
    private static final String TOPIC = "NewTopic";
  
    // Annotation
    @PostMapping("/publish")
    public String publishMessage(@RequestBody Book book)
    {
        // Sending the message
        kafkaTemplate.send(TOPIC, book);
  
        return "Published Successfully";
    }
}

第 5 步:现在我们必须执行以下操作才能使用 Spring Boot 将 JSON 对象发布到 Kafka 主题

  • 运行 Apache Zookeeper 服务器
  • 运行 Apache Kafka 服务器
  • 收听来自新主题的 JSON 对象

使用此命令运行 Apache Zookeeper 服务器

C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

同样,使用此命令运行 Apache Kafka 服务器

C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

运行以下命令来监听来自新主题的 JSON 对象

C:\kafka>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic NewTopic --from-beginning

第 6 步:现在运行您的 Spring Boot 应用程序。确保您已更改application.properties文件中的端口号

server.port=8081

让我们在 ApacheKafkaProducerApplication 文件中运行 Spring Boot 应用程序

第 7 步:让我们在 Postman 中测试这个 URL。点击以下 URL,在请求中,body 以 JSON 格式添加您的数据,如下图所示。在响应中,您可以看到“已成功发布”消息已返回。

http://localhost:8081/publish