Apache Kafka - podstawowe operacje
Najpierw zacznijmy wdrażać konfigurację brokera z jednym węzłem i jednym węzłem,
a następnie przeprowadzimy migrację naszej konfiguracji do konfiguracji brokerów z jednym węzłem i wieloma.
Miejmy nadzieję, że już zainstalowałbyś Javę, ZooKeepera i Kafkę na swoim komputerze. Przed przejściem do konfiguracji klastra Kafka, najpierw musisz uruchomić ZooKeeper, ponieważ klaster Kafka używa ZooKeeper.
Uruchom ZooKeeper
Otwórz nowy terminal i wpisz następujące polecenie -
bin/zookeeper-server-start.sh config/zookeeper.properties
Aby uruchomić Kafka Broker, wpisz następujące polecenie -
bin/kafka-server-start.sh config/server.properties
Po uruchomieniu Kafka Broker wpisz polecenie jps
na terminalu ZooKeeper, a zobaczysz następującą odpowiedź -
821 QuorumPeerMain
928 Kafka
931 Jps
Teraz możesz zobaczyć dwa demony działające na terminalu, gdzie QuorumPeerMain to demon ZooKeeper, a drugi to demon Kafka.
Konfiguracja brokera z jednym węzłem i jednym węzłem
W tej konfiguracji masz jedną instancję ZooKeepera i identyfikatora brokera. Oto kroki, aby go skonfigurować -
Creating a Kafka Topic- Kafka udostępnia narzędzie wiersza poleceń o nazwie kafka-topics.sh
do tworzenia tematów na serwerze. Otwórz nowy terminal i wpisz poniższy przykład.
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic Hello-Kafka
Właśnie utworzyliśmy temat o nazwie Hello-Kafka
z jedną partycją i jednym współczynnikiem repliki. Powyższy utworzony wynik będzie podobny do następującego wyniku -
Output- Utworzono temat Hello-Kafka
Po utworzeniu tematu można uzyskać powiadomienie w oknie terminala brokera Kafka oraz dziennik utworzonego tematu określony w „/ tmp / kafka-logs /” w pliku config / server.properties.
Lista tematów
Aby uzyskać listę tematów na serwerze Kafka, możesz użyć następującego polecenia -
Syntax
bin/kafka-topics.sh --list --zookeeper localhost:2181
Output
Hello-Kafka
Ponieważ utworzyliśmy temat, wyświetli się tylko Hello-Kafka
. Załóżmy, że jeśli utworzysz więcej niż jeden temat, w wyniku otrzymasz nazwy tematów.
Uruchom producenta, aby wysyłać wiadomości
Syntax
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
Z powyższej składni wymagane są dwa główne parametry dla klienta wiersza poleceń producenta -
Broker-list- Lista brokerów, do których chcemy wysyłać wiadomości. W tym przypadku mamy tylko jednego brokera. Plik Config / server.properties zawiera identyfikator portu brokera, ponieważ wiemy, że nasz broker nasłuchuje na porcie 9092, więc możesz go określić bezpośrednio.
Nazwa tematu - oto przykład nazwy tematu.
Example
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
Producent będzie czekał na dane wejściowe ze stdin i publikuje w klastrze Kafka. Domyślnie każda nowa linia jest publikowana jako nowa wiadomość, a następnie domyślne właściwości producenta są określone w pliku config / production.properties
. Teraz możesz wpisać kilka wierszy wiadomości w terminalu, jak pokazano poniżej.
Output
$ bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Hello-Kafka[2016-01-16 13:50:45,931]
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message
Uruchom konsumenta, aby otrzymywać wiadomości
Podobnie jak w przypadku producenta, domyślne właściwości konsumenta są określone w pliku config / consumer.proper-ties
. Otwórz nowy terminal i wpisz poniższą składnię do używania wiadomości.
Syntax
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name
--from-beginning
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka
--from-beginning
Output
Hello
My first message
My second message
Wreszcie możesz wprowadzić wiadomości z terminala producenta i zobaczyć je pojawiające się w terminalu konsumenta. W tej chwili masz bardzo dobrą wiedzę na temat klastra z jednym węzłem z jednym brokerem. Przejdźmy teraz do konfiguracji wielu brokerów.
Konfiguracja brokerów z jednym węzłem i wieloma
Przed przejściem do konfiguracji klastra wielu brokerów, najpierw uruchom serwer ZooKeeper.
Create Multiple Kafka Brokers- Mamy już jedną instancję brokera Kafka w konfiguracji / server.properties. Teraz potrzebujemy wielu instancji brokera, więc skopiuj istniejący plik server.prop-erties do dwóch nowych plików konfiguracyjnych i zmień jego nazwę na server-one.properties i server-two.prop-erties. Następnie edytuj oba nowe pliki i przypisz następujące zmiany -
config / server-one.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
config / server-two.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
Start Multiple Brokers- Po wprowadzeniu wszystkich zmian na trzech serwerach otwórz trzy nowe terminale, aby po kolei uruchomić każdego brokera.
Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties
Teraz mamy trzech różnych brokerów działających na komputerze. Spróbuj sam, aby sprawdzić wszystkie demony, wpisującjps na terminalu ZooKeeper, wtedy zobaczysz odpowiedź.
Tworzenie tematu
Przypiszmy wartość współczynnika replikacji jako trzy dla tego tematu, ponieważ mamy uruchomionych trzech różnych brokerów. Jeśli masz dwóch brokerów, przypisana wartość repliki będzie wynosić dwa.
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic Multibrokerapplication
Output
created topic “Multibrokerapplication”
Polecenie Opisz
służy do sprawdzenia, który broker nasłuchuje aktualnie utworzonego tematu, jak pokazano poniżej -
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Output
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Topic:Multibrokerapplication PartitionCount:1
ReplicationFactor:3 Configs:
Topic:Multibrokerapplication Partition:0 Leader:0
Replicas:0,2,1 Isr:0,2,1
Z powyższego wyniku możemy wywnioskować, że pierwsza linia zawiera podsumowanie wszystkich partycji, pokazując nazwę tematu, liczbę partycji i współczynnik replikacji, który już wybraliśmy. W drugiej linii każdy węzeł będzie liderem w losowo wybranej części partycji.
W naszym przypadku widzimy, że nasz pierwszy broker (z broker.id 0) jest liderem. Następnie Replicas: 0,2,1 oznacza, że wszyscy brokerzy powielają temat, w końcu Isr
jest zbiorem replik synchronizowanych
. Cóż, jest to podzbiór replik, które są obecnie żywe i złapane przez lidera.
Uruchom producenta, aby wysyłać wiadomości
Ta procedura pozostaje taka sama, jak w przypadku konfiguracji z jednym brokerem.
Example
bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Multibrokerapplication
Output
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message
Uruchom konsumenta, aby otrzymywać wiadomości
Ta procedura pozostaje taka sama, jak pokazano w konfiguracji jednego brokera.
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion --from-beginning
Output
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message
Podstawowe operacje na tematach
W tym rozdziale omówimy różne podstawowe operacje tematyczne.
Modyfikowanie tematu
Jak już wiesz, jak utworzyć temat w klastrze Kafka. Teraz zmodyfikujmy utworzony temat za pomocą następującego polecenia
Syntax
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name
--parti-tions count
Example
We have already created a topic “Hello-Kafka” with single partition count and one replica factor.
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181
--alter --topic Hello-kafka --parti-tions 2
Output
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
Usuwanie tematu
Aby usunąć temat, możesz użyć następującej składni.
Syntax
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
Example
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
Output
> Topic Hello-kafka marked for deletion
Note −Nie będzie to miało wpływu, jeśli delete.topic.enable nie jest ustawiona na true