📜  Spring Boot Kafka 消费者示例(1)

📅  最后修改于: 2023-12-03 15:35:03             🧑  作者: Mango

Spring Boot Kafka 消费者示例

本文将介绍如何使用 Spring Boot 创建 Kafka 消费者,包括创建 Kafka 消费者配置、编写 Kafka 消费者代码和测试 Kafka 消费者。

准备工作
  1. 安装 ZooKeeper 和 Kafka。
  2. 创建一个 Kafka 主题。
  3. 创建一个 Spring Boot 项目。
创建 Kafka 消费者配置

在 Spring Boot 项目的 application.yml 中添加 Kafka 消费者的配置信息,配置信息包括 Kafka 集群地址、Kafka 消费者组 id 和 Kafka 主题等信息。

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
    listener:
      poll-timeout: 3000
      ack-mode: manual_immediate
    properties:
      enable.auto.commit: false
  • bootstrap-servers:Kafka 集群地址。
  • consumer.group-id:Kafka 消费者组 id。
  • consumer.auto-offset-reset:设置消费者在启动时的偏移量。earliest 表示从最早的偏移量开始消费,latest 表示从最新的偏移量开始消费。
  • listener.poll-timeout:设置消费者等待下一条消息到达的时间。如果在此时间内无消息到达,则回调方法将被调用。
  • listener.ack-mode:设置消费者应答模式。manual_immediate 表示手动确认,即消费者接收到一条消息后,必须调用 Acknowledgment.acknowledge() 方法以确认消息已被处理。
  • properties.enable.auto.commit:是否开启自动提交偏移量。该属性值为 false 表示关闭自动提交。
编写 Kafka 消费者代码

创建一个 Kafka 消费者类,使用 @KafkaListener 注解标记消费者监听的主题和回调方法。在回调方法中处理接收到的消息。

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic")
    public void receive(String message, Acknowledgment acknowledgment) {
        System.out.println("Receive message:" + message);
        acknowledgment.acknowledge();
    }
}

@KafkaListener 注解中指定要监听的 Kafka 主题,接收到消息后在回调方法中处理消息。Acknowledgment 参数用于手动确认消息已被处理。

测试 Kafka 消费者

启动 Spring Boot 项目,开始接收 Kafka 消息。可以使用 Kafka 发送消息到指定的主题,观察消费者是否接收到消息。

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String message) {
        kafkaTemplate.send("my-topic", message);
    }
}

以上代码为 Kafka 生产者代码,用于发送消息到 Kafka 主题。在测试时,可以在控制台或其他方式向 Kafka 主题发送消息,观察消费者是否接收到消息。

总结

本文介绍了如何使用 Spring Boot 创建 Kafka 消费者,包括创建 Kafka 消费者配置、编写 Kafka 消费者代码和测试 Kafka 消费者。通过本文的介绍,你可以了解到 Spring Boot 如何与 Kafka 集成,并且可以通过代码示例快速了解 Kafka 消费者的实现过程。