Apache Kafka - Verbrauchergruppenbeispiel

Die Verbrauchergruppe ist ein Multithread- oder Multimaschinenverbrauch aus Kafka-Themen.

Verbrauchergruppe

  • Verbraucher können einer Gruppe mit derselben group.id beitreten.

  • Die maximale Parallelität einer Gruppe besteht darin, dass die Anzahl der Verbraucher in der Gruppe ← Anzahl der Partitionen beträgt.

  • Kafka weist dem Verbraucher in einer Gruppe die Partitionen eines Themas zu, sodass jede Partition von genau einem Verbraucher in der Gruppe verwendet wird.

  • Kafka garantiert, dass eine Nachricht immer nur von einem einzelnen Verbraucher in der Gruppe gelesen wird.

  • Verbraucher können die Nachricht in der Reihenfolge sehen, in der sie im Protokoll gespeichert wurden.

Neuausgleich eines Verbrauchers

Das Hinzufügen weiterer Prozesse / Threads führt dazu, dass Kafka das Gleichgewicht wieder herstellt. Wenn ein Verbraucher oder Broker keinen Heartbeat an ZooKeeper sendet, kann er über den Kafka-Cluster neu konfiguriert werden. Während dieser Neuausrichtung weist Kafka den verfügbaren Threads verfügbare Partitionen zu und verschiebt möglicherweise eine Partition in einen anderen Prozess.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

Zusammenstellung

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Ausführung

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

Hier haben wir einen Beispielgruppennamen als meine Gruppe mit zwei Verbrauchern erstellt. Ebenso können Sie Ihre Gruppe und die Anzahl der Verbraucher in der Gruppe erstellen.

Eingang

Öffnen Sie die Produzenten-CLI und senden Sie einige Nachrichten wie -

Test consumer group 01
Test consumer group 02

Ausgabe des ersten Prozesses

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

Ausgabe des zweiten Prozesses

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02

Hoffentlich hätten Sie SimpleConsumer und ConsumeGroup mithilfe der Java-Client-Demo verstanden. Jetzt haben Sie eine Idee, wie Sie Nachrichten mit einem Java-Client senden und empfangen können. Lassen Sie uns im nächsten Kapitel die Kafka-Integration mit Big-Data-Technologien fortsetzen.