弹簧靴 |如何使用 Apache Kafka 使用 JSON 消息
Apache Kafka 是一个流处理系统,可让您在进程、应用程序和服务器之间发送消息。在本文中,我们将了解如何使用 Apache Kafka 在 Spring Boot 应用程序的控制台上发布 JSON 消息。
要了解如何创建 Spring Boot 项目,请参阅本文。
工作步骤:
- 转到 Spring 初始化程序并创建一个具有以下依赖项的启动项目:
- Apache Kafka 的春天
- 在 IDE 中打开项目并同步依赖项。在本文中,我们将创建一个学生模型,我们将在其中发布学生详细信息。因此,创建一个模型类Student 。添加数据成员并创建构造函数并覆盖toString方法以查看 JSON 格式的消息。以下是学生类的实现:
Student Model
// Java program to implement a
// student class
// Creating a student class
public class Student {
// Data members of the class
int id;
String firstName;
String lastName;
// Constructor of the student
// class
public Student()
{
}
// Parameterized constructor of
// the student class
public Student(int id, String firstName,
String lastName)
{
this.id = id;
this.firstName = firstName;
this.lastName = lastName;
}
@Override
public String toString()
{
return "Student{"
+ "id = " + id
+ ", firstName = '" + firstName + "'"
+ ", lastName = '" + lastName + "'"
+ "}";
}
}
Config clas
@EnableKafka
@Configuration
public class Config {
// Function to establish a connection
// between Spring application
// and Kafka server
@Bean
public ConsumerFactory
studentConsumer()
{
// HashMap to store the configurations
Map map
= new HashMap<>();
// put the host IP in the map
map.put(ConsumerConfig
.BOOTSTRAP_SERVERS_CONFIG,
"127.0.0.1:9092");
// put the group ID of consumer in the map
map.put(ConsumerConfig
.GROUP_ID_CONFIG,
"id");
map.put(ConsumerConfig
.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
map.put(ConsumerConfig
.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
// return message in JSON formate
return new DefaultKafkaConsumerFactory<>(
map, new StringDeserializer(),
new JsonDeserializer<>(Student.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory
studentListner()
{
ConcurrentKafkaListenerContainerFactory
factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(studentConsumer());
return factory;
}
}
KafkaService Class
@Service
public class KafkaService {
// Annotation required to listen
// the message from Kafka server
@KafkaListener(topics = "JsonTopic",
groupId = "id", containerFactory
= "studentListner")
public void
publish(Student student)
{
System.out.println("New Entry: "
+ student);
}
}
- 创建一个新类Config并添加注释@Configuration和@EnableKafka 。现在使用 Student 类对象创建 beans ConsumerFactory和ConcurrentKafkaListenerContainerFactory 。
配置类
@EnableKafka
@Configuration
public class Config {
// Function to establish a connection
// between Spring application
// and Kafka server
@Bean
public ConsumerFactory
studentConsumer()
{
// HashMap to store the configurations
Map map
= new HashMap<>();
// put the host IP in the map
map.put(ConsumerConfig
.BOOTSTRAP_SERVERS_CONFIG,
"127.0.0.1:9092");
// put the group ID of consumer in the map
map.put(ConsumerConfig
.GROUP_ID_CONFIG,
"id");
map.put(ConsumerConfig
.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
map.put(ConsumerConfig
.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
// return message in JSON formate
return new DefaultKafkaConsumerFactory<>(
map, new StringDeserializer(),
new JsonDeserializer<>(Student.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory
studentListner()
{
ConcurrentKafkaListenerContainerFactory
factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(studentConsumer());
return factory;
}
}
- 使用@Service注解创建一个类KafkaService 。此类将包含在控制台上发布消息的侦听器方法。
KafkaService 类
@Service
public class KafkaService {
// Annotation required to listen
// the message from Kafka server
@KafkaListener(topics = "JsonTopic",
groupId = "id", containerFactory
= "studentListner")
public void
publish(Student student)
{
System.out.println("New Entry: "
+ student);
}
}
- 启动 zookeeper 和 Kafka 服务器。现在我们需要创建一个名为JsonTopic的新主题。为此,请打开一个新的命令提示符窗口并将目录更改为 Kafka 目录。
- 现在使用下面给出的命令创建一个新主题:
bin/Kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // for mac and linux
.\bin\windows\Kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // for windows
- 现在要运行 Kafka 生产者控制台,请使用以下命令:
bin/Kafka-console-producer.sh –broker-list localhost:9092 –topic Kafka_Example // for mac and linux
.\bin\windows\Kafka-console-producer.bat –broker-list localhost:9092 –topic Kafka_Example // for windows
- 运行应用程序并在 Kafka 生产者上输入消息,然后按 Enter。