Apache Kafka - Opérations de base

Commençons par implémenter une configuration de courtier à nœud unique et nous migrerons ensuite notre configuration vers une configuration de courtier à nœud unique.

J'espère que vous auriez déjà installé Java, ZooKeeper et Kafka sur votre machine. Avant de passer à la configuration du cluster Kafka, vous devez d'abord démarrer votre ZooKeeper car Kafka Cluster utilise ZooKeeper.

Démarrez ZooKeeper

Ouvrez un nouveau terminal et tapez la commande suivante -

bin/zookeeper-server-start.sh config/zookeeper.properties

Pour démarrer Kafka Broker, tapez la commande suivante -

bin/kafka-server-start.sh config/server.properties

Après avoir démarré Kafka Broker, tapez la commande jps sur le terminal ZooKeeper et vous verrez la réponse suivante -

821 QuorumPeerMain
928 Kafka
931 Jps

Vous pouvez maintenant voir deux démons s'exécuter sur le terminal où QuorumPeerMain est le démon ZooKeeper et un autre est le démon Kafka.

Configuration de courtier à nœud unique

Dans cette configuration, vous avez une seule instance d'ID de ZooKeeper et de courtier. Voici les étapes pour le configurer -

Creating a Kafka Topic- Kafka fournit un utilitaire de ligne de commande nommé kafka-topics.sh pour créer des rubriques sur le serveur. Ouvrez un nouveau terminal et saisissez l'exemple ci-dessous.

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

Nous venons de créer un sujet nommé Hello-Kafka avec une seule partition et un facteur de réplique. La sortie créée ci-dessus sera similaire à la sortie suivante -

Output- Création du sujet Hello-Kafka

Une fois le sujet créé, vous pouvez obtenir la notification dans la fenêtre du terminal du courtier Kafka et le journal du sujet créé spécifié dans «/ tmp / kafka-logs /» dans le fichier config / server.properties.

Liste des sujets

Pour obtenir une liste de sujets dans le serveur Kafka, vous pouvez utiliser la commande suivante -

Syntax

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

Hello-Kafka

Puisque nous avons créé un sujet, il ne listera que Hello-Kafka . Supposons que si vous créez plusieurs sujets, vous obtiendrez les noms des sujets dans la sortie.

Lancer le producteur pour envoyer des messages

Syntax

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name

À partir de la syntaxe ci-dessus, deux paramètres principaux sont requis pour le client de ligne de commande du producteur -

Broker-list- La liste des courtiers auxquels nous voulons envoyer les messages. Dans ce cas, nous n'avons qu'un seul courtier. Le fichier Config / server.properties contient l'ID de port du courtier, car nous savons que notre courtier écoute sur le port 9092, vous pouvez donc le spécifier directement.

Nom du sujet - Voici un exemple pour le nom du sujet.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka

Le producteur attendra les entrées de stdin et publiera sur le cluster Kafka. Par défaut, chaque nouvelle ligne est publiée en tant que nouveau message, puis les propriétés du producteur par défaut sont spécifiées dans le fichier config / producteur.properties . Vous pouvez maintenant taper quelques lignes de messages dans le terminal comme indiqué ci-dessous.

Output

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

Commencer le consommateur à recevoir des messages

Comme pour le producteur, les propriétés du consommateur par défaut sont spécifiées dans le fichier config / consumer.proper-tie . Ouvrez un nouveau terminal et saisissez la syntaxe ci-dessous pour consommer les messages.

Syntax

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

Output

Hello
My first message
My second message

Enfin, vous pouvez saisir des messages depuis le terminal du producteur et les voir apparaître dans le terminal du consommateur. À partir de maintenant, vous avez une très bonne compréhension du cluster à nœud unique avec un seul courtier. Passons maintenant à la configuration de plusieurs courtiers.

Configuration d'un seul nœud-plusieurs courtiers

Avant de passer à la configuration du cluster à plusieurs courtiers, démarrez d'abord votre serveur ZooKeeper.

Create Multiple Kafka Brokers- Nous avons déjà une instance de courtier Kafka dans con-fig / server.properties. Maintenant, nous avons besoin de plusieurs instances de courtier, donc copiez le fichier server.prop-erties existant dans deux nouveaux fichiers de configuration et renommez-le en server-one.properties et server-two.prop-erties. Ensuite, modifiez les deux nouveaux fichiers et attribuez les modifications suivantes -

config / server-one.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

config / server-two.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

Start Multiple Brokers- Une fois que toutes les modifications ont été apportées sur trois serveurs, ouvrez trois nouveaux terminaux pour démarrer chaque courtier un par un.

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties

Maintenant, nous avons trois courtiers différents fonctionnant sur la machine. Essayez-le par vous-même pour vérifier tous les démons en tapantjps sur le terminal ZooKeeper, vous verrez alors la réponse.

Créer un sujet

Attribuons la valeur du facteur de réplication à trois pour ce sujet, car nous avons trois courtiers différents en cours d'exécution. Si vous avez deux courtiers, la valeur de réplique attribuée sera de deux.

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

Output

created topic “Multibrokerapplication”

La commande Describe est utilisée pour vérifier quel courtier écoute le sujet actuellement créé, comme indiqué ci-dessous -

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Output

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

À partir de la sortie ci-dessus, nous pouvons conclure que la première ligne donne un résumé de toutes les partitions, montrant le nom du sujet, le nombre de partitions et le facteur de réplication que nous avons déjà choisi. Dans la deuxième ligne, chaque nœud sera le leader pour une partie sélectionnée au hasard des partitions.

Dans notre cas, nous voyons que notre premier courtier (avec broker.id 0) est le leader. Ensuite, Replicas: 0,2,1 signifie que tous les courtiers répliquent le sujet enfin Isr est l'ensemble des répliques synchronisées . Eh bien, c'est le sous-ensemble de répliques qui sont actuellement en vie et rattrapées par le leader.

Lancer le producteur pour envoyer des messages

Cette procédure reste la même que dans la configuration du courtier unique.

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

Output

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

Commencer le consommateur à recevoir des messages

Cette procédure reste la même que celle indiquée dans la configuration du courtier unique.

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginning

Output

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

Opérations de base sur les sujets

Dans ce chapitre, nous aborderons les différentes opérations de base du sujet.

Modifier un sujet

Comme vous l'avez déjà compris, comment créer un sujet dans Kafka Cluster. Modifions maintenant un sujet créé à l'aide de la commande suivante

Syntax

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

Example

We have already created a topic “Hello-Kafka” with single partition count and one replica factor. 
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2

Output

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

Supprimer un sujet

Pour supprimer une rubrique, vous pouvez utiliser la syntaxe suivante.

Syntax

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

Example

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

Output

> Topic Hello-kafka marked for deletion

Note −Cela n'aura aucun impact si delete.topic.enable n'est pas défini sur true