Apache Storm - Exemplo de trabalho

Examinamos os principais detalhes técnicos do Apache Storm e agora é hora de codificar alguns cenários simples.

Cenário - Analisador de registro de chamadas móveis

A chamada móvel e sua duração serão fornecidas como entrada para o Apache Storm e o Storm processará e agrupará a chamada entre o mesmo chamador e receptor e seu número total de chamadas.

Criação de bico

Spout é um componente utilizado para geração de dados. Basicamente, um spout implementará uma interface IRichSpout. A interface “IRichSpout” possui os seguintes métodos importantes -

  • open- Dá à bica um ambiente de execução. Os executores irão executar este método para inicializar o spout.

  • nextTuple - Emite os dados gerados através do coletor.

  • close - Este método é chamado quando uma bica vai desligar.

  • declareOutputFields - Declara o esquema de saída da tupla.

  • ack - Reconhece que uma tupla específica é processada

  • fail - Especifica que uma tupla específica não é processada e não deve ser reprocessada.

Abrir

A assinatura do open método é o seguinte -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - Fornece configuração de tempestade para este bico.

  • context - Fornece informações completas sobre o local do bico na topologia, sua identificação de tarefa, informações de entrada e saída.

  • collector - Permite emitir a tupla que será processada pelos bolts.

nextTuple

A assinatura do nextTuple método é o seguinte -

nextTuple()

nextTuple () é chamado periodicamente do mesmo loop que os métodos ack () e fail (). Ele deve liberar o controle do encadeamento quando não houver trabalho a ser feito, para que os outros métodos tenham a chance de serem chamados. Portanto, a primeira linha de nextTuple verifica se o processamento foi concluído. Nesse caso, ele deve hibernar por pelo menos um milissegundo para reduzir a carga no processador antes de retornar.

Fechar

A assinatura do close método é o seguinte -

close()

declareOutputFields

A assinatura do declareOutputFields método é o seguinte -

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - É usado para declarar ids de fluxo de saída, campos de saída, etc.

Este método é usado para especificar o esquema de saída da tupla.

ack

A assinatura do ack método é o seguinte -

ack(Object msgId)

Este método reconhece que uma tupla específica foi processada.

falhou

A assinatura do nextTuple método é o seguinte -

ack(Object msgId)

Este método informa que uma tupla específica não foi totalmente processada. Storm irá reprocessar a tupla específica.

FakeCallLogReaderSpout

Em nosso cenário, precisamos coletar os detalhes do registro de chamadas. A informação do registro de chamadas contém.

  • número do chamador
  • número do receptor
  • duration

Como não temos informações em tempo real de registros de chamadas, geraremos registros de chamadas falsos. As informações falsas serão criadas usando a classe Random. O código completo do programa é fornecido abaixo.

Codificação - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Criação de Parafuso

Bolt é um componente que recebe tuplas como entrada, processa a tupla e produz novas tuplas como saída. Os parafusos irão implementarIRichBoltinterface. Neste programa, duas classes de boltCallLogCreatorBolt e CallLogCounterBolt são usados ​​para realizar as operações.

A interface IRichBolt tem os seguintes métodos -

  • prepare- Fornece ao parafuso um ambiente para execução. Os executores irão executar este método para inicializar o spout.

  • execute - Processa uma única tupla de entrada.

  • cleanup - Chamado quando um parafuso vai desligar.

  • declareOutputFields - Declara o esquema de saída da tupla.

Preparar

A assinatura do prepare método é o seguinte -

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - Fornece configuração Storm para este parafuso.

  • context - Fornece informações completas sobre o local do parafuso dentro da topologia, sua identificação de tarefa, informações de entrada e saída, etc.

  • collector - Permite-nos emitir a tupla processada.

executar

A assinatura do execute método é o seguinte -

execute(Tuple tuple)

Aqui tuple é a tupla de entrada a ser processada.

o executemétodo processa uma única tupla por vez. Os dados da tupla podem ser acessados ​​pelo método getValue da classe Tupla. Não é necessário processar a tupla de entrada imediatamente. Várias tuplas podem ser processadas e geradas como uma única tupla de saída. A tupla processada pode ser emitida usando a classe OutputCollector.

Limpar

A assinatura do cleanup método é o seguinte -

cleanup()

declareOutputFields

A assinatura do declareOutputFields método é o seguinte -

