📅  最后修改于: 2023-12-03 15:32:27.300000             🧑  作者: Mango
Kafka 是一个分布式的消息队列系统,广泛应用于数据传输、数据处理、日志记录等场景。在 Node.js 中,可以使用 kafka-node 模块来连接 Kafka,并实现生产消息和消费消息的功能。
本示例将演示如何使用 Node.js 和 kafka-node 模块连接 Kafka,并进行生产和消费消息的操作。
在使用本示例前,需要先安装 Node.js 并安装 kafka-node 模块。
npm install kafka-node
在连接 Kafka 前,需要明确 Kafka 的配置信息,包括 hostname
和 port
。接下来,首先创建一个 Kafka 客户端,然后创建一个生产者和一个消费者:
const kafka = require('kafka-node');
const Producer = kafka.Producer;
const Consumer = kafka.Consumer;
const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
const producer = new Producer(client);
const consumer = new Consumer(client, [{ topic: 'test' }], { autoCommit: false });
其中,kafka.KafkaClient
创建了一个 Kafka 客户端,设定了 Kafka 的 hostname
和 port
,kafka.Producer
和 kafka.Consumer
分别创建了生产者和消费者。kafka.Consumer
的第二个参数是需要订阅的主题(topic),autoCommit: false
关闭自动提交偏移量的功能。
在连接 Kafka 成功后,就可以通过生产者发送消息了。首先,需要检查生产者的状态,然后发送消息:
producer.on('ready', function() {
console.log('Producer is ready');
var message = {
topic: 'test',
messages: ['Hello, Kafka']
};
producer.send([message], function(err, result) {
console.log(err || result);
process.exit();
});
});
producer.on('error', function(err) {
console.error('Error in producer:', err);
});
在生产者连接成功后('ready'
事件),首先输出一条消息,然后发送一个格式化的消息。producer.send
发送消息,第一个参数是一个消息数组,可以一次发送多条消息。第二个参数是回调函数,用于处理发送结果。在发送完成后,关闭生产者的连接。
在连接 Kafka 成功后,就可以通过消费者接收消息了。消费者需要订阅主题,然后监听消息事件:
consumer.on('message', function(message) {
console.log('Message:', message);
});
consumer.on('error', function(err) {
console.error('Error in consumer:', err);
});
在接收到消息后,输出消息内容。'message'
事件用于处理消息,'error'
事件用于处理错误。
将生产消息和消费消息的代码放在同一个文件中,然后运行 Node.js 程序:
node example.js
在生产者发送消息后,消费者便会接收到消息并输出消息内容。如果出现错误,则会输出错误信息。
本示例演示了如何使用 Node.js 和 kafka-node 模块连接 Kafka,并实现生产和消费消息的功能。在实际应用中,可以根据需要定制更复杂的功能,例如消息过滤、消息分区、消息压缩等。同时,也需要注意 Kafka 的配置和性能优化,以保证系统的可靠性和效率。