अपाचे काफ्का - उपभोक्ता समूह उदाहरण

उपभोक्ता समूह कफका विषयों से बहु-थ्रेडेड या बहु-मशीन खपत है।

उपभोक्ता समूह

  • उपभोक्ता एक ही group.id का उपयोग करके एक समूह में शामिल हो सकते हैं

  • एक समूह की अधिकतम समानता यह है कि समूह में उपभोक्ताओं की संख्या part विभाजन की संख्या नहीं है।

  • काफ्का एक समूह में उपभोक्ता को एक विषय के विभाजन प्रदान करता है, ताकि प्रत्येक विभाजन समूह में ठीक एक उपभोक्ता द्वारा खपत हो।

  • काफ्का गारंटी देता है कि एक संदेश केवल समूह में किसी एकल उपभोक्ता द्वारा पढ़ा जाता है।

  • उपभोक्ता उस संदेश को देख सकते हैं जिस क्रम में उन्हें लॉग में संग्रहीत किया गया था।

एक उपभोक्ता का पुन: संतुलन

अधिक प्रक्रियाओं / थ्रेड्स को जोड़ने से काफ्का को फिर से संतुलन मिलेगा। यदि कोई भी उपभोक्ता या दलाल दिल की धड़कन को चिड़ियाघर कीपर के पास भेजने में विफल रहता है, तो इसे काफ्का क्लस्टर के माध्यम से फिर से कॉन्फ़िगर किया जा सकता है। इस पुनः संतुलन के दौरान, काफ्का उपलब्ध थ्रेड को उपलब्ध विभाजन प्रदान करेगा, संभवतः एक विभाजन को दूसरी प्रक्रिया में ले जाएगा।

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

संकलन

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

क्रियान्वयन

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

यहां हमने दो उपभोक्ताओं के साथ मेरे समूह के रूप में एक नमूना समूह नाम बनाया है । इसी तरह, आप समूह में अपने समूह और उपभोक्ताओं की संख्या बना सकते हैं।

इनपुट

निर्माता CLI खोलें और कुछ संदेश भेजें -

Test consumer group 01
Test consumer group 02

पहली प्रक्रिया का आउटपुट

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

दूसरी प्रक्रिया का आउटपुट

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

अब उम्मीद है कि आप Java क्लाइंट डेमो का उपयोग करके SimpleConsumer और ConsumeGroup समझ गए होंगे। अब आपके पास जावा क्लाइंट का उपयोग करके संदेश भेजने और प्राप्त करने के बारे में एक विचार है। हमें अगले अध्याय में बड़ी डेटा तकनीकों के साथ काफ्का एकीकरण जारी रखना चाहिए।