📜  Apache Kafka体系结构(1)

📅  最后修改于: 2023-12-03 15:29:25.708000             🧑  作者: Mango

Apache Kafka体系结构

Apache Kafka是一款开源的分布式流处理平台,由Apache软件基金会开发。它是一种高吞吐量、低延迟的消息队列系统,主要用于处理实时数据流。下面我们将介绍它的体系结构。

Kafka体系结构

Kafka是由多个组件组成的,主要分为以下四个组件:

  1. Producer
  2. Broker
  3. Consumer
  4. ZooKeeper
Producer

Producer是消息的生产者,负责创建和发送消息到Kafka Broker。发送的消息可以是任何格式的字节,由Producer自己定义。

代码片段

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));

        producer.close();
    }
}
Broker

Broker是Kafka的核心组件,起到消息存储的作用。它扮演着消息队列的角色,接收Producer发送的消息,并将消息存储在自己的日志文件中,等待Consumer来消费。

代码片段

无。

Consumer

Consumer是消息的消费者,负责从Kafka Broker中取出消息并消费。它订阅了一个或多个Topic,并从Broker拉取数据,将数据保存到本地,等待消费者来消费。

代码片段

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ": " + record.value()); 
                }
            }
        }
    }
}
ZooKeeper

ZooKeeper是一个为分布式应用提供协调服务的开源组件。Kafka使用ZooKeeper来处理Brokers的领导者选举、Broker上下线等任务。

代码片段

无。

总结

以上就是Apache Kafka的体系结构。它的模块化设计使得它能够处理海量数据,以及高效处理批处理和实时流数据。同时,它的可扩展性和高可用性保证了消息传输的可靠性。