Apache Kafka - Intégration avec Storm

Dans ce chapitre, nous allons apprendre à intégrer Kafka à Apache Storm.

À propos de Storm

Storm a été créé à l'origine par Nathan Marz et l'équipe de BackType. En peu de temps, Apache Storm est devenu un standard pour le système de traitement en temps réel distribué qui vous permet de traiter un énorme volume de données. Storm est très rapide et un benchmark l'a cadencé à plus d'un million de tuples traités par seconde et par nœud. Apache Storm s'exécute en continu, consommant les données des sources configurées (Spouts) et transmet les données dans le pipeline de traitement (Bolts). Combinés, les becs et les boulons forment une topologie.

Intégration avec Storm

Kafka et Storm se complètent naturellement, et leur puissante coopération permet des analyses de streaming en temps réel pour des Big Data en évolution rapide. L'intégration de Kafka et Storm permet aux développeurs d'ingérer et de publier plus facilement des flux de données à partir de topologies Storm.

Flux conceptuel

Un bec est une source de flux. Par exemple, un spout peut lire les tuples d'un sujet Kafka et les émettre en tant que flux. Un boulon consomme des flux d'entrée, traite et émet éventuellement de nouveaux flux. Les Bolts peuvent tout faire: exécuter des fonctions, filtrer des tuples, effectuer des agrégations de streaming, des jointures en streaming, parler à des bases de données, etc. Chaque nœud d'une topologie Storm s'exécute en parallèle. Une topologie s'exécute indéfiniment jusqu'à ce que vous la terminiez. Storm réaffectera automatiquement toutes les tâches ayant échoué. De plus, Storm garantit qu'il n'y aura aucune perte de données, même si les machines tombent en panne et que les messages sont supprimés.

Passons en revue les API d'intégration Kafka-Storm en détail. Il existe trois classes principales pour intégrer Kafka à Storm. Ils sont les suivants -

BrokerHosts - ZkHosts et StaticHosts

BrokerHosts est une interface et ZkHosts et StaticHosts sont ses deux principales implémentations. ZkHosts est utilisé pour suivre dynamiquement les courtiers Kafka en conservant les détails dans ZooKeeper, tandis que StaticHosts est utilisé pour définir manuellement / statiquement les courtiers Kafka et ses détails. ZkHosts est le moyen simple et rapide d'accéder au courtier Kafka.

La signature de ZkHosts est la suivante -

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

Où brokerZkStr est l'hôte ZooKeeper et brokerZkPath est le chemin ZooKeeper pour gérer les détails du courtier Kafka.

API KafkaConfig

Cette API est utilisée pour définir les paramètres de configuration du cluster Kafka. La signature de Kafka Con-fig est définie comme suit

public KafkaConfig(BrokerHosts hosts, string topic)

    Hosts - Les BrokerHosts peuvent être des ZkHosts / StaticHosts.

    Topic - nom du sujet.

API SpoutConfig

Spoutconfig est une extension de KafkaConfig qui prend en charge des informations supplémentaires sur ZooKeeper.

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  • Hosts - Les BrokerHosts peuvent être n'importe quelle implémentation de l'interface BrokerHosts

  • Topic - nom du sujet.

  • zkRoot - Chemin racine de ZooKeeper.

  • id −Le bec stocke l'état des décalages consommés dans Zookeeper. L'identifiant doit identifier de manière unique votre bec.

SchemeAsMultiScheme

SchemeAsMultiScheme est une interface qui dicte comment le ByteBuffer consommé par Kafka est transformé en un tuple de tempête. Il est dérivé de MultiScheme et accepte l'implémentation de la classe Scheme. Il existe de nombreuses implémentations de la classe Scheme et l'une de ces implémentations est StringScheme, qui analyse l'octet comme une simple chaîne. Il contrôle également la dénomination de votre champ de sortie. La signature est définie comme suit.

public SchemeAsMultiScheme(Scheme scheme)
  • Scheme - tampon d'octets consommé par kafka.

API KafkaSpout

KafkaSpout est notre implémentation de bec, qui s'intégrera à Storm. Il récupère les messages du sujet kafka et les émet dans l'écosystème Storm sous forme de tuples. KafkaSpout obtient ses détails de configuration auprès de SpoutConfig.

Vous trouverez ci-dessous un exemple de code pour créer un simple bec verseur Kafka.

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Création de boulons

Bolt est un composant qui prend des tuples en entrée, traite le tuple et produit de nouveaux tuples en sortie. Bolts implémentera l'interface IRichBolt. Dans ce programme, deux classes de boulons WordSplitter-Bolt et WordCounterBolt sont utilisées pour effectuer les opérations.

L'interface IRichBolt a les méthodes suivantes -

  • Prepare- Fournit au boulon un environnement à exécuter. Les exécuteurs exécuteront cette méthode pour initialiser le bec.

  • Execute - Traitez un seul tuple d'entrée.

  • Cleanup - Appelé lorsqu'un verrou va s'arrêter.

  • declareOutputFields - Déclare le schéma de sortie du tuple.

Créons SplitBolt.java, qui implémente la logique pour diviser une phrase en mots et CountBolt.java, qui implémente la logique pour séparer les mots uniques et compter son occurrence.

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
   
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      
      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }
   
      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Soumission à la topologie

La topologie Storm est essentiellement une structure Thrift. La classe TopologyBuilder fournit des méthodes simples et faciles pour créer des topologies complexes. La classe TopologyBuilder a des méthodes pour définir spout (setSpout) et pour définir bolt (setBolt). Enfin, TopologyBuilder a createTopology pour créer des to-pology. shuffleGrouping et fieldsGrouping Les méthodes permettent de définir le regroupement de flux pour le bec et les boulons.

Local Cluster- Pour des fins de développement, nous pouvons créer un cluster local en utilisant LocalCluster objet, puis soumettre la topologie à l' aide submitTopology méthode de LocalCluster classe.

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      
      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,    
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
      builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
         
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());

      Thread.sleep(10000);
      
      cluster.shutdown();
   }
}

Avant de déplacer la compilation, l'intégration Kakfa-Storm nécessite la bibliothèque java du client ZooKeeper du conservateur. La version 2.9.1 de Curator prend en charge la version 0.9.5 d'Apache Storm (que nous utilisons dans ce tutoriel). Téléchargez les fichiers jar spécifiés ci-dessous et placez-les dans le chemin de classe java.

  • curator-client-2.9.1.jar
  • curator-framework-2.9.1.jar

Après avoir inclus les fichiers de dépendance, compilez le programme à l'aide de la commande suivante,

javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java

Exécution

Démarrez Kafka Producer CLI (expliqué dans le chapitre précédent), créez un nouveau sujet appelé my-first-topic et fournissez des exemples de messages comme indiqué ci-dessous -

hello
kafka
storm
spark
test message
another test message

Exécutez maintenant l'application à l'aide de la commande suivante -

java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample

L'exemple de sortie de cette application est spécifié ci-dessous -

storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2