📜  使用 Rest Proxy 将 JSON 数据提供给 Kafka 主题

📅  最后修改于: 2022-05-13 01:58:43.745000             🧑  作者: Mango

使用 Rest Proxy 将 JSON 数据提供给 Kafka 主题

此 POC 描述了使用 Kafka REST 代理将 JSON 格式数据提供给 Kafka 主题的过程,该代理为 Kafka 集群提供 RESTful 接口。

先决条件:

在开始此过程之前,请确保:

  • 对正在运行的 Kafka 虚拟机进行管理访问,并且该虚拟机必须具有加载先决条件中所述的连接。
  • 识别并记下 Zoo-Keeper 主机名和端口。
  • 识别并记下 Kafka 代理的主机名和端口。
  • 识别并记下 Kafka Rest Proxy 的主机名和端口。

注意:此过程假定您已安装 Apache Kafka 发行版。如果您使用的是不同的 Kafka 发行版,您可能需要在此过程中调整某些命令。

在这里,在这个用例中,我们使用以下配置了主机名和端口

  • 休息代理本地主机:8082
  • 动物园管理员本地主机:2182
  • 引导服务器本地主机:9095

将 JSON 数据提供给 Kafka 主题的过程:

第 1 步:登录到 Kafka 虚拟机中的主机。

$ cd kafka_2.12-2.4.0  /*if this directory does not exit, Use ls command to view the folder and copy/paste the existing folder*/ 

要列出该 kafka 主题中存在的所有主题,请使用以下 cmd

$ bin/kafka-topics.sh --list --zookeeper localhost:2182  /*To check/verify and to display all the topics*/  

第 2 步:创建 Kafka 主题。在这里,创建一个名为“topic-test-1”的主题,它只有一个分区和一个副本:

例如:

$ bin/kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic topic-test-1

$ bin/kafka-topics.sh --list --zookeeper localhost:2182  /*To verify or to list out the created topic*/

第 3 步:创建 JSON 文件。在您选择的编辑器中创建一个名为 sample-json-data.json 的文件。

例如:

$ vi sample-json-data.json

然后,粘贴一些json格式的文本并添加到一个文件中,然后保存文件并退出

例如:

{
 "first_name": "Tom",
 "last_name": "Cruze",
 "email": "cruze@gmail.com",
 "gender": "Male",
 "ip_address": "1.2.3.4"
}

第 4 步:将 json 文件的内容流式传输到 Kafka 控制台生产者

$ bin/kafka-console-producer.sh --broker-list localhost:9095 --topic topic-test-1 < sample-json-data.json

第 5 步:通过运行 Kafka 控制台消费者来验证 Kafka 控制台生产者是否将消息发布到主题

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic topic-test-1 --from-beginning

第 6 步:将其他 JSON 文件的内容流式传输到 Kafka 控制台生产者

例如:

$ vi sample.json

然后,粘贴一些 JSON 格式的文本并将其添加到文件中,然后保存文件并退出

卡夫卡 REST 代理:

Kafka REST 代理为 Kafka 集群提供了一个 RESTful 接口。它使生产和消费消息、查看集群状态以及执行管理操作变得容易,而无需使用本机 Kafka 协议或客户端。

使用 curl 获取主题列表

$ curl "http://localhost:8082/topics"

获取一个主题的信息

$ curl http://localhost:8082/topics/

例如:

$ curl "http://localhost:8082/topics/topic-test-1"

第 1 步:使用带有主题值的 JSON 生成消息

例如,使用 JSON 向主题 topic-test-1 生成一条消息,其值为 '{ “month”: 12}'

通过运行 Kafka 控制台使用者来验证 Kafka 控制台生产者是否将消息发布到主题

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic topic-test-1 --from-beginning

第 2 步:为 JSON 数据创建消费者,从主题的开头开始

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \  http://localhost:8082/consumers/my_json_consumer
/* Expected output from preceding command*/
{
"instance_id":"my_consumer_instance",
"base_uri":"http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance"
OR
"base_uri":"http://rest-proxy:8082/consumers/my_json_consumer/instances/my_consumer_instance"  
}  

第 3 步:登录并订阅主题。

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["topic-test-1"]}' \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
/* Expected output from preceding command*/
# No content in response

第 4 步:使用第一个响应中的基本 URL 来使用一些数据。

$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

可选步骤:

步骤1:最后,使用DELETE关闭消费者,使其离开组并清理其资源。

$ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance
/* Expected output from preceding command*/
# No content in response

第 2 步:使用以下命令验证消费者实例

$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
/* Expected output from preceding command*/
{ “error_code”: 40403, “message”: “Consumer instance not found.” }