📜  弹簧靴 |如何使用 Apache Kafka 使用字符串消息

📅  最后修改于: 2021-09-07 02:08:22             🧑  作者: Mango

Apache Kafka 是一个发布-订阅消息队列,用于实时数据流。消息队列允许您在进程、应用程序和服务器之间发送消息。在本文中,我们将看到如何从 apache kafka 向 Spring Boot 应用程序的控制台发送字符串消息。
方法:
第 1 步:转到 spring initializr 并创建一个具有以下依赖项的入门项目:

  • Apache Kafka 的 Spring


注意:我们也可以创建一个maven项目,在pom.xml文件中加入如下代码。


        
            org.springframework.boot
            spring-boot-starter
        
        
            org.springframework.kafka
            spring-kafka
        
  
        
            org.springframework.boot
            spring-boot-starter-test
            test
            
                
                    org.junit.vintage
                    junit-vintage-engine
                
            
        
        
            org.springframework.kafka
            spring-kafka-test
            test
        
    
  
    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    

第 2 步:在 IDE 中打开项目并同步依赖项。现在创建一个新类Config并添加注释@Configuration@EnableKafka

第 3 步:现在使用 String 对象创建 bean ConsumerFactoryConcurrentKafkaListenerContainerFactory

// Java program to consume string
// messages using spring kafka
  
@EnableKafka
@Configuration
public class Config {
  
    // Function to establish
    // connection between spring
    // application and kafka server
    @Bean
    public ConsumerFactory
    consumerFactory()
    {
  
        // HashMap to store the configurations
        Map map = new HashMap<>();
  
        // put the host IP inn the map
        map.put(ConsumerConfig
                    .BOOTSTRAP_SERVERS_CONFIG,
                "127.0.0.1:9092");
  
        // put the group ID in the map
        map.put(ConsumerConfig
                    .GROUP_ID_CONFIG,
                "id");
        map.put(ConsumerConfig
                    .KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        map.put(ConsumerConfig
                    .VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
  
        return new DefaultKafkaConsumerFactory<>(map);
    }
  
    @Bean
    public ConcurrentKafkaListenerContainerFactory
    kafkaListner()
    {
        ConcurrentKafkaListenerContainerFactory
            obj
            = new ConcurrentKafkaListenerContainerFactory<>();
        obj.setConsumerFactory(consumerFactory());
        return obj;
    }
}

第 4 步:创建一个带有@Service注释的KafkaService类。此类将包含在控制台上发布消息的侦听器方法。

@Service
public class KafkaService {
  
    // Annotation required to listen the
    // message from kafka server
    @KafkaListener(topics = "StringProducer",
                   groupId = "id")
    public void
    publish(String message)
    {
        System.out.println(
            "You have a new message: "
            + message);
    }
}

第 5 步:使用以下命令启动 zookeeper,然后启动 kafka 服务器。

第 6 步:现在我们需要创建一个名为StringProducer的新主题。为此,请打开一个新的命令提示符窗口并将目录更改为 kafka 目录。使用下面给出的命令创建一个新主题:

第 7 步:现在要运行 kafka 生产者控制台,请使用以下命令:

第 7 步:运行应用程序并在 kafka 生产者上键入消息并按 Enter。
输出:

这里在kafka生产者的字符串输入一条消息

>Hello
>Welcome to GeeksForGeeks

输出: