📜  Apache Kafka教程(1)

📅  最后修改于: 2023-12-03 14:39:16.557000             🧑  作者: Mango

Apache Kafka教程

Apache Kafka是一种高吞吐量,分布式,可扩展的消息代理系统。它主要用于处理实时数据流,提供了一种可靠的数据传输方式。

安装与设置

在开始使用Apache Kafka之前,您需要先安装和设置一些基本工具。

安装Java

由于Apache Kafka是一个Java应用程序,因此您需要首先安装Java。可以从Oracle官方网站免费下载Java。

安装Apache Zookeeper

Apache Kafka需要依赖Apache Zookeeper来执行协调和群集管理。您需要先安装和启动Zookeeper集群,然后再启动Kafka Broker。

下载Zookeeper

安装Apache Kafka

下载Kafka

在解压后的文件夹中,可以找到config文件夹,其中包含了几个配置文件,例如server.properties、producer.properties、consumer.properties等等。

您可以通过更改这些文件的属性,来配置消息队列的操作方式。

生产者和消费者

Apache Kafka的核心是生产者和消费者。

生产者

生产者是一个将消息发送到Kafka Topic的应用程序。它的主要功能包括将消息发送到指定的Kafka Topic,以及处理发送过程中的异常。

在创建生产者之前,您需要先创建一个Kafka Topic。

您可以使用以下命令来创建Kafka Topic:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

上述命令将在本地创建一个新的Topic。

现在,我们可以创建一个简单的生产者。

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
   public static void main(String[] args) throws Exception{

      String topicName = "test";
      String key = "message-key";
      String value = "message-value";

      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      KafkaProducer<String, String> producer = new KafkaProducer<>(props);
      ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
      producer.send(record);

      producer.close();
   }
}
消费者

消费者是一个从Kafka Topic中消费数据的应用程序。它的主要功能是从Kafka Topic中拉取消息,并处理拉取过程中的异常。

在创建消费者之前,您需要先启动Kafka Broker,并创建一个新的Kafka Topic。

现在,让我们创建一个简单的消费者。

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception{

      String topicName = "test";
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test-group");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList(topicName));
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
      }
   }
}
总结

Apache Kafka是一个高吞吐量的消息代理系统,可以用于处理实时数据流。您需要按照本文所述的步骤,安装和设置Java、Zookeeper,以及Apache Kafka。在完成安装和设置之后,您可以使用生产者和消费者API,来向Kafka Topic发送和接收消息。