Apache Kafka-간단한 생산자 예

Java 클라이언트를 사용하여 메시지를 게시하고 소비하기위한 애플리케이션을 만들어 보겠습니다. Kafka 생산자 클라이언트는 다음 API로 구성됩니다.

KafkaProducer API

이 섹션에서 가장 중요한 Kafka 생산자 API 세트를 이해하겠습니다. KafkaProducer API의 중심 부분은 KafkaProducer 클래스입니다. KafkaProducer 클래스는 다음 메서드를 사용하여 생성자에서 Kafka 브로커를 연결하는 옵션을 제공합니다.

  • KafkaProducer 클래스는 주제에 비동기 적으로 메시지를 보내는 send 메소드를 제공합니다. send ()의 서명은 다음과 같습니다.

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord − 생산자는 전송 대기중인 레코드 버퍼를 관리합니다.

  • Callback − 서버가 레코드를 승인했을 때 실행할 사용자 제공 콜백 (null은 콜백이 없음을 나타냄).

  • KafkaProducer 클래스는 이전에 보낸 모든 메시지가 실제로 완료되었는지 확인하는 flush 메서드를 제공합니다. flush 메소드의 구문은 다음과 같습니다.

public void flush()
  • KafkaProducer 클래스는 주어진 주제에 대한 파티션 메타 데이터를 가져 오는 데 도움이되는 partitionFor 메서드를 제공합니다. 사용자 지정 분할에 사용할 수 있습니다. 이 방법의 서명은 다음과 같습니다.

public Map metrics()

생산자가 유지 관리하는 내부 메트릭 맵을 반환합니다.

  • public void close () − KafkaProducer 클래스는 이전에 보낸 모든 요청이 완료 될 때까지 close 메서드 블록을 제공합니다.

생산자 API

Producer API의 중심 부분은 Producer 클래스입니다. Producer 클래스는 다음과 같은 방법으로 생성자에서 Kafka 브로커를 연결하는 옵션을 제공합니다.

생산자 클래스

생산자 클래스는 send 다음 서명을 사용하여 단일 또는 여러 주제에 대한 메시지.

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

생산자에는 두 가지 유형이 있습니다. SyncAsync.

동일한 API 구성이 동기화 생산자 에도 적용됩니다 . 차이점은 동기화 생산자가 메시지를 직접 보내지 만 백그라운드에서 메시지를 보낸다는 것입니다. 더 높은 처리량을 원할 때 비동기 생산자가 선호됩니다. 0.8과 같은 이전 릴리스에서 비동기 생산자는 오류 처리기를 등록하기위한 send ()에 대한 콜백이 없습니다. 이것은 0.9의 현재 릴리스에서만 사용할 수 있습니다.

공개 무효 close ()

Producer 클래스는 close 모든 Kafka 브로커에 대한 생산자 풀 연결을 닫는 방법.

구성 설정

Producer API의 주요 구성 설정은 이해를 돕기 위해 다음 표에 나열되어 있습니다.

S. 아니 구성 설정 및 설명
1

client.id

생산자 애플리케이션 식별

2

producer.type

동기화 또는 비동기

acks

acks 구성은 생산자 요청이 완료된 것으로 간주되는 기준을 제어합니다.

4

retries

생산자 요청이 실패하면 특정 값으로 자동 재 시도합니다.

5

bootstrap.servers

브로커의 부트 스트랩 목록.

6

linger.ms

요청 수를 줄이려면 linger.ms를 어떤 값보다 큰 값으로 설정할 수 있습니다.

7

key.serializer

직렬 변환기 인터페이스의 키입니다.

8

value.serializer

직렬 변환기 인터페이스의 값입니다.

9

batch.size

버퍼 크기.

10

buffer.memory

버퍼링을 위해 생산자가 사용할 수있는 총 메모리 양을 제어합니다.

ProducerRecord API

ProducerRecord는 다음 서명을 사용하여 파티션, 키 및 값 쌍으로 레코드를 생성하기 위해 Kafka cluster.ProducerRecord 클래스 생성자로 전송되는 키 / 값 쌍입니다.

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic − 기록에 추가 될 사용자 정의 주제 이름.

  • Partition − 파티션 수

  • Key − 기록에 포함될 키.

  • Value − 내용 기록
public ProducerRecord (string topic, k key, v value)

ProducerRecord 클래스 생성자는 키, 값 쌍이 있고 파티션이없는 레코드를 만드는 데 사용됩니다.

  • Topic − 기록을 할당 할 주제를 만듭니다.

  • Key − 기록을위한 키.

  • Value − 기록 내용.

public ProducerRecord (string topic, v value)

ProducerRecord 클래스는 파티션 및 키없이 레코드를 만듭니다.

  • Topic − 주제를 만듭니다.

  • Value − 기록 내용.

ProducerRecord 클래스 메소드는 다음 표에 나열되어 있습니다.

S. 아니 클래스 방법 및 설명
1

public string topic()

주제가 레코드에 추가됩니다.

2

public K key()

기록에 포함될 키. 그러한 키가 없으면 여기에서 null이 반환됩니다.

public V value()

내용을 기록하십시오.

4

partition()

레코드의 파티션 수

SimpleProducer 애플리케이션

애플리케이션을 생성하기 전에 먼저 ZooKeeper 및 Kafka 브로커를 시작한 다음 create topic 명령을 사용하여 Kafka 브로커에서 고유 한 주제를 생성합니다. 그 후 Sim-pleProducer.java 라는 Java 클래스를 만들고 다음 코딩을 입력하십시오.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation − 다음 명령을 사용하여 응용 프로그램을 컴파일 할 수 있습니다.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution − 다음 명령을 사용하여 응용 프로그램을 실행할 수 있습니다.

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

