📅  最后修改于: 2023-12-03 14:56:06.220000             🧑  作者: Mango
在 Kafka 中,消费者可以通过以下几种方式提交偏移量:
Kafka 可以选择将偏移量存储在 Zookeeper 中。在这种情况下,消费者可以直接通过 Zookeeper API 来提交偏移量。调用 commitOffsets()
方法提交偏移量,该方法将偏移量写入 Zookeeper 中的 /consumers/[group_id]/offsets/
路径下对应的节点中。
示例代码:
import { Zookeeper } from 'kafka-node';
const zookeeperClient = new Zookeeper('localhost:2181');
function commitOffsets(groupId: string, topic: string, partition: number, offset: number) {
const offsetPath = `/consumers/${groupId}/offsets/${topic}/${partition}`;
zookeeperClient.a_create(offsetPath, offset.toString(), null, (err: Error) => {
if (err) {
console.error(`Error committing offset: ${err}`);
} else {
console.log('Offset committed successfully');
}
});
}
commitOffsets('my-group', 'my-topic', 0, 100);
Kafka 还可以将偏移量存储在内置的 __consumer_offsets
主题中。在这种情况下,消费者可以通过直接向 __consumer_offsets
主题发送一条消息来提交偏移量。该主题分为多个分区,每个消费者组的偏移量存储在对应分区中的消息中。
示例代码:
import { KafkaProducer, Message } from 'kafka-node';
const kafkaProducer = new KafkaProducer(/* Kafka配置 */);
function commitOffsets(groupId: string, topic: string, partition: number, offset: number) {
const message: Message = {
topic: '__consumer_offsets',
partition: partition,
key: `${groupId}-${topic}-${partition}`,
messages: [JSON.stringify({ offset })],
};
kafkaProducer.send([message], (err: Error) => {
if (err) {
console.error(`Error committing offset: ${err}`);
} else {
console.log('Offset committed successfully');
}
});
}
commitOffsets('my-group', 'my-topic', 0, 100);
最常用的提交偏移量的方式是与 Group coordinator 进行交互。在这种模式下,消费者会定期向 Group coordinator 提交偏移量,并通过心跳机制保持与 Group coordinator 的连接。消费者可以通过设置 autoCommit
配置项来实现自动提交偏移量,或者通过调用 commitOffsets()
方法来手动提交偏移量。
示例代码:
import { KafkaClient, OffsetFetchRequest } from 'kafka-node';
const kafkaClient = new KafkaClient(/* Kafka配置 */);
function commitOffsets(groupId: string, topic: string, partition: number, offset: number) {
const offsetFetchRequest: OffsetFetchRequest = {
groupId: groupId,
topics: [{ topic: topic, partitions: [partition] }],
};
kafkaClient.sendOffsetFetchRequest(offsetFetchRequest, (err: Error, data: any) => {
if (err) {
console.error(`Error committing offset: ${err}`);
} else {
console.log('Offset committed successfully');
}
});
}
commitOffsets('my-group', 'my-topic', 0, 100);
以上是在 Kafka 中提交偏移量的几种常见方式。根据实际需求和使用情况,可以选择适合的提交偏移量的方法。