Apache Kafka - Exemple de producteur simple
Créons une application pour publier et consommer des messages à l'aide d'un client Java. Le client producteur Kafka comprend les API suivantes.
API KafkaProducer
Laissez-nous comprendre l'ensemble le plus important d'API de producteur Kafka dans cette section. La partie centrale de l'API KafkaProducer
est la classe KafkaProducer
. La classe KafkaProducer fournit une option pour connecter un courtier Kafka dans son constructeur avec les méthodes suivantes.
La classe KafkaProducer fournit une méthode d'envoi pour envoyer des messages de manière asynchrone à une rubrique. La signature de send () est la suivante
producer.send(new ProducerRecord<byte[],byte[]>(topic,
partition, key1, value1) , callback);
ProducerRecord - Le producteur gère un buffer d'enregistrements en attente d'envoi.
Callback - Un rappel fourni par l'utilisateur à exécuter lorsque l'enregistrement a été acquitté par le serveur (null indique pas de rappel).
La classe KafkaProducer fournit une méthode de vidage pour garantir que tous les messages précédemment envoyés ont été effectivement terminés. La syntaxe de la méthode flush est la suivante -
public void flush()
La classe KafkaProducer fournit la méthode partitionFor, qui aide à obtenir les métadonnées de partition pour un sujet donné. Cela peut être utilisé pour le partitionnement personnalisé. La signature de cette méthode est la suivante -
public Map metrics()
Il renvoie la carte des métriques internes gérées par le producteur.
public void close () - La classe KafkaProducer fournit des blocs de méthode close jusqu'à ce que toutes les requêtes précédemment envoyées soient terminées.
API du producteur
La partie centrale de l'API Producer
est la classe Producer
. La classe Producer fournit une option pour connecter le courtier Kafka dans son constructeur par les méthodes suivantes.
La classe des producteurs
La classe de producteur fournit une méthode d'envoi à send messages à un ou plusieurs sujets en utilisant les signatures suivantes.
public void send(KeyedMessaget<k,v> message)
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);
Il existe deux types de producteurs - Sync et Async.
La même configuration d'API s'applique également au producteur de synchronisation
. La différence entre eux est qu'un producteur de synchronisation envoie des messages directement, mais envoie des messages en arrière-plan. Le producteur asynchrone est préférable lorsque vous souhaitez un débit plus élevé. Dans les versions précédentes comme 0.8, un producteur asynchrone n'a pas de rappel pour send () pour enregistrer les gestionnaires d'erreurs. Ceci n'est disponible que dans la version actuelle de 0.9.
public void close ()
La classe de producteur fournit close méthode pour fermer les connexions du pool de producteurs à tous les courtiers Kafka.
Paramètres de configuration
Les principaux paramètres de configuration de l'API Producer sont répertoriés dans le tableau suivant pour une meilleure compréhension -
S. Non | Paramètres de configuration et description |
---|---|
1 | client.id identifie l'application du producteur |
2 | producer.type soit sync ou async |
3 | acks La configuration d'acks contrôle les critères sous les demandes des producteurs sont considérés comme complets. |
4 | retries Si la demande du producteur échoue, réessayez automatiquement avec une valeur spécifique. |
5 | bootstrap.servers liste de démarrage des courtiers. |
6 | linger.ms si vous souhaitez réduire le nombre de requêtes, vous pouvez définir linger.ms sur une valeur supérieure à une certaine valeur. |
sept | key.serializer Clé de l'interface du sérialiseur. |
8 | value.serializer valeur de l'interface du sérialiseur. |
9 | batch.size Taille du tampon. |
dix | buffer.memory contrôle la quantité totale de mémoire disponible pour le producteur pour la mise en mémoire tampon. |
API ProducerRecord
ProducerRecord est une paire clé / valeur envoyée au constructeur de classe cluster.ProducerRecord Kafka pour créer un enregistrement avec des paires partition, clé et valeur à l'aide de la signature suivante.
public ProducerRecord (string topic, int partition, k key, v value)
Topic - nom de sujet défini par l'utilisateur qui sera ajouté à l'enregistrement.
Partition - nombre de partitions
Key - La clé qui sera incluse dans l'enregistrement.
- Value - Enregistrer le contenu
public ProducerRecord (string topic, k key, v value)
Le constructeur de classe ProducerRecord est utilisé pour créer un enregistrement avec des paires clé, valeur et sans partition.
Topic - Créez un sujet pour attribuer un enregistrement.
Key - clé pour l'enregistrement.
Value - enregistrer le contenu.
public ProducerRecord (string topic, v value)
La classe ProducerRecord crée un enregistrement sans partition ni clé.
Topic - créer un sujet.
Value - enregistrer le contenu.
Les méthodes de la classe ProducerRecord sont répertoriées dans le tableau suivant -
S. Non | Méthodes de classe et description |
---|---|
1 | public string topic() Le sujet s'ajoutera à l'enregistrement. |
2 | public K key() Clé qui sera incluse dans le dossier. Si aucune clé de ce type, nul sera réactivé ici. |
3 | public V value() Enregistrez le contenu. |
4 | partition() Nombre de partitions pour l'enregistrement |
Application SimpleProducer
Avant de créer l'application, démarrez d'abord ZooKeeper et le courtier Kafka, puis créez votre propre rubrique dans le courtier Kafka à l'aide de la commande create topic. Ensuite, créez une classe java nommée Sim-pleProducer.java
et saisissez le codage suivant.
//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named “SimpleProducer”
public class SimpleProducer {
public static void main(String[] args) throws Exception{
// Check arguments length value
if(args.length == 0){
System.out.println("Enter topic name”);
return;
}
//Assign topicName to string variable
String topicName = args[0].toString();
// create instance for properties to access producer configs
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully”);
producer.close();
}
}
Compilation - L'application peut être compilée à l'aide de la commande suivante.
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
Execution - L'application peut être exécutée à l'aide de la commande suivante.
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>
Output
Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10
Exemple de consommateur simple
À partir de maintenant, nous avons créé un producteur pour envoyer des messages au cluster Kafka. Créons maintenant un consommateur pour consommer les messages du cluster Kafka. L'API KafkaConsumer est utilisée pour consommer les messages du cluster Kafka. Le constructeur de classe KafkaConsumer est défini ci-dessous.
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs - Renvoie une carte des configurations des consommateurs.
La classe KafkaConsumer possède les méthodes significatives suivantes qui sont répertoriées dans le tableau ci-dessous.
S. Non | Méthode et description |
---|---|
1 | public java.util.Set<TopicPar-tition> assignment() Récupère l'ensemble des partitions actuellement assignées par le consommateur. |
2 | public string subscription() Abonnez-vous à la liste de sujets donnée pour obtenir dynamiquement des partitions as-signées. |
3 | public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener) Abonnez-vous à la liste de sujets donnée pour obtenir dynamiquement des partitions as-signées. |
4 | public void unsubscribe() Désabonnez les rubriques de la liste de partitions donnée. |
5 | public void sub-scribe(java.util.List<java.lang.String> topics) Abonnez-vous à la liste de sujets donnée pour obtenir dynamiquement des partitions as-signées. Si la liste de sujets donnée est vide, elle est traitée de la même manière que unsubscribe (). |
6 | public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener) Le modèle d'argument fait référence au modèle d'abonnement au format d'expression régulière et l'argument d'écoute reçoit des notifications du modèle d'abonnement. |
sept | public void as-sign(java.util.List<TopicParti-tion> partitions) Attribuez manuellement une liste de partitions au client. |
8 | poll() Récupérez les données des rubriques ou des partitions spécifiées à l'aide de l'une des API d'abonnement / d'attribution. Cela renverra une erreur, si les sujets ne sont pas abonnés avant l'interrogation des données. |
9 | public void commitSync() Les décalages de validation renvoyés lors du dernier sondage () pour toutes les listes de sujets et de partitions sous-marquées. La même opération est appliquée à commitAsyn (). |
dix | public void seek(TopicPartition partition, long offset) Récupère la valeur de décalage actuelle que le consommateur utilisera lors de la prochaine méthode poll (). |
11 | public void resume() Reprenez les partitions en pause. |
12 | public void wakeup() Réveillez le consommateur. |
API ConsumerRecord
L'API ConsumerRecord est utilisée pour recevoir des enregistrements du cluster Kafka. Cette API comprend un nom de rubrique, un numéro de partition, à partir de laquelle l'enregistrement est reçu et un décalage qui pointe vers l'enregistrement dans une partition Kafka. La classe ConsumerRecord est utilisée pour créer un enregistrement consommateur avec un nom de rubrique spécifique, un nombre de partitions et des paires <clé, valeur>. Il a la signature suivante.
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
Topic - Le nom de rubrique pour l'enregistrement consommateur reçu du cluster Kafka.
Partition - Partition pour le sujet.
Key - La clé de l'enregistrement, si aucune clé n'existe, null sera renvoyée.
Value - Enregistrer le contenu.
API ConsumerRecords
L'API ConsumerRecords agit comme un conteneur pour ConsumerRecord. Cette API est utilisée pour conserver la liste des ConsumerRecord par partition pour un sujet particulier. Son constructeur est défini ci-dessous.
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
TopicPartition - Retourne une carte de partition pour un sujet particulier.
Records - Retournez la liste de ConsumerRecord.
La classe ConsumerRecords a les méthodes suivantes définies.
S. Non | Méthodes et description |
---|---|
1 | public int count() Le nombre d'enregistrements pour tous les sujets. |
2 | public Set partitions() L'ensemble des partitions contenant des données dans cet ensemble d'enregistrements (si aucune donnée n'a été renvoyée, l'ensemble est vide). |
3 | public Iterator iterator() Iterator vous permet de parcourir une collection, d'obtenir ou de déplacer des éléments. |
4 | public List records() Obtenez la liste des enregistrements pour la partition donnée. |
Paramètres de configuration
Les paramètres de configuration des principaux paramètres de configuration de l'API client client sont répertoriés ci-dessous:
S. Non | Paramètres et description |
---|---|
1 | bootstrap.servers Liste de démarrage des courtiers. |
2 | group.id Affecte un consommateur individuel à un groupe. |
3 | enable.auto.commit Activez la validation automatique pour les décalages si la valeur est true, sinon non validée. |
4 | auto.commit.interval.ms Renvoie la fréquence à laquelle les décalages consommés mis à jour sont écrits dans ZooKeeper. |
5 | session.timeout.ms Indique combien de millisecondes Kafka attendra que le ZooKeeper réponde à une requête (lecture ou écriture) avant d'abandonner et de continuer à consommer des messages. |
Application SimpleConsumer
Les étapes de candidature du producteur restent ici les mêmes. Tout d'abord, démarrez votre courtier ZooKeeper et Kafka. Créez ensuite une application SimpleConsumer
avec la classe java nommée SimpleCon-sumer.java
et tapez le code suivant.
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Kafka consumer configuration settings
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName))
//print the topic name
System.out.println("Subscribed to topic " + topicName);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
Compilation - L'application peut être compilée à l'aide de la commande suivante.
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
Execution − L'application peut être exécutée à l'aide de la commande suivante
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>
Input- Ouvrez la CLI du producteur et envoyez des messages à la rubrique. Vous pouvez mettre l'entrée smple comme «Bonjour consommateur».
Output - Voici la sortie.
Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer