Apache Kafka - Verbrauchergruppenbeispiel
Die Verbrauchergruppe ist ein Multithread- oder Multimaschinenverbrauch aus Kafka-Themen.
Verbrauchergruppe
Verbraucher können einer Gruppe mit derselben
group.id beitreten.
Die maximale Parallelität einer Gruppe besteht darin, dass die Anzahl der Verbraucher in der Gruppe ← Anzahl der Partitionen beträgt.
Kafka weist dem Verbraucher in einer Gruppe die Partitionen eines Themas zu, sodass jede Partition von genau einem Verbraucher in der Gruppe verwendet wird.
Kafka garantiert, dass eine Nachricht immer nur von einem einzelnen Verbraucher in der Gruppe gelesen wird.
Verbraucher können die Nachricht in der Reihenfolge sehen, in der sie im Protokoll gespeichert wurden.
Neuausgleich eines Verbrauchers
Das Hinzufügen weiterer Prozesse / Threads führt dazu, dass Kafka das Gleichgewicht wieder herstellt. Wenn ein Verbraucher oder Broker keinen Heartbeat an ZooKeeper sendet, kann er über den Kafka-Cluster neu konfiguriert werden. Während dieser Neuausrichtung weist Kafka den verfügbaren Threads verfügbare Partitionen zu und verschiebt möglicherweise eine Partition in einen anderen Prozess.
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
if(args.length < 2){
System.out.println("Usage: consumer <topic> <groupname>");
return;
}
String topic = args[0].toString();
String group = args[1].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
Zusammenstellung
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
Ausführung
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
Hier haben wir einen Beispielgruppennamen als meine Gruppe
mit zwei Verbrauchern erstellt. Ebenso können Sie Ihre Gruppe und die Anzahl der Verbraucher in der Gruppe erstellen.
Eingang
Öffnen Sie die Produzenten-CLI und senden Sie einige Nachrichten wie -
Test consumer group 01
Test consumer group 02
Ausgabe des ersten Prozesses
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01
Ausgabe des zweiten Prozesses
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
Hoffentlich hätten Sie SimpleConsumer und ConsumeGroup mithilfe der Java-Client-Demo verstanden. Jetzt haben Sie eine Idee, wie Sie Nachrichten mit einem Java-Client senden und empfangen können. Lassen Sie uns im nächsten Kapitel die Kafka-Integration mit Big-Data-Technologien fortsetzen.