Apache Kafka - podstawowe operacje

Najpierw zacznijmy wdrażać konfigurację brokera z jednym węzłem i jednym węzłem, a następnie przeprowadzimy migrację naszej konfiguracji do konfiguracji brokerów z jednym węzłem i wieloma.

Miejmy nadzieję, że już zainstalowałbyś Javę, ZooKeepera i Kafkę na swoim komputerze. Przed przejściem do konfiguracji klastra Kafka, najpierw musisz uruchomić ZooKeeper, ponieważ klaster Kafka używa ZooKeeper.

Uruchom ZooKeeper

Otwórz nowy terminal i wpisz następujące polecenie -

bin/zookeeper-server-start.sh config/zookeeper.properties

Aby uruchomić Kafka Broker, wpisz następujące polecenie -

bin/kafka-server-start.sh config/server.properties

Po uruchomieniu Kafka Broker wpisz polecenie jps na terminalu ZooKeeper, a zobaczysz następującą odpowiedź -

821 QuorumPeerMain
928 Kafka
931 Jps

Teraz możesz zobaczyć dwa demony działające na terminalu, gdzie QuorumPeerMain to demon ZooKeeper, a drugi to demon Kafka.

Konfiguracja brokera z jednym węzłem i jednym węzłem

W tej konfiguracji masz jedną instancję ZooKeepera i identyfikatora brokera. Oto kroki, aby go skonfigurować -

Creating a Kafka Topic- Kafka udostępnia narzędzie wiersza poleceń o nazwie kafka-topics.sh do tworzenia tematów na serwerze. Otwórz nowy terminal i wpisz poniższy przykład.

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

Właśnie utworzyliśmy temat o nazwie Hello-Kafka z jedną partycją i jednym współczynnikiem repliki. Powyższy utworzony wynik będzie podobny do następującego wyniku -

Output- Utworzono temat Hello-Kafka

Po utworzeniu tematu można uzyskać powiadomienie w oknie terminala brokera Kafka oraz dziennik utworzonego tematu określony w „/ tmp / kafka-logs /” w pliku config / server.properties.

Lista tematów

Aby uzyskać listę tematów na serwerze Kafka, możesz użyć następującego polecenia -

Syntax

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

Hello-Kafka

Ponieważ utworzyliśmy temat, wyświetli się tylko Hello-Kafka . Załóżmy, że jeśli utworzysz więcej niż jeden temat, w wyniku otrzymasz nazwy tematów.

Uruchom producenta, aby wysyłać wiadomości

Syntax

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

Z powyższej składni wymagane są dwa główne parametry dla klienta wiersza poleceń producenta -

Broker-list- Lista brokerów, do których chcemy wysyłać wiadomości. W tym przypadku mamy tylko jednego brokera. Plik Config / server.properties zawiera identyfikator portu brokera, ponieważ wiemy, że nasz broker nasłuchuje na porcie 9092, więc możesz go określić bezpośrednio.

Nazwa tematu - oto przykład nazwy tematu.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

Producent będzie czekał na dane wejściowe ze stdin i publikuje w klastrze Kafka. Domyślnie każda nowa linia jest publikowana jako nowa wiadomość, a następnie domyślne właściwości producenta są określone w pliku config / production.properties . Teraz możesz wpisać kilka wierszy wiadomości w terminalu, jak pokazano poniżej.

Output

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

Uruchom konsumenta, aby otrzymywać wiadomości

Podobnie jak w przypadku producenta, domyślne właściwości konsumenta są określone w pliku config / consumer.proper-ties . Otwórz nowy terminal i wpisz poniższą składnię do używania wiadomości.

Syntax

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

Output

Hello
My first message
My second message

Wreszcie możesz wprowadzić wiadomości z terminala producenta i zobaczyć je pojawiające się w terminalu konsumenta. W tej chwili masz bardzo dobrą wiedzę na temat klastra z jednym węzłem z jednym brokerem. Przejdźmy teraz do konfiguracji wielu brokerów.

Konfiguracja brokerów z jednym węzłem i wieloma

Przed przejściem do konfiguracji klastra wielu brokerów, najpierw uruchom serwer ZooKeeper.

Create Multiple Kafka Brokers- Mamy już jedną instancję brokera Kafka w konfiguracji / server.properties. Teraz potrzebujemy wielu instancji brokera, więc skopiuj istniejący plik server.prop-erties do dwóch nowych plików konfiguracyjnych i zmień jego nazwę na server-one.properties i server-two.prop-erties. Następnie edytuj oba nowe pliki i przypisz następujące zmiany -

config / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

Start Multiple Brokers- Po wprowadzeniu wszystkich zmian na trzech serwerach otwórz trzy nowe terminale, aby po kolei uruchomić każdego brokera.

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

Teraz mamy trzech różnych brokerów działających na komputerze. Spróbuj sam, aby sprawdzić wszystkie demony, wpisującjps na terminalu ZooKeeper, wtedy zobaczysz odpowiedź.

Tworzenie tematu

Przypiszmy wartość współczynnika replikacji jako trzy dla tego tematu, ponieważ mamy uruchomionych trzech różnych brokerów. Jeśli masz dwóch brokerów, przypisana wartość repliki będzie wynosić dwa.

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

Output

created topic “Multibrokerapplication”

Polecenie Opisz służy do sprawdzenia, który broker nasłuchuje aktualnie utworzonego tematu, jak pokazano poniżej -

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Output

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

Z powyższego wyniku możemy wywnioskować, że pierwsza linia zawiera podsumowanie wszystkich partycji, pokazując nazwę tematu, liczbę partycji i współczynnik replikacji, który już wybraliśmy. W drugiej linii każdy węzeł będzie liderem w losowo wybranej części partycji.

W naszym przypadku widzimy, że nasz pierwszy broker (z broker.id 0) jest liderem. Następnie Replicas: 0,2,1 oznacza, że ​​wszyscy brokerzy powielają temat, w końcu Isr jest zbiorem replik synchronizowanych . Cóż, jest to podzbiór replik, które są obecnie żywe i złapane przez lidera.

Uruchom producenta, aby wysyłać wiadomości

Ta procedura pozostaje taka sama, jak w przypadku konfiguracji z jednym brokerem.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

Output

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

Uruchom konsumenta, aby otrzymywać wiadomości

Ta procedura pozostaje taka sama, jak pokazano w konfiguracji jednego brokera.

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginning

Output

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

Podstawowe operacje na tematach

W tym rozdziale omówimy różne podstawowe operacje tematyczne.

Modyfikowanie tematu

Jak już wiesz, jak utworzyć temat w klastrze Kafka. Teraz zmodyfikujmy utworzony temat za pomocą następującego polecenia

Syntax

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

Example

We have already created a topic “Hello-Kafka” with single partition count and one replica factor. 
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

Output

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

Usuwanie tematu

Aby usunąć temat, możesz użyć następującej składni.

Syntax

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

Example

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

Output

> Topic Hello-kafka marked for deletion

Note −Nie będzie to miało wpływu, jeśli delete.topic.enable nie jest ustawiona na true