使用 Rest Proxy 将 JSON 数据提供给 Kafka 主题
此 POC 描述了使用 Kafka REST 代理将 JSON 格式数据提供给 Kafka 主题的过程,该代理为 Kafka 集群提供 RESTful 接口。
先决条件:
在开始此过程之前,请确保:
- 对正在运行的 Kafka 虚拟机进行管理访问,并且该虚拟机必须具有加载先决条件中所述的连接。
- 识别并记下 Zoo-Keeper 主机名和端口。
- 识别并记下 Kafka 代理的主机名和端口。
- 识别并记下 Kafka Rest Proxy 的主机名和端口。
注意:此过程假定您已安装 Apache Kafka 发行版。如果您使用的是不同的 Kafka 发行版,您可能需要在此过程中调整某些命令。
在这里,在这个用例中,我们使用以下配置了主机名和端口
- 休息代理本地主机:8082
- 动物园管理员本地主机:2182
- 引导服务器本地主机:9095
将 JSON 数据提供给 Kafka 主题的过程:
第 1 步:登录到 Kafka 虚拟机中的主机。
$ cd kafka_2.12-2.4.0 /*if this directory does not exit, Use ls command to view the folder and copy/paste the existing folder*/
要列出该 kafka 主题中存在的所有主题,请使用以下 cmd
$ bin/kafka-topics.sh --list --zookeeper localhost:2182 /*To check/verify and to display all the topics*/
第 2 步:创建 Kafka 主题。在这里,创建一个名为“topic-test-1”的主题,它只有一个分区和一个副本:
例如:
$ bin/kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic topic-test-1
$ bin/kafka-topics.sh --list --zookeeper localhost:2182 /*To verify or to list out the created topic*/
第 3 步:创建 JSON 文件。在您选择的编辑器中创建一个名为 sample-json-data.json 的文件。
例如:
$ vi sample-json-data.json
然后,粘贴一些json格式的文本并添加到一个文件中,然后保存文件并退出
例如:
{
"first_name": "Tom",
"last_name": "Cruze",
"email": "cruze@gmail.com",
"gender": "Male",
"ip_address": "1.2.3.4"
}
第 4 步:将 json 文件的内容流式传输到 Kafka 控制台生产者
$ bin/kafka-console-producer.sh --broker-list localhost:9095 --topic topic-test-1 < sample-json-data.json
第 5 步:通过运行 Kafka 控制台消费者来验证 Kafka 控制台生产者是否将消息发布到主题
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic topic-test-1 --from-beginning
第 6 步:将其他 JSON 文件的内容流式传输到 Kafka 控制台生产者
例如:
$ vi sample.json
然后,粘贴一些 JSON 格式的文本并将其添加到文件中,然后保存文件并退出
{ “cust_id”: 1313131, “month”: 12, “expenses”: 1313.13 }
{ “cust_id”: 3535353, “month”: 11, “expenses”: 761.35 }
{ “cust_id”: 7979797, “month”: 10, “expenses”: 4489.00 }
{ “cust_id”: 7979797, “month”: 11, “expenses”: 18.72 }
{ “cust_id”: 3535353, “month”: 10, “expenses”: 6001.94 }
{ “cust_id”: 7979797, “month”: 12, “expenses”: 173.18 }
{ “cust_id”: 1313131, “month”: 10, “expenses”: 492.83 }
{ “cust_id”: 3535353, “month”: 12, “expenses”: 81.12 }
{ “cust_id”: 1313131, “month”: 11, “expenses”: 368.27 }
卡夫卡 REST 代理:
Kafka REST 代理为 Kafka 集群提供了一个 RESTful 接口。它使生产和消费消息、查看集群状态以及执行管理操作变得容易,而无需使用本机 Kafka 协议或客户端。
使用 curl 获取主题列表
$ curl "http://localhost:8082/topics"
获取一个主题的信息
$ curl http://localhost:8082/topics/
例如:
$ curl "http://localhost:8082/topics/topic-test-1"
第 1 步:使用带有主题值的 JSON 生成消息
例如,使用 JSON 向主题 topic-test-1 生成一条消息,其值为 '{ “month”: 12}'
$ curl -X POST -H “Content-Type: application/vnd.kafka.json.v2+json” \
-H “Accept: application/vnd.kafka.v2+json” \
–data ‘{“records”:[{“value”:{“month”: 12}}]}’ “http://localhost:8082/topics/topic-test-1”
/*Expected output from preceding command*/
{
“offsets”:[{“partition”:0,”offset”:16,”error_code”:null,”error”:null}],”key_schema_id”:null,”value_schema_id”:null
}
通过运行 Kafka 控制台使用者来验证 Kafka 控制台生产者是否将消息发布到主题
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9095 --topic topic-test-1 --from-beginning
第 2 步:为 JSON 数据创建消费者,从主题的开头开始
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \ http://localhost:8082/consumers/my_json_consumer
/* Expected output from preceding command*/
{
"instance_id":"my_consumer_instance",
"base_uri":"http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance"
OR
"base_uri":"http://rest-proxy:8082/consumers/my_json_consumer/instances/my_consumer_instance"
}
第 3 步:登录并订阅主题。
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["topic-test-1"]}' \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
/* Expected output from preceding command*/
# No content in response
第 4 步:使用第一个响应中的基本 URL 来使用一些数据。
$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
可选步骤:
步骤1:最后,使用DELETE关闭消费者,使其离开组并清理其资源。
$ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance
/* Expected output from preceding command*/
# No content in response
第 2 步:使用以下命令验证消费者实例
$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
/* Expected output from preceding command*/
{ “error_code”: 40403, “message”: “Consumer instance not found.” }