📜  弹簧靴 |如何使用 Apache Kafka 使用字符串消息(1)

📅  最后修改于: 2023-12-03 15:09:50.298000             🧑  作者: Mango

弹簧靴 | 如何使用 Apache Kafka 使用字符串消息

如果您正在使用 Apache Kafka 作为消息队列来连接不同的应用程序,那么您可能需要发送和接收字符串消息。本文将介绍如何使用 Apache Kafka 发送和接收字符串消息。

依赖项

在开始之前,您需要确保您的项目中已经添加了 Apache Kafka 客户端依赖项。您可以在 Maven 中添加以下依赖项:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
发送字符串消息

要发送字符串消息,您需要创建一个 Kafka 生产者,并将一个字符串包装在 ProducerRecord 中,并指定一个主题。以下是如何使用 Java 发送字符串消息:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class StringProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        String topic = "my-topic";
        String message = "Hello, World!";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        producer.send(record);
        producer.close();
    }
}

在上面的示例中,我们创建了一个 KafkaProducer 对象,并设置了一些必要的属性,例如引导服务器和序列化程序。然后,我们创建了一个 ProducerRecord 对象,其中指定了主题和消息,在最后一行使用 producer.send(record) 发送消息。

接收字符串消息

要接收字符串消息,您需要创建一个 Kafka 消费者,并设置要从哪个主题读取消息。以下是如何使用 Java 接收字符串消息:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class StringConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "my-group");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "my-topic";
        consumer.subscribe(Collections.singleton(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

在上面的示例中,我们创建了一个 KafkaConsumer 对象,并设置了一些必要的属性,例如引导服务器、反序列化程序和消费者组 ID。然后,我们订阅了 my-topic 主题,并使用 consumer.poll(100) 从该主题读取消息。在消息循环中,我们遍历 ConsumerRecords,并针对每个 ConsumerRecord 输出它的偏移量、键和值。

这样,您就可以使用 Apache Kafka 发送和接收字符串消息了。