📅  最后修改于: 2023-12-03 15:09:50.283000             🧑  作者: Mango
Apache Kafka 是一个高吞吐量、分布式的消息队列系统,适用于大规模的实时数据处理,它的消息以字节流的形式存储。本篇文章将介绍如何使用 Apache Kafka 来处理 JSON 格式的消息。
Kafka 中的每个消息都必须属于一个主题,我们需要首先创建一个主题。
可以使用命令行工具来创建一个主题,命令如下:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic json-topic
解释一下每个参数的含义:
--create
: 表示创建一个主题;--zookeeper localhost:2181
: 表示连接 Zookeeper,它负责 Kafaka 的元数据管理;--replication-factor 1
: 表示设置主题的备份数量为 1,一般设置为 3;--partitions 1
: 表示设置主题的频道数量为 1,可以设置为多个;--topic json-topic
: 表示主题的名称。接下来我们需要编写一个生产者,用于向主题发送 JSON 消息。
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONObject;
public class JsonProducer {
public static void main(String[] args) {
// 设置 Kafka 生产者的属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建 Kafka 生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 创建 JSON 对象
JSONObject jsonObject = new JSONObject();
jsonObject.put("id", 1001);
jsonObject.put("name", "Tom");
jsonObject.put("age", 25);
// 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>("json-topic", "key", jsonObject.toString());
producer.send(record);
// 关闭 Kafka 生产者
producer.close();
}
}
解释一下代码:
现在我们需要编写一个消费者,用于从主题中获取 JSON 消息。
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.json.JSONObject;
public class JsonConsumer {
public static void main(String[] args) {
// 设置 Kafka 消费者的属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "json-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建 Kafka 消费者
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("json-topic"));
// 获取消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
JSONObject jsonObject = new JSONObject(record.value());
System.out.println(jsonObject);
}
}
}
}
解释一下代码:
javac JsonProducer.java
java JsonProducer
javac JsonConsumer.java
java JsonConsumer
可以看到,消费者成功接收到了生产者发送的 JSON 消息。
本篇文章介绍了如何使用 Apache Kafka 来处理 JSON 消息,步骤包括创建主题、编写生产者代码、编写消费者代码,最后运行代码来进行测试。