Apache Kafka-소비자 그룹 예

소비자 그룹은 Kafka 주제의 다중 스레드 또는 다중 시스템 소비입니다.

소비자 그룹

  • 소비자는 동일한 group.id 를 사용하여 그룹에 가입 할 수 있습니다 .

  • 그룹의 최대 병렬 처리는 그룹의 소비자 수 ← 파티션 수입니다.

  • Kafka는 그룹의 소비자에게 주제의 파티션을 할당하므로 각 파티션은 그룹의 정확히 한 소비자가 사용합니다.

  • Kafka는 그룹의 단일 소비자 만 메시지를 읽도록 보장합니다.

  • 소비자는 로그에 저장된 순서대로 메시지를 볼 수 있습니다.

소비자의 재조정

더 많은 프로세스 / 스레드를 추가하면 Kafka가 재조정됩니다. 소비자 또는 브로커가 ZooKeeper로 하트 비트를 보내지 못하는 경우 Kafka 클러스터를 통해 다시 구성 할 수 있습니다. 이 재조정 동안 Kafka는 사용 가능한 스레드에 사용 가능한 파티션을 할당하여 파티션을 다른 프로세스로 이동할 수 있습니다.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

편집

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

실행

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

여기 에서는 두 명의 소비자가있는 my-group 으로 샘플 그룹 이름을 만들었습니다 . 마찬가지로 그룹과 그룹의 소비자 수를 만들 수 있습니다.

입력

생산자 CLI를 열고 다음과 같은 메시지를 보냅니다.

Test consumer group 01
Test consumer group 02

첫 번째 프로세스의 출력

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

두 번째 프로세스의 출력

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

이제 Java 클라이언트 데모를 사용하여 SimpleConsumer 및 ConsumeGroup을 이해했을 것입니다. 이제 Java 클라이언트를 사용하여 메시지를 보내고받는 방법에 대한 아이디어를 얻었습니다. 다음 장에서 빅 데이터 기술과 Kafka 통합을 계속하겠습니다.