📜  Spring Boot – Consuming JSON Objects from Kafka Topics

📅  Last modified: 2022-05-13 01:55:05.417000             🧑  Author: Mango


Apache Kafka is a publish-subscribe messaging system. A messaging system lets you send messages between processes, applications, and servers. Broadly speaking, Apache Kafka is software in which topics (a topic can be thought of as a category) are defined and processed further. Applications can connect to this system and publish messages to a topic. A message can carry any kind of information, from an event on your personal blog to a very simple text message that triggers some other event. You can read more about Kafka here. In the article Spring Boot Kafka Consumer Example we discussed how to consume messages from Kafka topics with Spring Boot. In a more realistic application, however, we need to consume JSON objects from Kafka topics, which is what this article covers.

Implementation:

Step 1: Go to https://start.spring.io/ and create a Spring Boot project. Add the "Spring for Apache Kafka" dependency to your Spring Boot project.

Step 2: Create a simple POJO class named Book inside the Model package. Below is the code for the Book.java file.

Java
// Java Program to Illustrate Book Class
  
package com.amiya.kafka.apachekafkaconsumer.Model;
  
// Class
public class Book {
  
    // Class data members
    private String bookName;
    private String isbn;
  
    // Constructor 1
    public Book() {}
  
    // Constructor 2
    public Book(String bookName, String isbn)
    {
        // This keyword refers to
        // current instance itself
        this.bookName = bookName;
        this.isbn = isbn;
    }
  
    // Getter
    public String getBookName() { return bookName; }
  
    // Setter
    public void setBookName(String bookName)
    {
        this.bookName = bookName;
    }
  
    // Getter
    public String getIsbn() { return isbn; }
  
    // Setter
    public void setIsbn(String isbn) { this.isbn = isbn; }
}
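
The consumer shown later prints the received object with System.out.println, so without a toString() override the console would only show the default object reference. An optional addition inside the Book class (not part of the original listing) makes that output readable:

Java
    // Optional: override toString() so the consumer log shows the field
    // values instead of the default Object#toString() representation
    @Override
    public String toString()
    {
        return "Book{bookName='" + bookName
            + "', isbn='" + isbn + "'}";
    }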


Step 3: Create a configuration class named KafkaConfig. Below is the code for the KafkaConfig.java file. Comments have been added to the code to explain it in more detail.

Example:

Java

// Java Program to Illustrate Configuration Class
  
package com.amiya.kafka.apachekafkaconsumer.config;
  
// Importing required classes
import com.amiya.kafka.apachekafkaconsumer.Model.Book;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
  
// Annotation
@EnableKafka
@Configuration
  
// Class
public class KafkaConfig {
  
    @Bean
    public ConsumerFactory<String, Book> consumerFactory()
    {
  
        // Creating a map of string-object type
        Map<String, Object> config = new HashMap<>();
  
        // Adding the Configuration
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                   "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG,
                   "group_id");
        config.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        config.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);
  
        // Return a consumer factory that deserializes the value as a Book from JSON
        return new DefaultKafkaConsumerFactory<>(
            config, new StringDeserializer(),
            new JsonDeserializer<>(Book.class));
    }
  
    // Creating a Listener
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Book>
    bookListener()
    {
        ConcurrentKafkaListenerContainerFactory<
            String, Book> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
                                                       
        return factory;
    }
}
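
Note that the key and value deserializer instances are passed directly to DefaultKafkaConsumerFactory, so they take precedence over the deserializer classes set in the config map. If a record on the topic is not valid JSON for Book, the plain JsonDeserializer throws on every poll of that record; one common safeguard is to wrap it in spring-kafka's ErrorHandlingDeserializer. The sketch below is a hypothetical variant of the consumerFactory() bean above, assuming spring-kafka 2.5 or later:

Java
// Hypothetical variant of consumerFactory(): wrap the JSON deserializer so a
// malformed record is handed to the container's error handling instead of
// failing the same poll over and over.
// Requires: import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
@Bean
public ConsumerFactory<String, Book> consumerFactory()
{
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");

    return new DefaultKafkaConsumerFactory<>(
        config, new StringDeserializer(),
        new ErrorHandlingDeserializer<>(new JsonDeserializer<>(Book.class)));
}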

Step 4: Create a consumer class named KafkaConsumer.

File: KafkaConsumer.java

Java

// Java Program to Illustrate kafka Consumer Class
  
package com.amiya.kafka.apachekafkaconsumer.consumer;
  
// Importing required classes
import com.amiya.kafka.apachekafkaconsumer.Model.Book;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
  
// Annotation
@Component
  
// Class
public class KafkaConsumer {
  
    @KafkaListener(topics = "NewTopic",
                   groupId = "group_id",
                   containerFactory = "bookListener")
  
    // Method
    public void consume(Book book)
    {
        // Print statement
        System.out.println("message = " + book);
    }
}
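
If you also need the record key, partition, or offset alongside the payload, @KafkaListener can accept the full ConsumerRecord instead of just the Book. The class below is a hypothetical alternative to the KafkaConsumer above (use one or the other, since both would join the same consumer group):

Java
// Hypothetical alternative listener that receives the whole ConsumerRecord,
// exposing the key, partition and offset in addition to the Book payload.
package com.amiya.kafka.apachekafkaconsumer.consumer;

import com.amiya.kafka.apachekafkaconsumer.Model.Book;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaRecordConsumer {

    @KafkaListener(topics = "NewTopic",
                   groupId = "group_id",
                   containerFactory = "bookListener")
    public void consume(ConsumerRecord<String, Book> record)
    {
        System.out.println("offset = " + record.offset()
                           + ", key = " + record.key()
                           + ", book = " + record.value());
    }
}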

Step 5: Now do the following to consume messages from the Kafka topic with Spring Boot:

  1. Run the Apache ZooKeeper server
  2. Run the Apache Kafka server
  3. Send a JSON object to the Kafka topic

Run the Apache ZooKeeper server with the following command:

C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Similarly, run the Apache Kafka server with this command:

C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

Run the following command to start a console producer and send JSON objects to the Kafka topic:

C:\kafka>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic
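
Once the console producer is running, type one JSON object per line whose fields match the Book class. The values below are placeholders used only for illustration:

{"bookName":"Effective Java","isbn":"978-0134685991"}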

Step 6: Now run your Spring Boot application. Make sure you have changed the port number in the application.properties file:

server.port=8081

Run the Spring Boot application from the ApacheKafkaConsumerApplication file.
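
ApacheKafkaConsumerApplication is the standard Spring Boot entry point; a minimal sketch of what it typically looks like (the package name is assumed to follow the other listings):

Java
// Standard Spring Boot entry point; running it starts the application
// and, with it, the Kafka listener container configured above.
package com.amiya.kafka.apachekafkaconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ApacheKafkaConsumerApplication {

    public static void main(String[] args)
    {
        SpringApplication.run(ApacheKafkaConsumerApplication.class, args);
    }
}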

Output:

In the output you can see that when you send a JSON object to the Kafka topic, it is displayed on the console in real time.
