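Apache Kafka is a publish-subscribe messaging system for real-time data streams. A message queue lets you send messages between processes, applications, and servers. This article shows how to receive string messages from Apache Kafka and print them to the console of a Spring Boot application.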
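To make the publish-subscribe idea concrete, here is a minimal in-memory sketch in plain Java. The `InMemoryBroker` class and its methods are hypothetical and exist only for illustration; they are not part of Kafka or Spring, and a real broker additionally persists messages and delivers them over the network.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a broker; not part of Kafka or Spring.
class InMemoryBroker {
    // Topic name -> listeners subscribed to that topic
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Register a listener that is called for every message published to the topic.
    void subscribe(String topic, Consumer<String> listener) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
    }

    // Deliver the message to every listener of the topic and return the number of deliveries.
    int publish(String topic, String message) {
        List<Consumer<String>> listeners = subscribers.getOrDefault(topic, List.of());
        listeners.forEach(listener -> listener.accept(message));
        return listeners.size();
    }

    public static void main(String[] args) {
        InMemoryBroker broker = new InMemoryBroker();
        broker.subscribe("StringProducer",
                message -> System.out.println("You have a new message: " + message));
        broker.publish("StringProducer", "Hello");
    }
}
```

The publisher only knows the topic name, never the subscribers, which is the same decoupling the Spring Boot consumer in this article relies on.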
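Approach:
Step 1: Go to Spring Initializr and create a starter project with the following dependency:
- Spring for Apache Kafka
Note: Alternatively, create a Maven project and add the following to its pom.xml file.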
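XML
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>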
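Step 2: Open the project in an IDE and sync the dependencies. Then create a new class Config and annotate it with @Configuration and @EnableKafka.
Step 3: Create the ConsumerFactory and ConcurrentKafkaListenerContainerFactory beans, both typed for String keys and values.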
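Java
// Consume String messages with Spring Kafka
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class Config {

    // Connects the Spring application to the Kafka server
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // Consumer configuration
        Map<String, Object> config = new HashMap<>();
        // Address (host:port) of the Kafka broker
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // Consumer group ID
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "id");
        // Keys and values are plain strings
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    // Named kafkaListenerContainerFactory because that is the bean name
    // @KafkaListener looks up by default
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}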
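The `ConsumerConfig` constants used above are just names for ordinary Kafka property keys. As a rough sketch that needs no Kafka dependency, the same settings can be written with the literal keys. The `ConsumerProps` class and its `build` method are hypothetical helpers introduced only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the consumer settings with literal property keys.
class ConsumerProps {
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        // Same key as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", bootstrapServers);
        // Same key as ConsumerConfig.GROUP_ID_CONFIG
        props.put("group.id", groupId);
        // Same keys as KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG;
        // the value is given here as a class name instead of a Class object
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        // Print each setting, one per line
        build("127.0.0.1:9092", "id")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Seeing the literal keys can help when matching this configuration against the broker documentation or a properties file.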
Step 4: Create a class KafkaService with the @Service annotation. This class contains the listener method that prints each received message to the console.
Java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {

    // Called for each message received on the StringProducer topic
    @KafkaListener(topics = "StringProducer", groupId = "id")
    public void publish(String message) {
        System.out.println("You have a new message: " + message);
    }
}
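Because the listener is an ordinary method, its logic can be checked without a running broker by calling it directly. The following is a minimal sketch under that assumption: `PlainKafkaService` is a hypothetical annotation-free copy of the class above, and `capture` is a hypothetical helper that records what the method prints.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// Hypothetical copy of the listener class without the Spring annotations.
class PlainKafkaService {
    public void publish(String message) {
        System.out.println("You have a new message: " + message);
    }

    // Calls the listener method directly and returns the text it printed.
    static String capture(String message) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        System.setOut(new PrintStream(buffer, true));
        try {
            new PlainKafkaService().publish(message);
        } finally {
            // Always restore the real console stream
            System.setOut(original);
        }
        return buffer.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(capture("Hello"));
    }
}
```

In the real application Spring Kafka makes this call for each record that arrives on the topic; the sketch only stands in for that delivery step.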
Step 5: Start ZooKeeper and then the Kafka server using the following commands.
For Windows:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties
For Mac and Linux:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
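Step 6: Now create a new topic named StringProducer. Open a new command prompt window, change to the Kafka directory, and create the topic with the command below. On Kafka 3.0 and later, where the --zookeeper option was removed, use --bootstrap-server localhost:9092 in its place.
// For Mac and Linux
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic StringProducer
// For Windows
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic StringProducer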
Step 7: Start the Kafka console producer on the same topic with the following command:
// For Mac and Linux
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic StringProducer
// For Windows
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic StringProducer
Step 8: Run the application, then type a message at the Kafka producer prompt and press Enter.
Input:
Type string messages at the Kafka producer prompt:
>Hello
>Welcome to GeeksForGeeks
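Output:
The application console prints each message with the listener's prefix, for example: You have a new message: Hello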