Apache Kafka - Contoh Grup Konsumen

Kelompok konsumen adalah konsumsi multi-threaded atau multi-mesin dari topik Kafka.

Grup Konsumen

  • Konsumen dapat bergabung dalam grup dengan menggunakan group.id yang sama .

  • Paralelisme maksimum grup adalah jumlah konsumen dalam grup ← jumlah partisi.

  • Kafka menetapkan partisi topik ke konsumen dalam grup, sehingga setiap partisi dikonsumsi oleh satu konsumen dalam grup.

  • Kafka menjamin bahwa sebuah pesan hanya akan dibaca oleh satu konsumen dalam grup.

  • Konsumen dapat melihat pesan tersebut sesuai urutan penyimpanannya di log.

Re-balancing Konsumen

Menambahkan lebih banyak proses / utas akan menyebabkan Kafka menyeimbangkan kembali. Jika ada konsumen atau broker yang gagal mengirim detak jantung ke Zookeeper, maka detak jantung dapat dikonfigurasi ulang melalui klaster Kafka. Selama penyeimbangan ulang ini, Kafka akan menetapkan partisi yang tersedia ke utas yang tersedia, mungkin memindahkan partisi ke proses lain.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

Kompilasi

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Eksekusi

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

Di sini kami telah membuat nama grup sampel sebagai grup-saya dengan dua konsumen. Demikian pula, Anda dapat membuat grup dan jumlah konsumen di grup.

Memasukkan

Buka produser CLI dan kirim beberapa pesan seperti -

Test consumer group 01
Test consumer group 02

Output dari Proses Pertama

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

Output dari Proses Kedua

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

Sekarang semoga Anda memahami SimpleConsumer dan ConsumeGroup dengan menggunakan demo klien Java. Sekarang Anda memiliki ide tentang cara mengirim dan menerima pesan menggunakan klien Java. Mari kita lanjutkan integrasi Kafka dengan teknologi data besar di bab berikutnya.