Apache Kafka 是一个发布订阅消息系统。消息队列允许您在进程、应用程序和服务器之间发送消息。在本文中,我们将看到如何在 Spring Boot 应用程序中向 Apache Kafka 发送 JSON 消息。
为了学习如何创建一个spring boot项目,可以参考这篇文章。
JSON 的完整形式是 JavaScript Object Notation。 JSON 是一种用于数据交换的轻量级数据格式,易于人类读写,易于机器解析和生成。虽然它源自 JavaScript 的一个子集,但它是独立于语言的。它是一种完全独立于语言的文本格式。可以按照以下步骤将 JSON 消息发布到 Apache Kafka:
- 转到 spring initializr 并创建一个具有以下依赖项的入门项目:
- 春网
- Apache Kafka 的 Spring
- 在 IDE 中打开项目并同步依赖项。在本文中,我们将创建一个学生模型,我们将在其中发布学生详细信息。因此,创建一个模型类Student 。添加数据成员并创建构造函数并创建 getter 和 setter。下面是学生类的实现:
// Java program to implement a // student class // Creating a student class public class Student { // Data members of the // student class int id; String firstName; String lastName; // Constructor of the student // class public Student(int id, String firstName, String lastName) { this.id = id; this.firstName = firstName; this.lastName = lastName; } // Implementing the getters // and setters public int getId() { return id; } public void setId(int id) { this.id = id; } public String getFirstName() { return firstName; } public void setFirstName(String firstName) { this.firstName = firstName; } public String getLastName() { return lastName; } public void setLastName(String lastName) { this.lastName = lastName; } }
- 现在,创建一个带有注释@RestController的新类Controller 。创建一个 GET API 并使用参数作为字符串和模型类对象初始化KafkaTemplate 。下面是控制器的实现:
// Java program to implement a // controller @RestController @RequestMapping("gfg") public class UserResource { @Autowired private KafkaTemplate
kafkaTemplate; private static final String TOPIC = "StudentExample"; @GetMapping("/publish/{id}/" + "{firstName}/{lastName}") public String post( @PathVariable("id") final int id, @PathVariable("firstName") final String firstName, @PathVariable("lastName") final String lastName) { kafkaTemplate.send( TOPIC, new Student( id, firstName, lastName)); return "Published successfully"; } } - 创建一个带有@Configuration注释的StudentConfig类。在这个类中,我们将序列化模型类的对象。
// Java program to serialize the // object of the model class @Configuration public class StudentConfig { @Bean public ProducerFactory
producerFactory() { // Create a map of a string // and object Map config = new HashMap<>(); config.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); config.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(config); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>( producerFactory()); } } - 现在,启动 zookeeper 和 Kafka 服务器。我们需要创建一个名为StudentExample的新主题。为此,请打开一个新的命令提示符窗口并将目录更改为 Kafka 文件夹。
- 现在,使用下面给出的命令创建一个新主题:
For Mac and Linux: bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name
For Windows: .\bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name
- 现在要实时查看 Kafka 服务器上的消息,请使用以下命令:
For Mac and Linux: bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic topic_name –from-beginning
For Windows: .\bin\windows\kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic topic_name –from-beginning
- 运行应用程序并将 API 调用为:
localhost:8080/gfg/publish/{id}/{first name}/{last name}
注意:如果使用了不同的端口,则将该端口替换为 8080。
输出:
- 调用API:
- 实时查看消息: