📜  kafka nodejs 示例 (1)

📅  最后修改于: 2023-12-03 15:32:27.300000             🧑  作者: Mango

Kafka Node.js 示例

Kafka 是一个分布式的消息队列系统,广泛应用于数据传输、数据处理、日志记录等场景。在 Node.js 中,可以使用 kafka-node 模块来连接 Kafka,并实现生产消息和消费消息的功能。

本示例将演示如何使用 Node.js 和 kafka-node 模块连接 Kafka,并进行生产和消费消息的操作。

安装

在使用本示例前,需要先安装 Node.js 并安装 kafka-node 模块。

npm install kafka-node
连接 Kafka

在连接 Kafka 前,需要明确 Kafka 的配置信息,包括 hostnameport。接下来,首先创建一个 Kafka 客户端,然后创建一个生产者和一个消费者:

const kafka = require('kafka-node');
const Producer = kafka.Producer;
const Consumer = kafka.Consumer;
const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
const producer = new Producer(client);
const consumer = new Consumer(client, [{ topic: 'test' }], { autoCommit: false });

其中,kafka.KafkaClient 创建了一个 Kafka 客户端,设定了 Kafka 的 hostnameportkafka.Producerkafka.Consumer 分别创建了生产者和消费者。kafka.Consumer 的第二个参数是需要订阅的主题(topic),autoCommit: false 关闭自动提交偏移量的功能。

生产消息

在连接 Kafka 成功后,就可以通过生产者发送消息了。首先,需要检查生产者的状态,然后发送消息:

producer.on('ready', function() {
  console.log('Producer is ready');
  var message = {
    topic: 'test',
    messages: ['Hello, Kafka']
  };
  producer.send([message], function(err, result) {
    console.log(err || result);
    process.exit();
  });
});

producer.on('error', function(err) {
  console.error('Error in producer:', err);
});

在生产者连接成功后('ready' 事件),首先输出一条消息,然后发送一个格式化的消息。producer.send 发送消息,第一个参数是一个消息数组,可以一次发送多条消息。第二个参数是回调函数,用于处理发送结果。在发送完成后,关闭生产者的连接。

消费消息

在连接 Kafka 成功后,就可以通过消费者接收消息了。消费者需要订阅主题,然后监听消息事件:

consumer.on('message', function(message) {
  console.log('Message:', message);
});

consumer.on('error', function(err) {
  console.error('Error in consumer:', err);
});

在接收到消息后,输出消息内容。'message' 事件用于处理消息,'error' 事件用于处理错误。

运行示例

将生产消息和消费消息的代码放在同一个文件中,然后运行 Node.js 程序:

node example.js

在生产者发送消息后,消费者便会接收到消息并输出消息内容。如果出现错误,则会输出错误信息。

总结

本示例演示了如何使用 Node.js 和 kafka-node 模块连接 Kafka,并实现生产和消费消息的功能。在实际应用中,可以根据需要定制更复杂的功能,例如消息过滤、消息分区、消息压缩等。同时,也需要注意 Kafka 的配置和性能优化,以保证系统的可靠性和效率。