📅  最后修改于: 2023-12-03 14:39:16.557000             🧑  作者: Mango
Apache Kafka是一种高吞吐量,分布式,可扩展的消息代理系统。它主要用于处理实时数据流,提供了一种可靠的数据传输方式。
在开始使用Apache Kafka之前,您需要先安装和设置一些基本工具。
由于Apache Kafka是一个Java应用程序,因此您需要首先安装Java。可以从Oracle官方网站免费下载Java。
Apache Kafka需要依赖Apache Zookeeper来执行协调和群集管理。您需要先安装和启动Zookeeper集群,然后再启动Kafka Broker。
下载Kafka
在解压后的文件夹中,可以找到config文件夹,其中包含了几个配置文件,例如server.properties、producer.properties、consumer.properties等等。
您可以通过更改这些文件的属性,来配置消息队列的操作方式。
Apache Kafka的核心是生产者和消费者。
生产者是一个将消息发送到Kafka Topic的应用程序。它的主要功能包括将消息发送到指定的Kafka Topic,以及处理发送过程中的异常。
在创建生产者之前,您需要先创建一个Kafka Topic。
您可以使用以下命令来创建Kafka Topic:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
上述命令将在本地创建一个新的Topic。
现在,我们可以创建一个简单的生产者。
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class SimpleProducer {
public static void main(String[] args) throws Exception{
String topicName = "test";
String key = "message-key";
String value = "message-value";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
producer.send(record);
producer.close();
}
}
消费者是一个从Kafka Topic中消费数据的应用程序。它的主要功能是从Kafka Topic中拉取消息,并处理拉取过程中的异常。
在创建消费者之前,您需要先启动Kafka Broker,并创建一个新的Kafka Topic。
现在,让我们创建一个简单的消费者。
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SimpleConsumer {
public static void main(String[] args) throws Exception{
String topicName = "test";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
}
}
Apache Kafka是一个高吞吐量的消息代理系统,可以用于处理实时数据流。您需要按照本文所述的步骤,安装和设置Java、Zookeeper,以及Apache Kafka。在完成安装和设置之后,您可以使用生产者和消费者API,来向Kafka Topic发送和接收消息。