📜  弹簧靴 |如何使用 Apache Kafka 使用 JSON 消息(1)

📅  最后修改于: 2023-12-03 15:09:50.283000             🧑  作者: Mango

弹簧靴 | 如何使用 Apache Kafka 使用 JSON 消息

Apache Kafka 是一个高吞吐量、分布式的消息队列系统,适用于大规模的实时数据处理,它的消息以字节流的形式存储。本篇文章将介绍如何使用 Apache Kafka 来处理 JSON 格式的消息。

1. 准备工作
  1. 安装 Apache Kafka,详细步骤可以参考官方文档:https://kafka.apache.org/quickstart
  2. 安装 Java SDK,版本要求至少为 1.8
2. 创建 Kafka 主题

Kafka 中的每个消息都必须属于一个主题,我们需要首先创建一个主题。

可以使用命令行工具来创建一个主题,命令如下:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic json-topic

解释一下每个参数的含义:

  • --create: 表示创建一个主题;
  • --zookeeper localhost:2181: 表示连接 Zookeeper,它负责 Kafaka 的元数据管理;
  • --replication-factor 1: 表示设置主题的备份数量为 1,一般设置为 3;
  • --partitions 1: 表示设置主题的频道数量为 1,可以设置为多个;
  • --topic json-topic: 表示主题的名称。
3. 编写生产者代码

接下来我们需要编写一个生产者,用于向主题发送 JSON 消息。

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONObject;

public class JsonProducer {

    public static void main(String[] args) {

        // 设置 Kafka 生产者的属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建 Kafka 生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 创建 JSON 对象
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", 1001);
        jsonObject.put("name", "Tom");
        jsonObject.put("age", 25);

        // 发送消息
        ProducerRecord<String, String> record = new ProducerRecord<>("json-topic", "key", jsonObject.toString());
        producer.send(record);

        // 关闭 Kafka 生产者
        producer.close();
    }

}

解释一下代码:

  1. 创建 KafkaProducer 对象,设置一些属性;
  2. 创建一个 JSON 对象,并添加一些属性;
  3. 创建一个 ProducerRecord 对象,用于封装消息;
  4. 调用 KafkaProducer 的 send() 方法,发送消息;
  5. 关闭 KafkaProducer。
4. 编写消费者代码

现在我们需要编写一个消费者,用于从主题中获取 JSON 消息。

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.json.JSONObject;

public class JsonConsumer {

    public static void main(String[] args) {

        // 设置 Kafka 消费者的属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 创建 Kafka 消费者
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("json-topic"));

        // 获取消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (var record : records) {
                JSONObject jsonObject = new JSONObject(record.value());
                System.out.println(jsonObject);
            }
        }
    }
}

解释一下代码:

  1. 创建 KafkaConsumer 对象,设置一些属性;
  2. 订阅主题;
  3. 调用 KafkaConsumer 的 poll() 方法,获取消息;
  4. 遍历消息,将其转换成 JSON 对象,并打印出来。
5. 运行示例代码
  • 运行生产者代码:
javac JsonProducer.java
java JsonProducer
  • 运行消费者代码:
javac JsonConsumer.java
java JsonConsumer

可以看到,消费者成功接收到了生产者发送的 JSON 消息。

总结

本篇文章介绍了如何使用 Apache Kafka 来处理 JSON 消息,步骤包括创建主题、编写生产者代码、编写消费者代码,最后运行代码来进行测试。