Apache Kafka-소비자 그룹 예
소비자 그룹은 Kafka 주제의 다중 스레드 또는 다중 시스템 소비입니다.
소비자 그룹
소비자는 동일한
group.id
를 사용하여 그룹에 가입 할 수 있습니다.
그룹의 최대 병렬 처리는 그룹의 소비자 수 ← 파티션 수입니다.
Kafka는 그룹의 소비자에게 주제의 파티션을 할당하므로 각 파티션은 그룹의 정확히 한 소비자가 사용합니다.
Kafka는 그룹의 단일 소비자 만 메시지를 읽도록 보장합니다.
소비자는 로그에 저장된 순서대로 메시지를 볼 수 있습니다.
소비자의 재조정
더 많은 프로세스 / 스레드를 추가하면 Kafka가 재조정됩니다. 소비자 또는 브로커가 ZooKeeper로 하트 비트를 보내지 못하는 경우 Kafka 클러스터를 통해 다시 구성 할 수 있습니다. 이 재조정 동안 Kafka는 사용 가능한 스레드에 사용 가능한 파티션을 할당하여 파티션을 다른 프로세스로 이동할 수 있습니다.
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
if(args.length < 2){
System.out.println("Usage: consumer <topic> <groupname>");
return;
}
String topic = args[0].toString();
String group = args[1].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
편집
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
실행
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
여기 에서는 두 명의 소비자가있는 my-group
으로 샘플 그룹 이름을 만들었습니다 . 마찬가지로 그룹과 그룹의 소비자 수를 만들 수 있습니다.
입력
생산자 CLI를 열고 다음과 같은 메시지를 보냅니다.
Test consumer group 01
Test consumer group 02
첫 번째 프로세스의 출력
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01
두 번째 프로세스의 출력
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
이제 Java 클라이언트 데모를 사용하여 SimpleConsumer 및 ConsumeGroup을 이해했을 것입니다. 이제 Java 클라이언트를 사용하여 메시지를 보내고받는 방법에 대한 아이디어를 얻었습니다. 다음 장에서 빅 데이터 기술과 Kafka 통합을 계속하겠습니다.