📜  Spring Boot – 将 JSON 对象传递到 Kafka 主题中(1)

📅  最后修改于: 2023-12-03 14:47:32.922000             🧑  作者: Mango

Spring Boot – 将 JSON 对象传递到 Kafka 主题中

在本篇文章中,我们将介绍如何使用 Spring Boot 将 JSON 对象传递到 Kafka 主题中。

整体思路

首先,我们需要创建一个生产者。生产者会将 JSON 对象转化为字符串,并将其发送到 Kafka 主题中。

其次,我们需要创建一个消费者。消费者会接收来自 Kafka 主题的消息,并将其转化为 JSON 对象。

最后,我们需要使用 Spring Boot 将这两段代码结合起来。Spring Boot 为我们提供了自动化配置和便捷的注入机制,使得我们无需手动配置 Kafka 的生产者和消费者,就可以愉快地完成这个项目。

实现代码

下面我们来看看具体的实现代码。首先是生产者部分。我们需要引入 Kafka 相关的依赖,并创建一个 Kafka 生产者对象。

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    public void sendJsonToKafka(String topic, Object object) throws JsonProcessingException {

        ObjectMapper mapper = new ObjectMapper();
        String jsonString = mapper.writeValueAsString(object);
        kafkaTemplate().send(topic, jsonString);
    }
}

接着是消费者部分。我们需要使用 @KafkaListener 注解来指定要监听的主题并处理消息。

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void consumeJson(String json) throws IOException {

        ObjectMapper mapper = new ObjectMapper();
        JsonNode rootNode = mapper.readTree(json);

        // do something with the json object
    }
}

最后是使用 Spring Boot 将它们结合起来的部分。

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    @Autowired
    private KafkaProducerConfig producer;

    @PostMapping(value = "/send")
    public ResponseEntity<String> sendMessage(@RequestBody Object object) throws JsonProcessingException {

        // send json to kafka
        producer.sendJsonToKafka("test", object);

        return ResponseEntity.ok().body("Message sent to Kafka topic");
    }
}

这里我们创建一个 REST API,接收 JSON 对象并将其发送到 Kafka 主题中。我们使用 @Autowired 来注入生产者的 bean,然后调用 sendJsonToKafka 方法发送消息即可。

总结

通过使用 Spring Boot,我们完成了将 JSON 对象传递到 Kafka 主题中的过程。在这个过程中,我们使用了 Kafka 生产者和消费者,并通过 REST API 发送消息。如果您有类似的需求,可以参考上述代码完成您的项目。