📅  最后修改于: 2023-12-03 15:29:25.708000             🧑  作者: Mango
Apache Kafka是一款开源的分布式流处理平台,由Apache软件基金会开发。它是一种高吞吐量、低延迟的消息队列系统,主要用于处理实时数据流。下面我们将介绍它的体系结构。
Kafka是由多个组件组成的,主要分为以下四个组件:
Producer是消息的生产者,负责创建和发送消息到Kafka Broker。发送的消息可以是任何格式的字节,由Producer自己定义。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MyProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
producer.close();
}
}
Broker是Kafka的核心组件,起到消息存储的作用。它扮演着消息队列的角色,接收Producer发送的消息,并将消息存储在自己的日志文件中,等待Consumer来消费。
无。
Consumer是消息的消费者,负责从Kafka Broker中取出消息并消费。它订阅了一个或多个Topic,并从Broker拉取数据,将数据保存到本地,等待消费者来消费。
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;
public class MyConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
}
}
}
}
ZooKeeper是一个为分布式应用提供协调服务的开源组件。Kafka使用ZooKeeper来处理Brokers的领导者选举、Broker上下线等任务。
无。
以上就是Apache Kafka的体系结构。它的模块化设计使得它能够处理海量数据,以及高效处理批处理和实时流数据。同时,它的可扩展性和高可用性保证了消息传输的可靠性。