📅  最后修改于: 2023-12-03 15:22:16.567000             🧑  作者: Mango
Kafka 是一种流式处理平台,用于处理实时数据流。Rest Proxy 是一个用于将 HTTP 请求传递到 Kafka 集群的工具,可以用于将 JSON 数据提供给 Kafka 主题。
Rest Proxy 是 Kafka 附带的一个工具,您可以使用 Kafka 安装程序中的命令来启动 Rest Proxy。
使用 Rest Proxy 将 JSON 数据发送到 Kafka 主题非常简单。您只需要将 JSON 数据以 POST 请求的形式发送到 Rest Proxy 的 URL 即可。例如,假设您正在使用默认端口8082,您可以使用以下命令将 JSON 数据发送到主题my_topic:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
--data '{"records":[{"value":{"foo":"bar"}}]}' "http://localhost:8082/topics/my_topic"
请注意,您需要使用适当的内容类型设置头文件。此示例使用Kafka JSON 格式 v2。
您可以使用 Kafka 消费者来从主题中接收 JSON 数据。使用 Kafka JSON 格式 v2 的消费者代码示例:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.*;
public class ConsumerExample {
public static void main(String[] args) {
String topicName = "my_topic";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", Deserializer.class.getName());
props.put("group.id", "test");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
Rest Proxy 是一个将 HTTP 请求传递到 Kafka 集群的工具,可以用于将 JSON 数据提供给 Kafka 主题。要将 JSON 数据发送到主题,请将 JSON 数据以 POST 请求的形式发送到 Rest Proxy 的 URL。您可以使用 Kafka 消费者来从主题中接收 JSON 数据。