📜  kafkajs - Javascript (1)

📅  最后修改于: 2023-12-03 15:32:27.365000             🧑  作者: Mango

KafkaJS - 消息队列库

KafkaJS 是一个用于构建生产级 Apache Kafka 客户端的现代化库,它完全基于 JavaScript 构建。它提供了一个简单的 API 使您可以使用 Kafka 的基本功能,例如自动提交、消费者置备和协调管理等。它非常适用于在 Node.js 中编写高性能的实时数据处理管道或流式处理应用程序。

安装

你可以使用 npm 来安装 KafkaJS:

npm install kafkajs
生产者

下面是将消息发送到 Kafka 集群的简单示例:

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const producer = kafka.producer()

const sendMessage = async () => {
  await producer.connect()
  await producer.send({
    topic: 'my-topic',
    messages: [
      { value: 'Hello KafkaJS user!' },
    ],
  })
  await producer.disconnect()
}

sendMessage().catch(console.error)
消费者

下面是消费 Kafka 消息的简单示例:

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const consumer = kafka.consumer({ groupId: 'test-group' })

const consumeMessage = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic: 'my-topic', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })

  await consumer.disconnect()
}

consumeMessage().catch(console.error)
总结

KafkaJS 是建立在 Node.js 上的 Apache Kafka 库,并提供了生产者和消费者的简单 API。这个库的开发者专注于它的性能和与官方 Kafka 协议的兼容性,是一个可靠的工具,使 Node.js 应用能够无缝地与 Apache Kafka 集成。它是一个适合构建实时数据管道和流式处理应用程序的完美选择。