Apache Storm - przykład roboczy

Zapoznaliśmy się z podstawowymi szczegółami technicznymi Apache Storm, a teraz nadszedł czas, aby zakodować kilka prostych scenariuszy.

Scenariusz - mobilny analizator dziennika połączeń

Połączenie mobilne i czas jego trwania zostaną podane jako dane wejściowe do Apache Storm, a Storm przetworzy i pogrupuje połączenie między tego samego dzwoniącego i odbierającego oraz ich całkowitą liczbę połączeń.

Tworzenie wylewki

Wylewka to element służący do generowania danych. Zasadniczo wylewka będzie implementowała interfejs IRichSpout. Interfejs „IRichSpout” ma następujące ważne metody -

  • open- Zapewnia wylewce środowisko do wykonania. Wykonawcy uruchomią tę metodę w celu zainicjowania wylewki.

  • nextTuple - Emituje wygenerowane dane za pośrednictwem kolektora.

  • close - Ta metoda jest wywoływana, gdy wylewka będzie się wyłączać.

  • declareOutputFields - Deklaruje schemat wyjściowy krotki.

  • ack - potwierdza, że ​​przetwarzana jest konkretna krotka

  • fail - określa, że ​​określona krotka nie jest przetwarzana i nie ma być ponownie przetwarzana.

otwarty

Podpis open metoda jest następująca -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - Zapewnia konfigurację burzową dla tej wylewki.

  • context - Zapewnia pełne informacje o miejscu wylewki w topologii, jego identyfikatorze zadania, danych wejściowych i wyjściowych.

  • collector - Umożliwia nam emitowanie krotki, która będzie przetwarzana przez śruby.

nextTuple

Podpis nextTuple metoda jest następująca -

nextTuple()

nextTuple () jest wywoływana okresowo z tej samej pętli, co metody ACK () i Fail (). Musi zwolnić kontrolę nad wątkiem, gdy nie ma pracy do wykonania, aby inne metody miały szansę zostać wywołane. Zatem pierwsza linia nextTuple sprawdza, czy przetwarzanie zostało zakończone. Jeśli tak, powinien spać przez co najmniej jedną milisekundę, aby zmniejszyć obciążenie procesora przed powrotem.

blisko

Podpis close metoda jest następująca -

close()

deklarujOutputFields

Podpis declareOutputFields metoda jest następująca -

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - Służy do deklarowania identyfikatorów strumieni wyjściowych, pól wyjściowych itp.

Ta metoda służy do określania schematu wyjściowego spójnej kolekcji.

ACK

Podpis ack metoda jest następująca -

ack(Object msgId)

Ta metoda potwierdza, że ​​została przetworzona konkretna krotka.

zawieść

Podpis nextTuple metoda jest następująca -

ack(Object msgId)

Ta metoda informuje, że określona krotka nie została w pełni przetworzona. Storm ponownie przetworzy określoną krotkę.

FakeCallLogReaderSpout

W naszym scenariuszu musimy zebrać szczegóły rejestru połączeń. Zawiera informacje z rejestru połączeń.

  • numer dzwoniącego
  • numer odbiorcy
  • duration

Ponieważ nie mamy informacji o dziennikach połączeń w czasie rzeczywistym, będziemy generować fałszywe dzienniki połączeń. Fałszywe informacje zostaną utworzone za pomocą klasy Random. Pełny kod programu znajduje się poniżej.

Kodowanie - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Tworzenie śrub

Bolt to komponent, który pobiera krotki jako dane wejściowe, przetwarza krotkę i generuje nowe krotki jako dane wyjściowe. Śruby będą realizowaćIRichBoltberło. W tym programie dwie klasy śrubCallLogCreatorBolt i CallLogCounterBolt służą do wykonywania operacji.

Interfejs IRichBolt ma następujące metody -

  • prepare- Zapewnia śrubie środowisko do wykonania. Wykonawcy uruchomią tę metodę w celu zainicjowania wylewki.

  • execute - Przetwarzaj pojedynczą krotkę danych wejściowych.

  • cleanup - Wezwany, gdy śruba się wyłączy.

  • declareOutputFields - Deklaruje schemat wyjściowy krotki.

Przygotować

Podpis prepare metoda jest następująca -

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - Zapewnia konfigurację burzy dla tej śruby.

  • context - Zapewnia pełne informacje o miejscu śruby w topologii, identyfikatorze zadania, danych wejściowych i wyjściowych itp.

  • collector - Umożliwia nam emitowanie przetworzonej krotki.

wykonać

Podpis execute metoda jest następująca -

execute(Tuple tuple)

Tutaj tuple jest krotką wejściową do przetworzenia.

Plik executemetoda przetwarza pojedynczą krotkę naraz. Dostęp do danych krotki można uzyskać za pomocą metody getValue klasy Tuple. Nie jest konieczne natychmiastowe przetwarzanie krotki wejściowej. Wiele krotek może być przetwarzanych i wyprowadzanych jako jedna krotka wyjściowa. Przetworzoną krotkę można wyemitować przy użyciu klasy OutputCollector.

