ApacheKafka-単純なプロデューサーの例

Javaクライアントを使用してメッセージを公開および消費するためのアプリケーションを作成しましょう。Kafkaプロデューサークライアントは、次のAPIで構成されています。

KafkaProducer API

このセクションでは、KafkaプロデューサーAPIの最も重要なセットを理解しましょう。KafkaProducer APIの中心的な部分は、KafkaProducerクラスです。KafkaProducerクラスは、コンストラクター内のKafkaブローカーを次のメソッドに接続するオプションを提供します。

  • KafkaProducerクラスは、トピックに非同期でメッセージを送信するためのsendメソッドを提供します。send()の署名は次のとおりです

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord −プロデューサーは、送信を待機しているレコードのバッファーを管理します。

  • Callback −レコードがサーバーによって確認されたときに実行するユーザー指定のコールバック(nullはコールバックがないことを示します)。

  • KafkaProducerクラスは、以前に送信されたすべてのメッセージが実際に完了したことを確認するためのフラッシュメソッドを提供します。flushメソッドの構文は次のとおりです-

public void flush()
  • KafkaProducerクラスは、特定のトピックのパーティションメタデータを取得するのに役立つpartitionForメソッドを提供します。これは、カスタムパーティショニングに使用できます。このメソッドのシグネチャは次のとおりです-

public Map metrics()

プロデューサーによって維持されている内部メトリックのマップを返します。

  • public void close()-KafkaProducerクラスは、以前に送信されたすべての要求が完了するまで、closeメソッドブロックを提供します。

プロデューサーAPI

Producer APIの中心的な部分は、Producerクラスです。プロデューサークラスは、次のメソッドによってコンストラクターでKafkaブローカーに接続するオプションを提供します。

プロデューサークラス

プロデューサークラスはsendメソッドをに提供します send 次の署名を使用して、単一または複数のトピックにメッセージを送信します。

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

プロデューサーには2つのタイプがあります– Sync そして Async

同じAPI構成が同期プロデューサーにも適用されます。それらの違いは、同期プロデューサーはメッセージを直接送信しますが、バックグラウンドでメッセージを送信することです。より高いスループットが必要な場合は、非同期プロデューサーをお勧めします。0.8のような以前のリリースでは、非同期プロデューサーには、エラーハンドラーを登録するためのsend()のコールバックがありません。これは、現在のリリース0.9でのみ使用できます。

public void close()

プロデューサークラスは提供します close すべてのKafkaブローカーへのプロデューサープール接続を閉じる方法。

構成設定

わかりやすくするために、ProducerAPIの主な構成設定を次の表に示します。

S.No 構成設定と説明
1

client.id

プロデューサーアプリケーションを識別します

2

producer.type

同期または非同期のいずれか

3

acks

acks構成は、プロデューサー要求の下での基準を制御し、完全であると見なされます。

4

retries

プロデューサーリクエストが失敗した場合は、特定の値で自動的に再試行します。

5

bootstrap.servers

ブローカーのブートストラップリスト。

6

linger.ms

リクエストの数を減らしたい場合は、linger.msをある値よりも大きい値に設定できます。

7

key.serializer

シリアライザーインターフェイスのキー。

8

value.serializer

シリアライザーインターフェイスの値。

9

batch.size

バッファサイズ。

10

buffer.memory

バッファリングのためにプロデューサーが使用できるメモリの合計量を制御します。

ProducerRecord API

ProducerRecordは、Kafka cluster.ProducerRecordクラスコンストラクターに送信されるキーと値のペアであり、次の署名を使用して、パーティション、キー、および値のペアを持つレコードを作成します。

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic −レコードに追加されるユーザー定義のトピック名。

  • Partition −パーティション数

  • Key −レコードに含まれるキー。

  • Value −記録内容
public ProducerRecord (string topic, k key, v value)

ProducerRecordクラスコンストラクターは、キーと値のペアがあり、パーティションがないレコードを作成するために使用されます。

  • Topic −レコードを割り当てるトピックを作成します。

  • Key −レコードのキー。

  • Value −内容を記録します。

public ProducerRecord (string topic, v value)

ProducerRecordクラスは、パーティションとキーなしでレコードを作成します。

  • Topic −トピックを作成します。

  • Value −内容を記録します。

ProducerRecordクラスのメソッドを次の表に示します-

S.No クラスのメソッドと説明
1

public string topic()

トピックがレコードに追加されます。

2

public K key()

レコードに含まれるキー。そのようなキーがない場合、ここでnullが返されます。

3

public V value()

内容を記録します。

4

partition()

レコードのパーティション数

SimpleProducerアプリケーション

アプリケーションを作成する前に、まずZooKeeperとKafkaブローカーを起動し、次にcreatetopicコマンドを使用してKafkaブローカーで独自のトピックを作成します。その後、Sim-pleProducer.javaという名前のJavaクラスを作成し、次のコーディングを入力します。

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation −アプリケーションは、次のコマンドを使用してコンパイルできます。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution −以下のコマンドでアプリケーションを実行できます。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

