Apache Kafka - Grundlegende Operationen

Beginnen wir zunächst mit der Implementierung der Konfiguration eines einzelnen Knotens und eines einzelnen Brokers. Anschließend migrieren wir unser Setup auf die Konfiguration eines einzelnen Knotens und mehrerer Broker.

Hoffentlich hätten Sie Java, ZooKeeper und Kafka jetzt auf Ihrem Computer installiert. Bevor Sie zum Kafka Cluster-Setup wechseln, müssen Sie zuerst Ihren ZooKeeper starten, da Kafka Cluster ZooKeeper verwendet.

Starten Sie ZooKeeper

Öffnen Sie ein neues Terminal und geben Sie den folgenden Befehl ein:

bin/zookeeper-server-start.sh config/zookeeper.properties

Geben Sie den folgenden Befehl ein, um Kafka Broker zu starten:

bin/kafka-server-start.sh config/server.properties

Geben Sie nach dem Starten von Kafka Broker den Befehl jps am ZooKeeper-Terminal ein. Die folgende Antwort wird angezeigt:

821 QuorumPeerMain
928 Kafka
931 Jps

Jetzt konnten Sie zwei Dämonen auf dem Terminal sehen, auf denen QuorumPeerMain der ZooKeeper-Dämon und ein weiterer der Kafka-Dämon ist.

Einzelknoten-Einzelbroker-Konfiguration

In dieser Konfiguration haben Sie eine einzelne ZooKeeper- und Broker-ID-Instanz. Im Folgenden finden Sie die Schritte zum Konfigurieren:

Creating a Kafka Topic- Kafka bietet ein Befehlszeilenprogramm namens kafka-topics.sh zum Erstellen von Themen auf dem Server. Öffnen Sie ein neues Terminal und geben Sie das folgende Beispiel ein.

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

Wir haben gerade ein Thema namens Hello-Kafka mit einer einzelnen Partition und einem Replikationsfaktor erstellt. Die oben erstellte Ausgabe ähnelt der folgenden Ausgabe:

Output- Erstelltes Thema Hello-Kafka

Sobald das Thema erstellt wurde, können Sie die Benachrichtigung im Kafka-Broker-Terminalfenster und das Protokoll für das erstellte Thema erhalten, das unter „/ tmp / kafka-logs /“ in der Datei config / server.properties angegeben ist.

Liste der Themen

Um eine Liste der Themen auf dem Kafka-Server abzurufen, können Sie den folgenden Befehl verwenden:

Syntax

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

Hello-Kafka

Da wir ein Thema erstellt haben, wird nur Hello-Kafka aufgelistet. Angenommen, wenn Sie mehr als ein Thema erstellen, erhalten Sie die Themennamen in der Ausgabe.

Starten Sie den Produzenten, um Nachrichten zu senden

Syntax

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

Aus der obigen Syntax sind zwei Hauptparameter für den Producer-Befehlszeilenclient erforderlich -

Broker-list- Die Liste der Broker, an die wir die Nachrichten senden möchten. In diesem Fall haben wir nur einen Broker. Die Datei Config / server.properties enthält die Broker-Port-ID, da wir wissen, dass unser Broker Port 9092 überwacht, sodass Sie ihn direkt angeben können.

Themenname - Hier ist ein Beispiel für den Themennamen.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

Der Produzent wartet auf die Eingabe von stdin und veröffentlicht im Kafka-Cluster. Standardmäßig wird jede neue Zeile als neue Nachricht veröffentlicht. Anschließend werden die Standardproduzenteneigenschaften in der Datei config / Producer.properties angegeben . Jetzt können Sie einige Zeilen mit Nachrichten in das Terminal eingeben, wie unten gezeigt.

Output

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

Starten Sie Consumer, um Nachrichten zu empfangen

Ähnlich wie bei Producer werden die Standard-Consumer-Eigenschaften in der Datei config / consumer.proper-tie angegeben . Öffnen Sie ein neues Terminal und geben Sie die folgende Syntax ein, um Nachrichten zu konsumieren.

