📜  消费者如何在 Kafka 中提交偏移量?它直接在 Zookeeper 中提交偏移量 它直接向 __consumer_offset 发送一条消息 它与 Group coordinator 交互 无 - TypeScript (1)

📅  最后修改于: 2023-12-03 14:56:06.220000             🧑  作者: Mango

提交偏移量的方法

在 Kafka 中,消费者可以通过以下几种方式提交偏移量:

1. 直接在 Zookeeper 中提交偏移量

Kafka 可以选择将偏移量存储在 Zookeeper 中。在这种情况下,消费者可以直接通过 Zookeeper API 来提交偏移量。调用 commitOffsets() 方法提交偏移量,该方法将偏移量写入 Zookeeper 中的 /consumers/[group_id]/offsets/ 路径下对应的节点中。

示例代码:

import { Zookeeper } from 'kafka-node';

const zookeeperClient = new Zookeeper('localhost:2181');

function commitOffsets(groupId: string, topic: string, partition: number, offset: number) {
  const offsetPath = `/consumers/${groupId}/offsets/${topic}/${partition}`;
  zookeeperClient.a_create(offsetPath, offset.toString(), null, (err: Error) => {
    if (err) {
      console.error(`Error committing offset: ${err}`);
    } else {
      console.log('Offset committed successfully');
    }
  });
}

commitOffsets('my-group', 'my-topic', 0, 100);
2. 直接向 __consumer_offset 发送一条消息

Kafka 还可以将偏移量存储在内置的 __consumer_offsets 主题中。在这种情况下,消费者可以通过直接向 __consumer_offsets 主题发送一条消息来提交偏移量。该主题分为多个分区,每个消费者组的偏移量存储在对应分区中的消息中。

示例代码:

import { KafkaProducer, Message } from 'kafka-node';

const kafkaProducer = new KafkaProducer(/* Kafka配置 */);

function commitOffsets(groupId: string, topic: string, partition: number, offset: number) {
  const message: Message = {
    topic: '__consumer_offsets',
    partition: partition,
    key: `${groupId}-${topic}-${partition}`,
    messages: [JSON.stringify({ offset })],
  };

  kafkaProducer.send([message], (err: Error) => {
    if (err) {
      console.error(`Error committing offset: ${err}`);
    } else {
      console.log('Offset committed successfully');
    }
  });
}

commitOffsets('my-group', 'my-topic', 0, 100);
3. 与 Group coordinator 交互

最常用的提交偏移量的方式是与 Group coordinator 进行交互。在这种模式下,消费者会定期向 Group coordinator 提交偏移量,并通过心跳机制保持与 Group coordinator 的连接。消费者可以通过设置 autoCommit 配置项来实现自动提交偏移量,或者通过调用 commitOffsets() 方法来手动提交偏移量。

示例代码:

import { KafkaClient, OffsetFetchRequest } from 'kafka-node';

const kafkaClient = new KafkaClient(/* Kafka配置 */);

function commitOffsets(groupId: string, topic: string, partition: number, offset: number) {
  const offsetFetchRequest: OffsetFetchRequest = {
    groupId: groupId,
    topics: [{ topic: topic, partitions: [partition] }],
  };

  kafkaClient.sendOffsetFetchRequest(offsetFetchRequest, (err: Error, data: any) => {
    if (err) {
      console.error(`Error committing offset: ${err}`);
    } else {
      console.log('Offset committed successfully');
    }
  });
}

commitOffsets('my-group', 'my-topic', 0, 100);

以上是在 Kafka 中提交偏移量的几种常见方式。根据实际需求和使用情况,可以选择适合的提交偏移量的方法。