📜  How to handle exceptions if a message is lost while publishing to Kafka - Any code example

📅  Last modified: 2022-03-11 14:55:55.105000             🧑  Author: Mango

Code example 1
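import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProduceToKafka {

  // Assumption: SLF4J is the logging facade; the original snippet does not show where `log` comes from.
  private static final Logger log = LoggerFactory.getLogger(ProduceToKafka.class);

  // TracerBulletProducer (not shown) builds the producer from its configured properties.
  private final KafkaProducer<String, String> myProducer = TracerBulletProducer
      .createProducer();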

  public void publishMessage(String string) {

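    // Typed record so key() and value() return String; no key is set here, so key() is null.
    ProducerRecord<String, String> message = new ProducerRecord<>(
        "topicName", string);

    // send() is asynchronous; the callback runs once the send is acknowledged or has failed.
    myProducer.send(message, new MyCallback(message.key(), message.value()));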
  }

  class MyCallback implements Callback {

    private final String key;
    private final String value;

    public MyCallback(String key, String value) {
      this.key = key;
      this.value = value;
    }


    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
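      if (exception == null) {
        log.info("--------> All good !!");
      } else {
        // The send failed: log at error level with the affected record and the stack trace.
        log.error("--------> not so good !! key=" + key + ", value=" + value, exception);
        // Depending on the client version, metadata may be null or hold placeholder values on failure.
        if (metadata != null) {
          log.error(metadata.toString());
          log.error("serialized value size: " + metadata.serializedValueSize());
        }
      }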
    }
  }

}
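
The snippet mentions that TracerBulletProducer holds the producer properties but does not show them. As a rough sketch only, the settings below are the standard Kafka producer options that most directly reduce the chance of losing a message before the callback ever reports an error. The class name ReliableProducerConfig, the method build, and the chosen values are hypothetical and are not taken from the original code.

```java
import java.util.Properties;

// Hypothetical helper (not part of the original snippet): collects producer
// settings that favor durability over latency.
final class ReliableProducerConfig {

  private ReliableProducerConfig() {}

  static Properties build(String bootstrapServers) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", bootstrapServers);
    props.setProperty("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    // Wait for all in-sync replicas to acknowledge each record.
    props.setProperty("acks", "all");
    // Let the producer retry transient failures without writing duplicates.
    props.setProperty("enable.idempotence", "true");
    props.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
    // Upper bound on the total time a send may take, retries included
    // (two minutes here; an assumed value, tune it for your workload).
    props.setProperty("delivery.timeout.ms", "120000");
    return props;
  }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor. With settings like these, the callback's exception branch is reached only after the client has exhausted its own retries, so that branch is the place to persist or re-queue the record.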