📅  最后修改于: 2021-01-05 02:49:54             🧑  作者: Mango
通常,Kafka消费者属于特定的消费群体。使用者组基本上代表了应用程序的名称。为了使用使用者组中的消息,使用了-group命令。
让我们看看消费者将如何消费来自Kafka主题的消息:
步骤1:打开Windows命令提示符。
第2步:将“ -group ”命令用作: “ kafka-console-consumer -bootstrap-server localhost:9092 -topic
在上面的快照中,组的名称为“ first_app ”。可以看到没有显示任何消息,因为没有为此主题生成新消息。如果使用“ -from-beginning ”命令,则将显示所有先前的消息。
步骤3:要查看一些新消息,请从生产者控制台生成一些即时消息(如上一节所述)。
因此,生产者产生的新消息可以在消费者的控制台中看到。
第4步:但是,只有一个消费者在组中读取数据。让我们创建更多的消费者来了解消费者群体的力量。为此,请打开一个新终端并键入与以下命令完全相同的使用者命令:
'kafka-console-consumer.bat-引导服务器127.0.0.1:9092 –topic
在上面的快照中,很明显生产者正在将数据发送到Kafka主题。两个使用者正在使用消息。查看消息的顺序。由于为“ myfirst”主题创建了三个分区(前面已经讨论过),因此仅按该顺序拆分消息。
我们可以在同一组中进一步创建更多使用者,每个使用者将根据分区数使用消息。尝试自己更好地理解。
注意:组ID应该相同,然后只有消息将在使用者之间分配。
但是,如果终止任何使用者,则将分区重新分配给活动使用者,并且这些活动使用者将接收到消息。
因此,以这种方式,消费者组中的各个消费者都消费了来自Kafka主题的消息。
生产者在数据中附加了键值后,它将被存储到该指定分区。如果未指定键值,则数据将移动到任何分区。因此,当消费者使用密钥读取消息时,如果未指定密钥,则该消息将显示为空。要使用来自Kafka主题的消息,需要' print.key '和' key.seperator '。使用的命令是:
'kafka-console-consumer -bootstrap-server localhost:9092 -topic
使用上述命令,使用者可以使用指定的键读取数据。
'–from-beginning'命令
该命令用于从头开始阅读消息(前面已经讨论过)。因此,在使用者组中使用它会得到以下输出:
可以注意到,新的使用者组“ second_app”用于从头开始读取消息。如果再执行一次相同的命令,它将不会显示任何输出。这是因为偏移量是在Apache Kafka中提交的。因此,一旦消费者组读取了所有直到写入的消息,下一次它将仅读取新消息。
例如,在下面的快照中,当再次使用'-from-beginning '命令时,仅读取新消息。这是因为所有先前的消息仅在较早时使用。
'kafka-consumer-groups'命令
此命令提供了整个文档,以列出所有组,描述组,删除使用者信息或重置使用者组偏移量。
它需要引导服务器,客户端才能在使用者组上执行不同的功能。
列出消费者群体
“ -list ”命令用于列出Kafka群集中可用的使用者组数量。该命令用作:
'kafka-consumer-groups.bat -bootstrap-server localhost:9092 -list'。
快照如下所示,其中存在三个消费者组。
描述消费者群
' –describe '命令用于描述使用者组。该命令用作:
'kafka-consumer-groups.bat -bootstrap-server localhost:9092-描述组
此命令描述是否存在任何活动的使用者,当前偏移值,滞后值为0-表示使用者已读取所有数据。
偏移在Apache Kafka中提交。因此,如果用户想再次阅读消息,则需要重置偏移值。 ' Kafka-consumer-groups '命令提供了重置偏移量的选项。重置偏移值意味着定义用户要从其再次读取消息的位置。它一次仅支持一个消费者组,并且该组不应有活动实例。
重置偏移量时,用户需要选择三个参数:
有两个执行选项可用:
'-dry-run':这是默认执行选项。此选项用于计划需要重置的偏移量。
-execute':此选项用于更新偏移值。
有以下复位规格可用:
'-to-datetime':根据从datetime开始的偏移量重置偏移量。使用的格式为:“ YYYY-MM-DDTHH:mm:SS.sss”。
'–to-estally':将偏移量重置为最早的偏移量。
'–to-latest':将偏移量重置为最新的偏移量。
'–shift-':通过将当前偏移值偏移'n'来重置偏移。 “ n”的值可以为正或负。
'–from-file':将偏移量重置为CSV文件中定义的值。
'–to-current':将偏移量重置为当前偏移量。
可以定义两个范围:
'-all-topics':重置组中所有可用主题的偏移值。
'-topics':仅重置指定主题的偏移值。用户需要指定主题名称以重置偏移值。
让我们尝试看看:
1)使用-至最早的命令
在上面的快照中,偏移量被重置为新的偏移量0。这是因为使用了“ -to-最早”命令,该命令已将偏移量值重置为0。
2)使用-shift-by命令
在第一个快照中,偏移值从“ 0”移到“ +2”。在第二个中,偏移值从“ 2”转换为“ -1”。
注意:要将偏移值移至正数,无需在其上使用“ +”符号。默认情况下,仅将其视为正数。