간단한 소비자 예

지금까지 Kafka 클러스터에 메시지를 보내는 생산자를 만들었습니다. 이제 Kafka 클러스터에서 메시지를 소비하는 소비자를 만들어 보겠습니다. KafkaConsumer API는 Kafka 클러스터에서 메시지를 사용하는 데 사용됩니다. KafkaConsumer 클래스 생성자는 아래에 정의되어 있습니다.

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs − 소비자 구성 맵을 반환합니다.

KafkaConsumer 클래스에는 아래 표에 나열된 다음과 같은 중요한 메서드가 있습니다.

S. 아니 방법 및 설명
1

public java.util.Set<TopicPar-tition> assignment()

소비자가 현재 할당 한 파티션 세트를 가져옵니다.

2

public string subscription()

동적으로 서명 된 파티션을 얻으려면 주어진 주제 목록을 구독하십시오.

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

동적으로 서명 된 파티션을 얻으려면 주어진 주제 목록을 구독하십시오.

4

public void unsubscribe()

주어진 파티션 목록에서 주제를 구독 취소하십시오.

5

public void sub-scribe(java.util.List<java.lang.String> topics)

동적으로 서명 된 파티션을 얻으려면 주어진 주제 목록을 구독하십시오. 주어진 주제 목록이 비어 있으면 unsubscribe ()와 동일하게 처리됩니다.

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

인수 패턴은 정규식 형식의 구독 패턴을 참조하고 리스너 인수는 구독 패턴에서 알림을받습니다.

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

고객에게 파티션 목록을 수동으로 할당합니다.

8

poll()

구독 / 할당 API 중 하나를 사용하여 지정된 주제 또는 파티션에 대한 데이터를 가져옵니다. 데이터를 폴링하기 전에 토픽을 구독하지 않으면 오류가 반환됩니다.

9

public void commitSync()

구독 된 모든 토픽 및 파티션 목록에 대해 마지막 poll ()에서 반환 된 커밋 오프셋입니다. commitAsyn ()에도 동일한 작업이 적용됩니다.

10

public void seek(TopicPartition partition, long offset)

소비자가 다음 poll () 메서드에서 사용할 현재 오프셋 값을 가져옵니다.

11

public void resume()

일시 중지 된 파티션을 재개합니다.

12

public void wakeup()

소비자를 깨우십시오.

ConsumerRecord API

ConsumerRecord API는 Kafka 클러스터에서 레코드를 수신하는 데 사용됩니다. 이 API는 레코드가 수신되는 토픽 이름, 파티션 번호 및 Kafka 파티션의 레코드를 가리키는 오프셋으로 구성됩니다. ConsumerRecord 클래스는 특정 토픽 이름, 파티션 수 및 <key, value> 쌍으로 소비자 레코드를 만드는 데 사용됩니다. 다음과 같은 서명이 있습니다.

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic − Kafka 클러스터에서 수신 한 소비자 레코드의 주제 이름.

  • Partition − 주제에 대한 파티션.

  • Key − 키가 존재하지 않는 경우 레코드의 키는 null이 반환됩니다.

  • Value − 내용을 기록합니다.

ConsumerRecords API

ConsumerRecords API는 ConsumerRecord의 컨테이너 역할을합니다. 이 API는 특정 주제에 대한 파티션 당 ConsumerRecord 목록을 유지하는 데 사용됩니다. 생성자는 아래에 정의되어 있습니다.

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition − 특정 주제에 대한 파티션 맵을 반환합니다.

  • Records − ConsumerRecord 목록을 반환합니다.

ConsumerRecords 클래스에는 다음과 같은 메서드가 정의되어 있습니다.

S. 아니 방법 및 설명
1

public int count()

모든 주제에 대한 레코드 수입니다.

2

public Set partitions()

이 레코드 세트에 데이터가있는 파티션 세트 (데이터가 리턴되지 않은 경우 세트가 비어 있음).

public Iterator iterator()

Iterator를 사용하면 컬렉션을 순환하고 요소를 가져 오거나 제거 할 수 있습니다.

4

public List records()

주어진 파티션에 대한 레코드 목록을 가져옵니다.

구성 설정

소비자 클라이언트 API 기본 구성 설정에 대한 구성 설정은 다음과 같습니다.

S. 아니 설정 및 설명
1

bootstrap.servers

브로커 목록을 부트 스트랩합니다.

2

group.id

개별 소비자를 그룹에 할당합니다.

enable.auto.commit

값이 true이면 오프셋에 대해 자동 커밋을 활성화하고 그렇지 않으면 커밋되지 않습니다.

4

auto.commit.interval.ms

업데이트 된 소비 오프셋이 ZooKeeper에 기록되는 빈도를 반환합니다.

5

session.timeout.ms

Kafka가 메시지를 포기하고 계속 사용하기 전에 ZooKeeper가 요청 (읽기 또는 쓰기)에 응답 할 때까지 기다리는 시간 (밀리 초)을 나타냅니다.

SimpleConsumer 애플리케이션

생산자 신청 단계는 여기에서 동일하게 유지됩니다. 먼저 ZooKeeper 및 Kafka 브로커를 시작하십시오. 그런 다음 생성 SimpleConsumer의 이름의 자바 클래스와 응용 프로그램 SimpleCon-sumer.java을 하고 다음 코드를 입력합니다.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation − 다음 명령을 사용하여 응용 프로그램을 컴파일 할 수 있습니다.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution − 다음 명령을 사용하여 응용 프로그램을 실행할 수 있습니다.

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Input− 생산자 CLI를 열고 주제에 몇 가지 메시지를 보냅니다. 간단한 입력을 'Hello Consumer'로 입력 할 수 있습니다.

Output − 다음은 출력됩니다.

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer