📅  最后修改于: 2023-12-03 15:09:50.298000             🧑  作者: Mango
如果您正在使用 Apache Kafka 作为消息队列来连接不同的应用程序,那么您可能需要发送和接收字符串消息。本文将介绍如何使用 Apache Kafka 发送和接收字符串消息。
在开始之前,您需要确保您的项目中已经添加了 Apache Kafka 客户端依赖项。您可以在 Maven 中添加以下依赖项:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
要发送字符串消息,您需要创建一个 Kafka 生产者,并将一个字符串包装在 ProducerRecord
中,并指定一个主题。以下是如何使用 Java 发送字符串消息:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class StringProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String topic = "my-topic";
String message = "Hello, World!";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
producer.send(record);
producer.close();
}
}
在上面的示例中,我们创建了一个 KafkaProducer
对象,并设置了一些必要的属性,例如引导服务器和序列化程序。然后,我们创建了一个 ProducerRecord
对象,其中指定了主题和消息,在最后一行使用 producer.send(record)
发送消息。
要接收字符串消息,您需要创建一个 Kafka 消费者,并设置要从哪个主题读取消息。以下是如何使用 Java 接收字符串消息:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class StringConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "my-group");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "my-topic";
consumer.subscribe(Collections.singleton(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
在上面的示例中,我们创建了一个 KafkaConsumer
对象,并设置了一些必要的属性,例如引导服务器、反序列化程序和消费者组 ID。然后,我们订阅了 my-topic
主题,并使用 consumer.poll(100)
从该主题读取消息。在消息循环中,我们遍历 ConsumerRecords
,并针对每个 ConsumerRecord
输出它的偏移量、键和值。
这样,您就可以使用 Apache Kafka 发送和接收字符串消息了。