sprzątać

Podpis cleanup metoda jest następująca -

cleanup()

deklarujOutputFields

Podpis declareOutputFields metoda jest następująca -

declareOutputFields(OutputFieldsDeclarer declarer)

Tutaj parametr declarer służy do deklarowania identyfikatorów strumieni wyjściowych, pól wyjściowych itp.

Ta metoda służy do określania schematu wyjściowego spójnej kolekcji

Twórca rejestru połączeń Bolt

Bolt kreatora rejestru połączeń odbiera krotkę rejestru połączeń. Krotka rejestru połączeń zawiera numer dzwoniącego, numer odbiorcy i czas trwania połączenia. Ta śruba po prostu tworzy nową wartość, łącząc numer dzwoniącego i numer odbiorcy. Format nowej wartości to „Numer dzwoniącego - Numer odbiorcy” i nazywa się ją nowym polem „Zadzwoń”. Pełny kod podano poniżej.

Kodowanie - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Rejestr połączeń Bolt licznika

Blokada rejestru połączeń odbiera połączenie i jego czas trwania jako krotkę. Ta śruba inicjuje obiekt Dictionary (Map) w metodzie przygotowania. Wexecutesprawdza krotkę i tworzy nowy wpis w obiekcie słownika dla każdej nowej wartości „wywołania” w krotce i ustawia wartość 1 w obiekcie słownika. Dla już dostępnego wpisu w słowniku po prostu zwiększa jego wartość. Mówiąc prościej, ten rygiel zapisuje wywołanie i jego liczbę w obiekcie słownika. Zamiast zapisywać wywołanie i jego liczbę w słowniku, możemy również zapisać je w źródle danych. Pełny kod programu jest następujący -

Kodowanie - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Tworzenie topologii

Topologia Storm jest w zasadzie strukturą Thrift. Klasa TopologyBuilder udostępnia proste i łatwe metody tworzenia złożonych topologii. Klasa TopologyBuilder zawiera metody ustawiania wylewu(setSpout) i ustawić rygiel (setBolt). Wreszcie, TopologyBuilder ma createTopology do tworzenia topologii. Użyj poniższego fragmentu kodu, aby utworzyć topologię -

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping i fieldsGrouping metody pomagają ustawić grupowanie strumieni dla wylewek i śrub.

Klaster lokalny

Dla celów programistycznych możemy stworzyć klaster lokalny za pomocą obiektu „LocalCluster”, a następnie przesłać topologię za pomocą metody „submitTopology” klasy „LocalCluster”. Jednym z argumentów argumentu „submitTopology” jest instancja klasy „Config”. Klasa „Config” służy do ustawiania opcji konfiguracyjnych przed przesłaniem topologii. Ta opcja konfiguracji zostanie scalona z konfiguracją klastra w czasie wykonywania i wysłana do wszystkich zadań (wylewki i śruby) metodą przygotowania. Po przesłaniu topologii do klastra będziemy czekać 10 sekund, aż klaster obliczy przesłaną topologię, a następnie zamknie klaster przy użyciu metody „shutdown” „LocalCluster”. Pełny kod programu jest następujący -

Kodowanie - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Tworzenie i uruchamianie aplikacji

Cała aplikacja zawiera cztery kody Java. Oni są -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

Aplikację można zbudować za pomocą następującego polecenia -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Aplikację można uruchomić za pomocą następującego polecenia -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Wynik

Po uruchomieniu aplikacja wyświetli szczegółowe informacje o procesie uruchamiania klastra, przetwarzaniu spout i śruby, a na końcu o procesie zamykania klastra. W „CallLogCounterBolt” wydrukowaliśmy wywołanie i jego liczbę. Te informacje zostaną wyświetlone na konsoli w następujący sposób -

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Języki spoza JVM

Topologie Storm są implementowane przez interfejsy Thrift, co ułatwia przesyłanie topologii w dowolnym języku. Storm obsługuje Ruby, Python i wiele innych języków. Rzućmy okiem na powiązanie Pythona.

Python Binding

Python jest interpretowanym, interaktywnym, obiektowym językiem programowania wysokiego poziomu ogólnego przeznaczenia. Storm obsługuje Pythona w celu implementacji swojej topologii. Python obsługuje operacje emitowania, zakotwiczania, potwierdzania i rejestrowania.

Jak wiesz, śruby można definiować w dowolnym języku. Śruby napisane w innym języku są wykonywane jako podprocesy, a Storm komunikuje się z tymi podprocesami za pomocą komunikatów JSON przez stdin / stdout. Najpierw weź przykładową śrubę WordCount, która obsługuje powiązanie języka Python.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

Tutaj klasa WordCount implementuje IRichBoltinterfejs i działa z implementacją Pythona z określonym argumentem super metody "splitword.py". Teraz utwórz implementację Pythona o nazwie „splitword.py”.

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

To jest przykładowa implementacja dla Pythona, która liczy słowa w danym zdaniu. Podobnie możesz również łączyć się z innymi językami pomocniczymi.