declareOutputFields(OutputFieldsDeclarer declarer)

Aqui o parâmetro declarer é usado para declarar ids de fluxo de saída, campos de saída, etc.

Este método é usado para especificar o esquema de saída da tupla

Parafuso do criador do registro de chamadas

O bolt do criador do registro de chamadas recebe a tupla do registro de chamadas. A tupla do registro de chamadas tem o número do chamador, o número do receptor e a duração da chamada. Este parafuso simplesmente cria um novo valor combinando o número do chamador e o número do receptor. O formato do novo valor é "Número do chamador - Número do receptor" e é denominado como um novo campo, "ligar". O código completo é fornecido abaixo.

Codificação - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Parafuso do contador do registro de chamadas

O parafuso do contador de registro de chamadas recebe a chamada e sua duração como uma tupla. Este parafuso inicializa um objeto de dicionário (Mapa) no método de preparação. Noexecute, ele verifica a tupla e cria uma nova entrada no objeto de dicionário para cada novo valor de “chamada” na tupla e define um valor 1 no objeto de dicionário. Para a entrada já disponível no dicionário, basta incrementar seu valor. Em termos simples, esse parafuso salva a chamada e sua contagem no objeto de dicionário. Em vez de salvar a chamada e sua contagem no dicionário, também podemos salvá-la em uma fonte de dados. O código completo do programa é o seguinte -

Codificação - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Criando Topologia

A topologia Storm é basicamente uma estrutura Thrift. A classe TopologyBuilder fornece métodos simples e fáceis para criar topologias complexas. A classe TopologyBuilder tem métodos para definir spout(setSpout) e para definir o parafuso (setBolt). Finalmente, TopologyBuilder tem createTopology para criar topologia. Use o seguinte snippet de código para criar uma topologia -

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping e fieldsGrouping métodos ajudam a definir o agrupamento de fluxo para bico e parafusos.

Cluster Local

Para fins de desenvolvimento, podemos criar um cluster local usando o objeto "LocalCluster" e, em seguida, enviar a topologia usando o método "submitTopology" da classe "LocalCluster". Um dos argumentos para "submitTopology" é uma instância da classe "Config". A classe "Config" é usada para definir opções de configuração antes de enviar a topologia. Esta opção de configuração será mesclada com a configuração do cluster em tempo de execução e enviada para todas as tarefas (spout e bolt) com o método prepare. Assim que a topologia for enviada ao cluster, esperaremos 10 segundos para que o cluster calcule a topologia enviada e, em seguida, encerraremos o cluster usando o método de “desligamento” de "LocalCluster". O código completo do programa é o seguinte -

Codificação - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Construindo e executando o aplicativo

O aplicativo completo possui quatro códigos Java. Eles são -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

O aplicativo pode ser construído usando o seguinte comando -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

O aplicativo pode ser executado usando o seguinte comando -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Resultado

Assim que o aplicativo for iniciado, ele produzirá os detalhes completos sobre o processo de inicialização do cluster, processamento de spout e bolt e, finalmente, o processo de desligamento do cluster. Em "CallLogCounterBolt" imprimimos a chamada e seus detalhes de contagem. Essas informações serão exibidas no console da seguinte forma -

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Linguagens não JVM

Topologias Storm são implementadas por interfaces Thrift, o que facilita o envio de topologias em qualquer idioma. Storm suporta Ruby, Python e muitas outras linguagens. Vamos dar uma olhada na vinculação python.

Ligação Python

Python é uma linguagem de programação de alto nível interpretada de propósito geral, interativa, orientada a objetos. Storm suporta Python para implementar sua topologia. Python oferece suporte a operações de emissão, ancoragem, reconhecimento e registro.

Como você sabe, os parafusos podem ser definidos em qualquer idioma. Bolts escritos em outro idioma são executados como subprocessos e Storm se comunica com esses subprocessos com mensagens JSON por stdin / stdout. Primeiro, pegue um parafuso de amostra WordCount que suporte a vinculação Python.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

Aqui a aula WordCount implementa o IRichBoltinterface e executando com a implementação python especificado argumento do super método "splitword.py". Agora crie uma implementação python chamada "splitword.py".

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

Este é o exemplo de implementação para Python que conta as palavras em uma determinada frase. Da mesma forma, você também pode vincular a outras linguagens de suporte.