📜  使用 Rest Proxy 将 JSON 数据提供给 Kafka 主题(1)

📅  最后修改于: 2023-12-03 15:22:16.567000             🧑  作者: Mango

使用 Rest Proxy 将 JSON 数据提供给 Kafka 主题

Kafka 是一种流式处理平台,用于处理实时数据流。Rest Proxy 是一个用于将 HTTP 请求传递到 Kafka 集群的工具,可以用于将 JSON 数据提供给 Kafka 主题。

安装 Rest Proxy

Rest Proxy 是 Kafka 附带的一个工具,您可以使用 Kafka 安装程序中的命令来启动 Rest Proxy。

将 JSON 数据发送到主题

使用 Rest Proxy 将 JSON 数据发送到 Kafka 主题非常简单。您只需要将 JSON 数据以 POST 请求的形式发送到 Rest Proxy 的 URL 即可。例如,假设您正在使用默认端口8082,您可以使用以下命令将 JSON 数据发送到主题my_topic:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      --data '{"records":[{"value":{"foo":"bar"}}]}' "http://localhost:8082/topics/my_topic"

请注意,您需要使用适当的内容类型设置头文件。此示例使用Kafka JSON 格式 v2。

从主题接收 JSON 数据

您可以使用 Kafka 消费者来从主题中接收 JSON 数据。使用 Kafka JSON 格式 v2 的消费者代码示例:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.*;

public class ConsumerExample {
  public static void main(String[] args) {
    String topicName = "my_topic";
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", Deserializer.class.getName());
    props.put("group.id", "test");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topicName));
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
      }
    }
  }
}
总结

Rest Proxy 是一个将 HTTP 请求传递到 Kafka 集群的工具,可以用于将 JSON 数据提供给 Kafka 主题。要将 JSON 数据发送到主题,请将 JSON 数据以 POST 请求的形式发送到 Rest Proxy 的 URL。您可以使用 Kafka 消费者来从主题中接收 JSON 数据。