📜  Apache Kafka - Simple Producer Example

📅  Last modified: 2020-12-03 02:58:09             🧑  Author: Mango


Let us create an application for publishing and consuming messages using a Java client. The Kafka producer client consists of the following APIs.

KafkaProducer API

Let us understand the most important set of Kafka producer APIs in this section. The central part of the KafkaProducer API is the KafkaProducer class. The KafkaProducer class provides an option to connect a Kafka broker in its constructor with the following methods.

  • The KafkaProducer class provides a send method to send messages asynchronously to a topic. The signature of send() is as follows:

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
   partition, key1, value1), callback);
  • ProducerRecord - The producer manages a buffer of records waiting to be sent.

  • Callback - A user-supplied callback to be executed when the record has been acknowledged by the server (null indicates no callback).

  • The KafkaProducer class provides a flush method to ensure all previously sent messages have actually completed. The syntax of the flush method is as follows:

public void flush()
  • The KafkaProducer class provides the partitionsFor method, which helps in getting the partition metadata for a given topic. This can be used for custom partitioning.

  • The KafkaProducer class also provides a metrics method. Its signature is as follows:

public Map metrics()

It returns the map of internal metrics maintained by the producer.

  • public void close() - The KafkaProducer class provides a close method that blocks until all previously sent requests are complete.
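The send-plus-callback contract described above can be sketched without a broker. Everything below (AckCallback, AsyncSendSketch, the fake offset) is a stand-in invented for illustration, not the Kafka API; it only shows the pattern: send() returns immediately and the callback fires once the record is acknowledged.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for Kafka's Callback interface, invented for illustration only.
interface AckCallback {
   void onCompletion(long offset, Exception exception);
}

public class AsyncSendSketch {
   // Simulates an asynchronous send(): returns immediately with a future,
   // and invokes the callback once the pretend "broker" acknowledges.
   static CompletableFuture<Long> send(String value, AckCallback callback) {
      return CompletableFuture.supplyAsync(() -> {
         long offset = (long) value.hashCode(); // pretend broker-assigned offset
         if (callback != null) callback.onCompletion(offset, null);
         return offset;
      });
   }

   public static void main(String[] args) {
      AtomicReference<Long> acked = new AtomicReference<>();
      // send() does not block; joining the future waits for the "ack"
      long offset = send("hello", (off, exc) -> acked.set(off)).join();
      System.out.println("acked offset = " + acked.get() + ", returned = " + offset);
   }
}
```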

生产者API

The central part of the Producer API is the Producer class. The Producer class provides an option to connect a Kafka broker in its constructor by the following methods.

Producer Class

The Producer class provides a send method to send messages to either single or multiple topics using the following signatures.

public void send(KeyedMessage<k,v> message) 
- sends the data to a single topic, partitioned by key, using either sync or async producer.
public void send(List<KeyedMessage<k,v>> messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put("producer.type", "async");
ProducerConfig config = new ProducerConfig(prop);

There are two types of producers: Sync and Async.

The same API configuration applies to the Sync producer as well. The difference between them is that a Sync producer sends messages directly, while an Async producer sends messages in the background. The Async producer is preferred when you want higher throughput. In earlier releases such as 0.8, an async producer did not have a callback for send() to register error handlers; this became available only in the 0.9 release.

public void close()

The Producer class provides a close method to close the producer pool connections to all Kafka brokers.
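The throughput difference between the two styles comes down to waiting: a Sync-style sender blocks on each acknowledgement before the next send, while an Async-style sender fires all sends and collects acknowledgements afterwards. A broker-free sketch (fakeSend and both helpers are invented for illustration, not the Kafka API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SyncVsAsyncSketch {
   // Pretend "broker ack" that completes immediately; real acks take network time.
   static CompletableFuture<Integer> fakeSend(int i) {
      return CompletableFuture.completedFuture(i);
   }

   // Sync style: wait for each ack before issuing the next send.
   static int sendSync(int n) {
      int acked = 0;
      for (int i = 0; i < n; i++) acked += fakeSend(i).join() >= 0 ? 1 : 0;
      return acked;
   }

   // Async style: fire all sends first, then collect the acks.
   static int sendAsync(int n) {
      List<CompletableFuture<Integer>> pending = new ArrayList<>();
      for (int i = 0; i < n; i++) pending.add(fakeSend(i));
      return (int) pending.stream().filter(f -> f.join() >= 0).count();
   }

   public static void main(String[] args) {
      System.out.println("sync acked=" + sendSync(5) + ", async acked=" + sendAsync(5));
   }
}
```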

Configuration Settings

The Producer API's main configuration settings are listed in the following table for better understanding:

S.No Configuration Settings and Description

1. client.id - identifies the producer application.

2. producer.type - either sync or async.

3. acks - The acks config controls the criteria under which producer requests are considered complete.

4. retries - If the producer request fails, then automatically retry with the specified value.

5. bootstrap.servers - bootstrapping list of brokers.

6. linger.ms - if you want to reduce the number of requests, you can set linger.ms to something greater than some value.

7. key.serializer - Key for the serializer interface.

8. value.serializer - Value for the serializer interface.

9. batch.size - Buffer size.

10. buffer.memory - controls the total amount of memory available to the producer for buffering.
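These settings are passed to the producer as an ordinary java.util.Properties object. A minimal, broker-free sketch using the same values that appear in the SimpleProducer code later in this chapter (buildProducerProps is a hypothetical helper name; tune the values for your cluster):

```java
import java.util.Properties;

public class ProducerConfigSketch {
   // Builds the producer configuration described in the table above.
   static Properties buildProducerProps() {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092"); // broker bootstrap list
      props.put("acks", "all");               // wait for full acknowledgement
      props.put("retries", "0");              // no automatic retry on failure
      props.put("batch.size", "16384");       // per-partition batch size in bytes
      props.put("linger.ms", "1");            // wait up to 1 ms to batch requests
      props.put("buffer.memory", "33554432"); // total producer buffer memory
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      return props;
   }

   public static void main(String[] args) {
      System.out.println(buildProducerProps());
   }
}
```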

ProducerRecord API

ProducerRecord is a key/value pair that is sent to the Kafka cluster. The ProducerRecord class constructor for creating a record with partition, key and value pairs uses the following signature.

public ProducerRecord(String topic, int partition, K key, V value)
  • Topic - The user-defined topic name that will be appended to the record.

  • Partition - The partition to which the record should be sent.

  • Key - The key that will be included in the record.

  • Value - The record contents.
public ProducerRecord(String topic, K key, V value)

The ProducerRecord class constructor is used to create a record with key and value pairs, without a partition.

  • Topic - A topic to assign the record to.

  • Key - The key for the record.

  • Value - The record contents.

public ProducerRecord(String topic, V value)

The ProducerRecord class creates a record without a partition or key.

  • Topic - A topic to assign the record to.

  • Value - The record contents.

The ProducerRecord class methods are listed in the following table:

S.No Class Methods and Description

1. public String topic() - The topic that will be appended to the record.

2. public K key() - The key that will be included in the record. If there is no such key, null will be returned here.

3. public V value() - The record contents.

4. partition() - The partition for the record.
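The three constructor shapes and the accessor methods can be mirrored in a minimal stand-in class. RecordSketch below is invented for illustration and is not the Kafka ProducerRecord; it only shows how the omitted partition and key default to null:

```java
// Minimal stand-in mirroring ProducerRecord's constructor/accessor shape.
public class RecordSketch<K, V> {
   private final String topic;
   private final Integer partition; // null = let the partitioner decide
   private final K key;             // null = no key
   private final V value;

   public RecordSketch(String topic, Integer partition, K key, V value) {
      this.topic = topic; this.partition = partition; this.key = key; this.value = value;
   }
   public RecordSketch(String topic, K key, V value) { this(topic, null, key, value); }
   public RecordSketch(String topic, V value) { this(topic, null, null, value); }

   public String topic() { return topic; }
   public Integer partition() { return partition; }
   public K key() { return key; }
   public V value() { return value; }

   public static void main(String[] args) {
      RecordSketch<String, String> r = new RecordSketch<>("my-topic", "v1");
      // key() returns null when the record was created without a key
      System.out.println(r.topic() + " / " + r.key() + " / " + r.value());
   }
}
```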

SimpleProducer Application

Before creating the application, first start ZooKeeper and the Kafka broker, then create your own topic in the Kafka broker using the create topic command. After that, create a Java class named SimpleProducer.java and type in the following code.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named "SimpleProducer"
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", "localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", "all");
      
      //If the request fails, the producer can automatically retry
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Wait up to 1 ms before sending, to reduce the number of requests
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer<>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
      System.out.println("Message sent successfully");
      producer.close();
   }
}

Compilation - The application can be compiled using the following command.

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java

Execution - The application can be executed using the following command.

java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output, open a new terminal and type the consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic-name> --from-beginning
0
1
2
3
4
5
6
7
8
9

Simple Consumer Example

So far, we have created a producer to send messages to the Kafka cluster. Now let us create a consumer to consume messages from the Kafka cluster. The KafkaConsumer API is used to consume messages from the Kafka cluster. The KafkaConsumer class constructor is defined below.

public KafkaConsumer(java.util.Map<java.lang.String, java.lang.Object> configs)

configs - A map of consumer configs.

The KafkaConsumer class has the following significant methods, listed in the table below.

S.No Method and Description

1. public java.util.Set<TopicPartition> assignment() - Get the set of partitions currently assigned to the consumer.

2. public java.util.Set<java.lang.String> subscription() - Get the current subscription of the consumer.

3. public void subscribe(java.util.List<java.lang.String> topics, ConsumerRebalanceListener listener) - Subscribe to the given list of topics to get dynamically assigned partitions.

4. public void unsubscribe() - Unsubscribe from the currently subscribed list of topics.

5. public void subscribe(java.util.List<java.lang.String> topics) - Subscribe to the given list of topics to get dynamically assigned partitions. If the given list of topics is empty, it is treated the same as unsubscribe().

6. public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener) - The pattern argument refers to the subscription pattern in regular-expression format, and the listener argument gets notifications from the subscription pattern.

7. public void assign(java.util.List<TopicPartition> partitions) - Manually assign a list of partitions to the consumer.

8. poll() - Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. This will return an error if the topics were not subscribed before polling for data.

9. public void commitSync() - Commit the offsets returned by the last poll() for all the subscribed topics and partitions. The same operation applies to commitAsync().

10. public void seek(TopicPartition partition, long offset) - Override the offset value that the consumer will use on the next poll().

11. public void resume() - Resume the paused partitions.

12. public void wakeup() - Wake up the consumer.
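The subscribe/poll/commit flow from the table above can be sketched with an in-memory stand-in. PollLoopSketch below is invented for illustration and is not the Kafka API; it only shows the ordering: poll() advances the consumer's position, and commitSync() records that position as the committed offset.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory stand-in for the subscribe/poll/commit flow (not the Kafka API).
public class PollLoopSketch {
   private final Deque<String> log = new ArrayDeque<>();
   private String topic;
   private long committedOffset = 0;
   private long position = 0;

   void subscribe(String topic) { this.topic = topic; }

   // poll(): returns whatever is available, advancing the consumer's position.
   List<String> poll() {
      List<String> batch = new ArrayList<>(log);
      position += batch.size();
      log.clear();
      return batch;
   }

   // commitSync(): records the last polled position as the committed offset.
   void commitSync() { committedOffset = position; }

   void produce(String msg) { log.add(msg); }

   long committed() { return committedOffset; }

   public static void main(String[] args) {
      PollLoopSketch c = new PollLoopSketch();
      c.subscribe("demo");
      c.produce("a"); c.produce("b");
      List<String> records = c.poll();
      c.commitSync();
      System.out.println(c.topic + ": polled=" + records + " committed=" + c.committed());
   }
}
```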

ConsumerRecord API

The ConsumerRecord API is used to receive records from the Kafka cluster. This API consists of a topic name, a partition number from which the record is being received, and an offset that points to the record in a Kafka partition. The ConsumerRecord class is used to create a consumer record with a specific topic name, partition count and <key, value> pairs. It has the following signature.

public ConsumerRecord(String topic, int partition, long offset, K key, V value)
  • Topic - The topic name for the consumer record received from the Kafka cluster.

  • Partition - The partition for the topic.

  • Key - The key of the record; if no key exists, null will be returned.

  • Value - The record contents.

ConsumerRecords API

The ConsumerRecords API acts as a container for ConsumerRecord. This API is used to keep one list of ConsumerRecord per partition for a particular topic. Its constructor is defined below.

public ConsumerRecords(java.util.Map<TopicPartition, java.util.List<ConsumerRecord<K, V>>> records)
  • TopicPartition - A map of partitions for a particular topic.

  • Records - The list of ConsumerRecord.

The ConsumerRecords class has the following methods defined.

S.No Methods and Description

1. public int count() - The number of records for all the topics.

2. public Set<TopicPartition> partitions() - The set of partitions with data in this record set (if no data was returned, then the set is empty).

3. public Iterator<ConsumerRecord<K, V>> iterator() - The iterator enables you to cycle through a collection, obtaining or removing elements.

4. public List<ConsumerRecord<K, V>> records(TopicPartition partition) - Get the list of records for the given partition.
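The container shape, one list of records per partition, can be mirrored with a small stand-in. RecordsSketch below is invented for illustration and is not the Kafka class (it keys on a plain partition number rather than a real TopicPartition):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Stand-in mirroring ConsumerRecords' container shape (not the Kafka class).
public class RecordsSketch {
   private final Map<Integer, List<String>> byPartition = new HashMap<>();

   void add(int partition, String record) {
      byPartition.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
   }
   // Total number of records across all partitions, like count().
   int count() {
      return byPartition.values().stream().mapToInt(List::size).sum();
   }
   // Partitions that actually hold data, like partitions().
   Set<Integer> partitions() { return byPartition.keySet(); }
   // Records for one partition, like records(TopicPartition).
   List<String> records(int partition) {
      return byPartition.getOrDefault(partition, Collections.emptyList());
   }

   public static void main(String[] args) {
      RecordsSketch rs = new RecordsSketch();
      rs.add(0, "a"); rs.add(0, "b"); rs.add(1, "c");
      System.out.println(rs.count() + " records across " + rs.partitions().size() + " partitions");
   }
}
```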

Configuration Settings

The main configuration settings of the Consumer client API are listed below:

S.No Settings and Description

1. bootstrap.servers - Bootstrapping list of brokers.

2. group.id - Assigns an individual consumer to a group.

3. enable.auto.commit - Enable auto commit for offsets if the value is true, otherwise not committed.

4. auto.commit.interval.ms - How often updated consumed offsets are written to ZooKeeper.

5. session.timeout.ms - Indicates how many milliseconds Kafka will wait for ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages.
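As with the producer, these settings travel in a java.util.Properties object. A minimal sketch using the values from the SimpleConsumer code later in this chapter (buildConsumerProps is a hypothetical helper name):

```java
import java.util.Properties;

public class ConsumerConfigSketch {
   // Builds the consumer configuration described in the table above.
   static Properties buildConsumerProps() {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092"); // broker bootstrap list
      props.put("group.id", "test");                    // consumer group id
      props.put("enable.auto.commit", "true");          // auto-commit offsets
      props.put("auto.commit.interval.ms", "1000");     // commit every second
      props.put("session.timeout.ms", "30000");         // session timeout
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      return props;
   }

   public static void main(String[] args) {
      System.out.println(buildConsumerProps());
   }
}
```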

SimpleConsumer Application

The producer application steps remain the same here. First, start your ZooKeeper and Kafka broker. Then create a SimpleConsumer application with a Java class named SimpleConsumer.java and type the following code.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName));
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
            // print the offset, key and value for the consumer records.
            System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }
   }
}

Compilation - The application can be compiled using the following command.

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" *.java

Execution - The application can be executed using the following command.

java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleConsumer <topic-name>

Input - Open the producer CLI and send some messages to the topic. You can put a simple input such as "Hello Consumer".

Output - The following will be the output.

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer