Apache Storm - Рабочий пример

Мы рассмотрели основные технические детали Apache Storm, и теперь пришло время написать несколько простых сценариев.

Сценарий - Анализатор журнала мобильных вызовов

Мобильный вызов и его продолжительность будут предоставлены в качестве входных данных для Apache Storm, и Storm обработает и сгруппирует вызов между одним и тем же вызывающим абонентом и получателем, а также их общее количество вызовов.

Создание носика

Носик - это компонент, который используется для генерации данных. По сути, носик реализует интерфейс IRichSpout. Интерфейс «IRichSpout» имеет следующие важные методы:

  • open- Обеспечивает среду для работы носика. Исполнители запустят этот метод для инициализации носика.

  • nextTuple - Выдает сгенерированные данные через коллектор.

  • close - Этот метод вызывается, когда излив собирается выключиться.

  • declareOutputFields - Объявляет схему вывода кортежа.

  • ack - Подтверждает, что конкретный кортеж обрабатывается

  • fail - Указывает, что конкретный кортеж не обрабатывается и не подлежит повторной обработке.

открыто

Подпись open метод выглядит следующим образом -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - Обеспечивает конфигурацию шторма для этого излива.

  • context - Предоставляет полную информацию о месте излива в топологии, его идентификаторе задачи, входной и выходной информации.

  • collector - Позволяет нам выдать кортеж, который будет обрабатываться болтами.

nextTuple

Подпись nextTuple метод выглядит следующим образом -

nextTuple()

nextTuple () периодически вызывается из того же цикла, что и методы ack () и fail (). Он должен освободить контроль над потоком, когда нет работы, чтобы другие методы могли быть вызваны. Итак, первая строка nextTuple проверяет, завершена ли обработка. Если это так, он должен спать не менее одной миллисекунды, чтобы снизить нагрузку на процессор, прежде чем вернуться.

близко

Подпись close метод выглядит следующим образом -

close()

declareOutputFields

Подпись declareOutputFields метод выглядит следующим образом -

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - Он используется для объявления идентификаторов выходных потоков, полей вывода и т. Д.

Этот метод используется для указания выходной схемы кортежа.

подтверждать

Подпись ack метод выглядит следующим образом -

ack(Object msgId)

Этот метод подтверждает, что определенный кортеж был обработан.

потерпеть поражение

Подпись nextTuple метод выглядит следующим образом -

ack(Object msgId)

Этот метод сообщает, что определенный кортеж не был полностью обработан. Storm повторно обработает конкретный кортеж.

FakeCallLogReaderSpout

В нашем сценарии нам нужно собрать данные журнала вызовов. Информация журнала вызовов содержит.

  • номер звонящего
  • номер получателя
  • duration

Поскольку у нас нет информации о журналах вызовов в реальном времени, мы будем создавать поддельные журналы вызовов. Поддельная информация будет создана с использованием класса Random. Полный программный код приведен ниже.

Кодирование - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Создание болта

Bolt - это компонент, который принимает кортежи в качестве входных данных, обрабатывает кортежи и создает новые кортежи в качестве выходных данных. Болты осуществимIRichBoltинтерфейс. В этой программе два класса болтовCallLogCreatorBolt а также CallLogCounterBolt используются для выполнения операций.

Интерфейс IRichBolt имеет следующие методы -

  • prepare- Предоставляет болту среду для выполнения. Исполнители запустят этот метод для инициализации носика.

  • execute - Обработать один кортеж ввода.

  • cleanup - Вызывается при отключении болта.

  • declareOutputFields - Объявляет схему вывода кортежа.

Подготовить

Подпись prepare метод выглядит следующим образом -

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - Предоставляет конфигурацию Storm для этого болта.

  • context - Предоставляет полную информацию о месте болта в топологии, его идентификаторе задачи, входной и выходной информации и т. Д.

  • collector - Позволяет нам выдать обработанный кортеж.

выполнять

Подпись execute метод выглядит следующим образом -

execute(Tuple tuple)

Вот tuple - входной кортеж, который нужно обработать.

В executeобрабатывает по одному кортежу за раз. Доступ к данным кортежа можно получить с помощью метода getValue класса Tuple. Нет необходимости немедленно обрабатывать входной кортеж. Несколько кортежей можно обработать и вывести как один выходной кортеж. Обработанный кортеж можно передать с помощью класса OutputCollector.

уборка

Подпись cleanup метод выглядит следующим образом -

cleanup()

declareOutputFields

Подпись declareOutputFields метод выглядит следующим образом -

declareOutputFields(OutputFieldsDeclarer declarer)

Здесь параметр declarer используется для объявления идентификаторов выходных потоков, полей вывода и т. д.

Этот метод используется для указания выходной схемы кортежа

Журнал вызовов Creator Bolt

Болт создателя журнала вызовов получает кортеж журнала вызовов. Кортеж журнала вызовов содержит номер вызывающего абонента, номер получателя и продолжительность разговора. Этот болт просто создает новое значение, комбинируя номер вызывающего абонента и номер получателя. Новое значение имеет формат «Номер вызывающего абонента - номер получателя» и называется новым полем «вызов». Полный код приведен ниже.

Кодирование - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Болт счетчика журнала вызовов

Болт счетчика журнала вызовов получает вызов и его продолжительность в виде кортежа. Этот болт инициализирует объект словаря (Map) в методе подготовки. Вexecute, он проверяет кортеж и создает новую запись в объекте словаря для каждого нового значения «вызова» в кортеже и устанавливает значение 1 в объекте словаря. Для уже доступной записи в словаре она просто увеличивает ее значение. Проще говоря, этот болт сохраняет вызов и его счет в объекте словаря. Вместо того, чтобы сохранять вызов и его количество в словаре, мы также можем сохранить его в источник данных. Полный программный код выглядит следующим образом -

Кодирование - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Создание топологии

Топология Storm - это, по сути, структура Thrift. Класс TopologyBuilder предоставляет простые и легкие методы для создания сложных топологий. Класс TopologyBuilder имеет методы для установки spout(setSpout) и установить болт (setBolt). Наконец, TopologyBuilder имеет createTopology для создания топологии. Используйте следующий фрагмент кода для создания топологии -

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping а также fieldsGrouping методы помогают настроить группировку потоков для излива и болтов.

Локальный кластер

В целях разработки мы можем создать локальный кластер, используя объект «LocalCluster», а затем отправить топологию, используя метод «submitTopology» класса «LocalCluster». Один из аргументов для submitTopology - это экземпляр класса «Config». Класс «Config» используется для установки параметров конфигурации перед отправкой топологии. Эта опция конфигурации будет объединена с конфигурацией кластера во время выполнения и отправлена ​​во все задачи (носик и болт) с помощью метода подготовки. После того, как топология будет отправлена ​​в кластер, мы подождем 10 секунд, пока кластер вычислит переданную топологию, а затем завершим работу кластера, используя метод «shutdown» для «LocalCluster». Полный программный код выглядит следующим образом -

Кодирование - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Сборка и запуск приложения

Полное приложение содержит четыре кода Java. Они -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

Приложение может быть создано с помощью следующей команды -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Приложение можно запустить с помощью следующей команды -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Выход

После запуска приложение выводит полные сведения о процессе запуска кластера, обработке носика и болта и, наконец, о процессе завершения работы кластера. В «CallLogCounterBolt» мы распечатали вызов и детали его подсчета. Эта информация будет отображаться на консоли следующим образом -

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Языки без JVM

Топологии Storm реализуются с помощью интерфейсов Thrift, что упрощает отправку топологий на любом языке. Storm поддерживает Ruby, Python и многие другие языки. Давайте посмотрим на привязку Python.

Связывание Python

Python - это интерпретируемый, интерактивный, объектно-ориентированный язык программирования высокого уровня общего назначения. Storm поддерживает Python для реализации своей топологии. Python поддерживает операции создания, привязки, подтверждения и регистрации.

Как известно, болты можно определять на любом языке. Болты, написанные на другом языке, выполняются как подпроцессы, и Storm взаимодействует с этими подпроцессами с помощью сообщений JSON через stdin / stdout. Сначала возьмите образец болта WordCount, который поддерживает привязку Python.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

Здесь класс WordCount реализует IRichBoltинтерфейс и запускается с реализацией python, указанным аргументом супер-метода "splitword.py". Теперь создайте реализацию Python с именем "splitword.py".

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

Это пример реализации Python, которая считает слова в заданном предложении. Точно так же вы можете выполнить привязку с другими поддерживающими языками.