Apache Kafka - Tüketici Grubu Örneği

Tüketici grubu, Kafka konularından çok iş parçacıklı veya çok makineli bir tüketimdir.

Tüketici Grubu

  • Tüketiciler, aynı group.id adresini kullanarak bir gruba katılabilir .

  • Bir grubun maksimum paralelliği, gruptaki tüketici sayısı ← bölüm sayısıdır.

  • Kafka, bir konunun bölümlerini bir gruptaki tüketiciye atar, böylece her bölüm gruptaki tam olarak bir tüketici tarafından tüketilir.

  • Kafka, bir mesajın yalnızca gruptaki tek bir tüketici tarafından okunacağını garanti eder.

  • Tüketiciler, mesajı günlükte saklandıkları sırayla görebilirler.

Bir Tüketicinin Yeniden Dengelenmesi

Daha fazla süreç / iş parçacığı eklemek Kafka'nın yeniden dengelenmesine neden olacaktır. Herhangi bir tüketici veya komisyoncu ZooKeeper'a sinyal gönderemezse, Kafka kümesi aracılığıyla yeniden yapılandırılabilir. Bu yeniden dengeleme sırasında Kafka, mevcut bölümleri mevcut iş parçacıkları için atayacak ve muhtemelen bir bölümü başka bir işleme taşıyacaktır.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

Derleme

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Yürütme

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

Burada , iki tüketicili my-group olarak bir örnek grup adı oluşturduk . Benzer şekilde, grubunuzu ve gruptaki tüketici sayısını oluşturabilirsiniz.

Giriş

Üretici CLI'yi açın ve aşağıdakiler gibi bazı mesajlar gönderin:

Test consumer group 01
Test consumer group 02

İlk İşlemin Çıktısı

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

İkinci Sürecin Çıktısı

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

Şimdi umarım SimpleConsumer ve ConsumeGroup'u Java istemci demosunu kullanarak anlamışsınızdır. Artık bir Java istemcisi kullanarak nasıl mesaj gönderip alacağınız hakkında bir fikriniz var. Bir sonraki bölümde büyük veri teknolojileri ile Kafka entegrasyonuna devam edelim.