📅  最后修改于: 2021-01-05 02:53:39             🧑  作者: Mango
在上一节中,我们了解了生产者如何将数据发送到Kafka。为了更深入地了解数据,即数据是否正确生成,在何处生成,有关其偏移量和分区值等。让我们学习更多。
为了执行回调,用户需要实现一个回调函数。实现此函数是为了异步处理请求完成。这就是为什么它的返回类型将为空。该函数将在生产者向Kafka发送数据的块中实现。无需更改其他代码块。
生产者使用的回调函数是onCompletion()。基本上,此方法需要两个参数:
记录的元数据:记录的元数据意味着获取有关分区及其偏移量的信息。如果不为null,则将引发错误。
异常:处理过程中可能引发以下异常:
1)可重试的异常:此异常表示可能已发送消息。
2)不可检索的异常:此异常引发错误,消息将永远不会发送。
让我们在下面的快照中查看Producer回调的实现:
first_producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
Logger logger=LoggerFactory.getLogger(producer1call.class);
if (e== null) {
logger.info("Successfully received the details as: \n" +
"Topic:" + recordMetadata.topic() + "\n" +
"Partition:" + recordMetadata.partition() + "\n" +
"Offset" + recordMetadata.offset() + "\n" +
"Timestamp" + recordMetadata.timestamp());
}
else {
logger.error("Can't produce,getting error",e);
}
}
});
已创建“记录器”对象,该对象允许导入“ slf4j.Logger ”和“ slf4j.LoggerFactory ”。该记录器对象将记录有关分区,偏移量和时间戳的信息。如果异常值等于null,则记录器将显示该信息,否则将显示错误。执行上述代码后,用户将知道发送消息的主题名称,分区号,时间戳,偏移值。
输出快照如下所示:
在上面的输出中,可以看到消息生成为“ my_first”,存储在具有“偏移值9”的“分区0”中。
注意:到目前为止,我们发送的消息没有密钥,因此,没有密钥的消息将存储在随机分区中并异步运行。
当用户要将消息发送到同一分区时,键变得很有用。为了发送数据,用户需要指定一个密钥。该密钥将从其他分区中唯一标识该分区。用户需要将同步消息发送到Kafka。
下面显示了一种实现密钥的方法:
在上面的快照中,我们指定了主题名称,其值和键。创建ProducerRecord时,将其中三个作为参数传递。如果异常“ e”等于null,则记录器将获取有关密钥的信息。最后,将数据发送到Kafka时,将使用get()函数。此方法同步并强制发送数据。用户可以尝试自己的方式来实现密钥。
注意:使用get(),将显示红色下划线。按alt + enter,它将说出“向方法签名添加例外”,然后选择它。如上所示,这将向main()添加两个异常。同样,它将“ java.util.concurrent.ExecutionException”导入代码。
执行以上代码后,输出显示为:
输出中突出显示的部分告诉您键值,主题名称,分区号,偏移值以及时间戳。消息“ OneTwo”现在将始终转到指定的分区。
因此,通过这种方式,生产者可以使用和不使用密钥将数据发送到Kafka。