Apache Kafka - Integracja z Storm

W tym rozdziale dowiemy się, jak zintegrować Kafkę z Apache Storm.

O Storm

Storm został pierwotnie stworzony przez Nathana Marz i zespół BackType. W krótkim czasie Apache Storm stał się standardem dla rozproszonego systemu przetwarzania w czasie rzeczywistym, który umożliwia przetwarzanie ogromnych ilości danych. Storm jest bardzo szybki, a benchmark taktował go z prędkością ponad miliona krotek przetwarzanych na sekundę na węzeł. Apache Storm działa w sposób ciągły, pobiera dane ze skonfigurowanych źródeł (Spouts) i przekazuje je w dół potoku przetwarzania (Bolts). Połączone, wylewki i śruby tworzą topologię.

Integracja z Storm

Kafka i Storm w naturalny sposób uzupełniają się wzajemnie, a ich silna współpraca umożliwia analizę strumieniową w czasie rzeczywistym dla szybko zmieniających się dużych zbiorów danych. Integracja Kafka i Storm ma ułatwić programistom pozyskiwanie i publikowanie strumieni danych z topologii Storm.

Koncepcyjny przepływ

Wylewka jest źródłem strumieni. Na przykład dziobek może odczytać krotki z tematu Kafki i emitować je jako strumień. Bolt zużywa strumienie wejściowe, przetwarza i prawdopodobnie emituje nowe strumienie. Bolts może robić wszystko, od uruchamiania funkcji, filtrowania krotek, agregacji strumieniowych, łączenia strumieniowego, komunikowania się z bazami danych i nie tylko. Każdy węzeł w topologii Storm działa równolegle. Topologia działa w nieskończoność, dopóki jej nie zakończysz. Storm automatycznie ponownie przydzieli wszystkie nieudane zadania. Dodatkowo Storm gwarantuje, że nie nastąpi utrata danych, nawet jeśli maszyny ulegną awarii, a wiadomości zostaną odrzucone.

Przyjrzyjmy się szczegółowo interfejsom API integracji Kafka-Storm. Istnieją trzy główne klasy integrujące Kafkę ze Stormem. Są następujące -

BrokerHosts - ZkHosts & StaticHosts

BrokerHosts to interfejs, a ZkHosts i StaticHosts to jego dwie główne implementacje. ZkHosts służy do dynamicznego śledzenia brokerów Kafka poprzez utrzymywanie szczegółów w ZooKeeper, podczas gdy StaticHosts służy do ręcznego / statycznego ustawiania brokerów Kafka i ich szczegółów. ZkHosts to prosty i szybki sposób na dostęp do brokera Kafka.

Podpis ZkHosts jest następujący -

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

Gdzie brokerZkStr jest hostem ZooKeeper, a brokerZkPath jest ścieżką ZooKeeper do zarządzania danymi brokera Kafka.

KafkaConfig API

Ten interfejs API służy do definiowania ustawień konfiguracji dla klastra Kafka. Podpis Kafki Con-fig jest zdefiniowany w następujący sposób

public KafkaConfig(BrokerHosts hosts, string topic)

    Hosts - BrokerHosts może być ZkHosts / StaticHosts.

    Topic - nazwa tematu.

SpoutConfig API

Spoutconfig to rozszerzenie KafkaConfig, które obsługuje dodatkowe informacje ZooKeeper.

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
  • Hosts - BrokerHosts może być dowolną implementacją interfejsu BrokerHosts

  • Topic - nazwa tematu.

  • zkRoot - Ścieżka główna ZooKeeper.

  • id −Wylewka przechowuje stan odsadzek, które zostały zużyte w Zookeeper. Identyfikator powinien jednoznacznie identyfikować twoją wylewkę.

SchemeAsMultiScheme

SchemeAsMultiScheme to interfejs, który dyktuje, w jaki sposób ByteBuffer konsumowany przez Kafkę zostanie przekształcony w krotkę burzy. Wywodzi się z MultiScheme i akceptujemy implementację klasy Scheme. Istnieje wiele implementacji klasy Scheme, a jedną z takich implementacji jest StringScheme, który analizuje bajt jako prosty ciąg. Kontroluje również nazewnictwo pola wyjściowego. Podpis jest zdefiniowany w następujący sposób.

public SchemeAsMultiScheme(Scheme scheme)
  • Scheme - bufor bajtów zużyty z kafki.

KafkaSpout API

KafkaSpout to nasza realizacja wylewki, która będzie zintegrowana ze Stormem. Pobiera wiadomości z tematu kafka i emituje je do ekosystemu Storm jako krotki. KafkaSpout pobiera szczegóły konfiguracji ze SpoutConfig.

Poniżej znajduje się przykładowy kod do stworzenia prostej wylewki Kafki.

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Tworzenie śrub

Bolt to komponent, który pobiera krotki jako dane wejściowe, przetwarza krotkę i generuje nowe krotki jako dane wyjściowe. Bolts zaimplementuje interfejs IRichBolt. W tym programie do wykonywania operacji używane są dwie klasy śrub, WordSplitter-Bolt i WordCounterBolt.

Interfejs IRichBolt ma następujące metody -

  • Prepare- Zapewnia śrubie środowisko do wykonania. Wykonawcy uruchomią tę metodę w celu zainicjowania wylewki.

  • Execute - Przetwarzaj pojedynczą krotkę danych wejściowych.

  • Cleanup - Zawołano, gdy zamyka się zasuwka.

  • declareOutputFields - Deklaruje schemat wyjściowy krotki.

Stwórzmy SplitBolt.java, który implementuje logikę dzielenia zdania na słowa oraz CountBolt.java, który implementuje logikę do oddzielania unikalnych słów i liczenia ich występowania.

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
   
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      
      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }
   
      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Przesyłanie do topologii

Topologia Storm jest w zasadzie strukturą Thrift. Klasa TopologyBuilder udostępnia proste i łatwe metody tworzenia złożonych topologii. Klasa TopologyBuilder zawiera metody do ustawiania wylewki (setSpout) i ustawiania śruby (setBolt). Wreszcie, TopologyBuilder ma createTopology do tworzenia pology. shuffleGrouping i fields Metody grupowania pomagają ustawić grupowanie strumieni dla wylewki i śrub.

Local Cluster- Dla celów rozwoju, możemy stworzyć lokalny klaster korzystając LocalCluster obiekt, a następnie przedstawić topologię używając submitTopology metodę LocalCluster klasie.

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      
      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,    
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
      builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
         
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());

      Thread.sleep(10000);
      
      cluster.shutdown();
   }
}

Przed przeniesieniem kompilacji, integracja Kakfa-Storm wymaga biblioteki java klienta ZooKeeper. Wersja 2.9.1 kuratora obsługuje Apache Storm w wersji 0.9.5 (której używamy w tym samouczku). Pobierz poniższe pliki jar i umieść je w ścieżce klas java.

  • curator-client-2.9.1.jar
  • curator-framework-2.9.1.jar

Po dołączeniu plików zależności skompiluj program za pomocą następującego polecenia,

javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java

Wykonanie

Uruchom interfejs wiersza polecenia Kafka Producer (wyjaśniono w poprzednim rozdziale), utwórz nowy temat o nazwie my-first-topic i podaj kilka przykładowych wiadomości, jak pokazano poniżej -

hello
kafka
storm
spark
test message
another test message

Teraz uruchom aplikację za pomocą następującego polecenia -

java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample

Przykładowe dane wyjściowe tej aplikacji są określone poniżej -

storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2