Syntax

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

Output

Hello
My first message
My second message

Schließlich können Sie Nachrichten vom Herstellerterminal eingeben und sehen, dass sie im Verbraucherterminal angezeigt werden. Ab sofort haben Sie ein sehr gutes Verständnis für den einzelnen Knotencluster mit einem einzelnen Broker. Kommen wir nun zur Konfiguration mit mehreren Brokern.

Konfiguration mit mehreren Knoten und mehreren Brokern

Starten Sie zunächst Ihren ZooKeeper-Server, bevor Sie mit dem Cluster-Setup für mehrere Broker fortfahren.

Create Multiple Kafka Brokers- Wir haben bereits eine Kafka-Broker-Instanz in con-fig / server.properties. Jetzt benötigen wir mehrere Brokerinstanzen. Kopieren Sie die vorhandene Datei server.prop-erties in zwei neue Konfigurationsdateien und benennen Sie sie in server -one.properties und server-two.prop-erties um. Bearbeiten Sie dann beide neuen Dateien und weisen Sie die folgenden Änderungen zu:

config / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

Start Multiple Brokers- Nachdem alle Änderungen auf drei Servern vorgenommen wurden, öffnen Sie drei neue Terminals, um jeden Broker einzeln zu starten.

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

Jetzt laufen drei verschiedene Broker auf der Maschine. Versuchen Sie es selbst, um alle Dämonen durch Eingabe zu überprüfenjps Auf dem ZooKeeper-Terminal wird dann die Antwort angezeigt.

Ein Thema erstellen

Weisen Sie dem Replikationsfaktor für dieses Thema drei zu, da drei verschiedene Broker ausgeführt werden. Wenn Sie zwei Broker haben, beträgt der zugewiesene Replikatwert zwei.

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

Output

created topic “Multibrokerapplication”

Mit dem Befehl Beschreiben können Sie überprüfen, welcher Broker das aktuell erstellte Thema abhört (siehe unten).

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Output

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

Aus der obigen Ausgabe können wir schließen, dass die erste Zeile eine Zusammenfassung aller Partitionen enthält, die den Themennamen, die Partitionsanzahl und den Replikationsfaktor enthält, den wir bereits ausgewählt haben. In der zweiten Zeile ist jeder Knoten der Anführer für einen zufällig ausgewählten Teil der Partitionen.

In unserem Fall sehen wir, dass unser erster Broker (mit broker.id 0) der Anführer ist. Dann Replikate: 0,2,1 bedeutet, dass alle Broker das Thema replizieren. Schließlich ist Isr die Menge der synchronisierten Replikate. Nun, dies ist die Teilmenge der Repliken, die derzeit am Leben sind und vom Anführer eingeholt werden.

Starten Sie den Produzenten, um Nachrichten zu senden

Dieser Vorgang bleibt derselbe wie beim Setup eines einzelnen Brokers.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

Output

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

Starten Sie Consumer, um Nachrichten zu empfangen

Dieser Vorgang bleibt derselbe wie im Einzelbroker-Setup.

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginning

Output

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

Grundlegende Themenoperationen

In diesem Kapitel werden die verschiedenen grundlegenden Themenoperationen erläutert.

Ein Thema ändern

Wie Sie bereits verstanden haben, erstellen Sie ein Thema in Kafka Cluster. Lassen Sie uns nun ein erstelltes Thema mit dem folgenden Befehl ändern

Syntax

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

Example

We have already created a topic “Hello-Kafka” with single partition count and one replica factor. 
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

Output

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

Ein Thema löschen

Um ein Thema zu löschen, können Sie die folgende Syntax verwenden.

Syntax

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

Example

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

Output

> Topic Hello-kafka marked for deletion

Note −Dies hat keine Auswirkungen, wenn delete.topic.enable ist nicht auf true gesetzt