簡単な消費者の例

今のところ、Kafkaクラスターにメッセージを送信するプロデューサーを作成しました。次に、Kafkaクラスターからメッセージを消費するコンシューマーを作成しましょう。KafkaConsumer APIは、Kafkaクラスターからのメッセージを消費するために使用されます。KafkaConsumerクラスコンストラクターは以下で定義されています。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs −コンシューマー構成のマップを返します。

KafkaConsumerクラスには、以下の表にリストされている次の重要なメソッドがあります。

S.No 方法と説明
1

public java.util.Set<TopicPar-tition> assignment()

消費者によって現在割り当てられているパーティションのセットを取得します。

2

public string subscription()

指定されたトピックのリストをサブスクライブして、動的に割り当てられたパーティションを取得します。

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

指定されたトピックのリストをサブスクライブして、動的に割り当てられたパーティションを取得します。

4

public void unsubscribe()

指定されたパーティションのリストからトピックのサブスクライブを解除します。

5

public void sub-scribe(java.util.List<java.lang.String> topics)

指定されたトピックのリストをサブスクライブして、動的に割り当てられたパーティションを取得します。指定されたトピックのリストが空の場合、unsubscribe()と同じように扱われます。

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

引数パターンは正規表現の形式でサブスクライブパターンを参照し、リスナー引数はサブスクライブパターンから通知を受け取ります。

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

パーティションのリストを顧客に手動で割り当てます。

8

poll()

サブスクライブ/割り当てAPIの1つを使用して指定されたトピックまたはパーティションのデータをフェッチします。データのポーリング前にトピックがサブスクライブされていない場合、これはエラーを返します。

9

public void commitSync()

トピックとパーティションのすべてのサブスクライブされたリストについて、最後のpoll()で返されたコミットオフセット。同じ操作がcommitAsyn()に適用されます。

10

public void seek(TopicPartition partition, long offset)

コンシューマーが次のpoll()メソッドで使用する現在のオフセット値を取得します。

11

public void resume()

一時停止したパーティションを再開します。

12

public void wakeup()

消費者を起こします。

ConsumerRecord API

ConsumerRecord APIは、Kafkaクラスターからレコードを受信するために使用されます。このAPIは、トピック名、レコードの受信元のパーティション番号、およびKafkaパーティション内のレコードを指すオフセットで構成されます。ConsumerRecordクラスは、特定のトピック名、パーティション数、および<key、value>のペアを持つコンシューマーレコードを作成するために使用されます。以下の署名があります。

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic −Kafkaクラスターから受信したコンシューマーレコードのトピック名。

  • Partition −トピックのパーティション。

  • Key −レコードのキー(キーが存在しない場合)はnullが返されます。

  • Value −内容を記録します。

ConsumerRecords API

ConsumerRecords APIは、ConsumerRecordのコンテナーとして機能します。このAPIは、特定のトピックのパーティションごとのConsumerRecordのリストを保持するために使用されます。そのコンストラクターは以下に定義されています。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition −特定のトピックのパーティションのマップを返します。

  • Records −ConsumerRecordのリストを返します。

ConsumerRecordsクラスには、次のメソッドが定義されています。

S.No メソッドと説明
1

public int count()

すべてのトピックのレコード数。

2

public Set partitions()

このレコードセット内のデータを含むパーティションのセット(データが返されない場合、セットは空です)。

3

public Iterator iterator()

イテレータを使用すると、コレクションを循環して、要素を取得または削除できます。

4

public List records()

指定されたパーティションのレコードのリストを取得します。

構成設定

コンシューマクライアントAPIの主な構成設定の構成設定を以下に示します-

S.No 設定と説明
1

bootstrap.servers

ブローカーのブートストラップリスト。

2

group.id

個々の消費者をグループに割り当てます。

3

enable.auto.commit

値がtrueの場合はオフセットの自動コミットを有効にし、そうでない場合はコミットしません。

4

auto.commit.interval.ms

更新された消費オフセットがZooKeeperに書き込まれる頻度を返します。

5

session.timeout.ms

KafkaがZooKeeperが要求(読み取りまたは書き込み)に応答するのを待ってから、メッセージをあきらめて消費し続けるミリ秒数を示します。

SimpleConsumerアプリケーション

プロデューサーの申請手順はここでも同じです。まず、ZooKeeperとKafkaブローカーを起動します。次に、作成SimpleConsumerの名前のJavaクラスを使用してアプリケーションをSimpleCon-sumer.javaし、次のコードを入力します。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation −アプリケーションは、次のコマンドを使用してコンパイルできます。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution − アプリケーションは、次のコマンドを使用して実行できます

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Input−プロデューサーCLIを開き、トピックにいくつかのメッセージを送信します。単純な入力を「HelloConsumer」として入力できます。

Output −以下が出力になります。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer