Apache Storm - Exemplo de trabalho
Examinamos os principais detalhes técnicos do Apache Storm e agora é hora de codificar alguns cenários simples.
Cenário - Analisador de registro de chamadas móveis
A chamada móvel e sua duração serão fornecidas como entrada para o Apache Storm e o Storm processará e agrupará a chamada entre o mesmo chamador e receptor e seu número total de chamadas.
Criação de bico
Spout é um componente utilizado para geração de dados. Basicamente, um spout implementará uma interface IRichSpout. A interface “IRichSpout” possui os seguintes métodos importantes -
open- Dá à bica um ambiente de execução. Os executores irão executar este método para inicializar o spout.
nextTuple - Emite os dados gerados através do coletor.
close - Este método é chamado quando uma bica vai desligar.
declareOutputFields - Declara o esquema de saída da tupla.
ack - Reconhece que uma tupla específica é processada
fail - Especifica que uma tupla específica não é processada e não deve ser reprocessada.
Abrir
A assinatura do open método é o seguinte -
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf - Fornece configuração de tempestade para este bico.
context - Fornece informações completas sobre o local do bico na topologia, sua identificação de tarefa, informações de entrada e saída.
collector - Permite emitir a tupla que será processada pelos bolts.
nextTuple
A assinatura do nextTuple método é o seguinte -
nextTuple()
nextTuple () é chamado periodicamente do mesmo loop que os métodos ack () e fail (). Ele deve liberar o controle do encadeamento quando não houver trabalho a ser feito, para que os outros métodos tenham a chance de serem chamados. Portanto, a primeira linha de nextTuple verifica se o processamento foi concluído. Nesse caso, ele deve hibernar por pelo menos um milissegundo para reduzir a carga no processador antes de retornar.
Fechar
A assinatura do close método é o seguinte -
close()
declareOutputFields
A assinatura do declareOutputFields método é o seguinte -
declareOutputFields(OutputFieldsDeclarer declarer)
declarer - É usado para declarar ids de fluxo de saída, campos de saída, etc.
Este método é usado para especificar o esquema de saída da tupla.
ack
A assinatura do ack método é o seguinte -
ack(Object msgId)
Este método reconhece que uma tupla específica foi processada.
falhou
A assinatura do nextTuple método é o seguinte -
ack(Object msgId)
Este método informa que uma tupla específica não foi totalmente processada. Storm irá reprocessar a tupla específica.
FakeCallLogReaderSpout
Em nosso cenário, precisamos coletar os detalhes do registro de chamadas. A informação do registro de chamadas contém.
- número do chamador
- número do receptor
- duration
Como não temos informações em tempo real de registros de chamadas, geraremos registros de chamadas falsos. As informações falsas serão criadas usando a classe Random. O código completo do programa é fornecido abaixo.
Codificação - FakeCallLogReaderSpout.java
import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
//Create a class FakeLogReaderSpout which implement IRichSpout interface
to access functionalities
public class FakeCallLogReaderSpout implements IRichSpout {
//Create instance for SpoutOutputCollector which passes tuples to bolt.
private SpoutOutputCollector collector;
private boolean completed = false;
//Create instance for TopologyContext which contains topology data.
private TopologyContext context;
//Create instance for Random class.
private Random randomGenerator = new Random();
private Integer idx = 0;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple() {
if(this.idx <= 1000) {
List<String> mobileNumbers = new ArrayList<String>();
mobileNumbers.add("1234123401");
mobileNumbers.add("1234123402");
mobileNumbers.add("1234123403");
mobileNumbers.add("1234123404");
Integer localIdx = 0;
while(localIdx++ < 100 && this.idx++ < 1000) {
String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
while(fromMobileNumber == toMobileNumber) {
toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
}
Integer duration = randomGenerator.nextInt(60);
this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("from", "to", "duration"));
}
//Override all the interface methods
@Override
public void close() {}
public boolean isDistributed() {
return false;
}
@Override
public void activate() {}
@Override
public void deactivate() {}
@Override
public void ack(Object msgId) {}
@Override
public void fail(Object msgId) {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Criação de Parafuso
Bolt é um componente que recebe tuplas como entrada, processa a tupla e produz novas tuplas como saída. Os parafusos irão implementarIRichBoltinterface. Neste programa, duas classes de boltCallLogCreatorBolt e CallLogCounterBolt são usados para realizar as operações.
A interface IRichBolt tem os seguintes métodos -
prepare- Fornece ao parafuso um ambiente para execução. Os executores irão executar este método para inicializar o spout.
execute - Processa uma única tupla de entrada.
cleanup - Chamado quando um parafuso vai desligar.
declareOutputFields - Declara o esquema de saída da tupla.
Preparar
A assinatura do prepare método é o seguinte -
prepare(Map conf, TopologyContext context, OutputCollector collector)
conf - Fornece configuração Storm para este parafuso.
context - Fornece informações completas sobre o local do parafuso dentro da topologia, sua identificação de tarefa, informações de entrada e saída, etc.
collector - Permite-nos emitir a tupla processada.
executar
A assinatura do execute método é o seguinte -
execute(Tuple tuple)
Aqui tuple é a tupla de entrada a ser processada.
o executemétodo processa uma única tupla por vez. Os dados da tupla podem ser acessados pelo método getValue da classe Tupla. Não é necessário processar a tupla de entrada imediatamente. Várias tuplas podem ser processadas e geradas como uma única tupla de saída. A tupla processada pode ser emitida usando a classe OutputCollector.
Limpar
A assinatura do cleanup método é o seguinte -
cleanup()
declareOutputFields
A assinatura do declareOutputFields método é o seguinte -
declareOutputFields(OutputFieldsDeclarer declarer)
Aqui o parâmetro declarer é usado para declarar ids de fluxo de saída, campos de saída, etc.
Este método é usado para especificar o esquema de saída da tupla
Parafuso do criador do registro de chamadas
O bolt do criador do registro de chamadas recebe a tupla do registro de chamadas. A tupla do registro de chamadas tem o número do chamador, o número do receptor e a duração da chamada. Este parafuso simplesmente cria um novo valor combinando o número do chamador e o número do receptor. O formato do novo valor é "Número do chamador - Número do receptor" e é denominado como um novo campo, "ligar". O código completo é fornecido abaixo.
Codificação - CallLogCreatorBolt.java
//import util packages
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
//Create instance for OutputCollector which collects and emits tuples to produce output
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String from = tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
collector.emit(new Values(from + " - " + to, duration));
}
@Override
public void cleanup() {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call", "duration"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Parafuso do contador do registro de chamadas
O parafuso do contador de registro de chamadas recebe a chamada e sua duração como uma tupla. Este parafuso inicializa um objeto de dicionário (Mapa) no método de preparação. Noexecute, ele verifica a tupla e cria uma nova entrada no objeto de dicionário para cada novo valor de “chamada” na tupla e define um valor 1 no objeto de dicionário. Para a entrada já disponível no dicionário, basta incrementar seu valor. Em termos simples, esse parafuso salva a chamada e sua contagem no objeto de dicionário. Em vez de salvar a chamada e sua contagem no dicionário, também podemos salvá-la em uma fonte de dados. O código completo do programa é o seguinte -
Codificação - CallLogCounterBolt.java
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class CallLogCounterBolt implements IRichBolt {
Map<String, Integer> counterMap;
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String call = tuple.getString(0);
Integer duration = tuple.getInteger(1);
if(!counterMap.containsKey(call)){
counterMap.put(call, 1);
}else{
Integer c = counterMap.get(call) + 1;
counterMap.put(call, c);
}
collector.ack(tuple);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Criando Topologia
A topologia Storm é basicamente uma estrutura Thrift. A classe TopologyBuilder fornece métodos simples e fáceis para criar topologias complexas. A classe TopologyBuilder tem métodos para definir spout(setSpout) e para definir o parafuso (setBolt). Finalmente, TopologyBuilder tem createTopology para criar topologia. Use o seguinte snippet de código para criar uma topologia -
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGrouping e fieldsGrouping métodos ajudam a definir o agrupamento de fluxo para bico e parafusos.
Cluster Local
Para fins de desenvolvimento, podemos criar um cluster local usando o objeto "LocalCluster" e, em seguida, enviar a topologia usando o método "submitTopology" da classe "LocalCluster". Um dos argumentos para "submitTopology" é uma instância da classe "Config". A classe "Config" é usada para definir opções de configuração antes de enviar a topologia. Esta opção de configuração será mesclada com a configuração do cluster em tempo de execução e enviada para todas as tarefas (spout e bolt) com o método prepare. Assim que a topologia for enviada ao cluster, esperaremos 10 segundos para que o cluster calcule a topologia enviada e, em seguida, encerraremos o cluster usando o método de “desligamento” de "LocalCluster". O código completo do programa é o seguinte -
Codificação - LogAnalyserStorm.java
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
public static void main(String[] args) throws Exception{
//Create Config instance for cluster configuration
Config config = new Config();
config.setDebug(true);
//
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
Thread.sleep(10000);
//Stop the topology
cluster.shutdown();
}
}
Construindo e executando o aplicativo
O aplicativo completo possui quatro códigos Java. Eles são -
- FakeCallLogReaderSpout.java
- CallLogCreaterBolt.java
- CallLogCounterBolt.java
- LogAnalyerStorm.java
O aplicativo pode ser construído usando o seguinte comando -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
O aplicativo pode ser executado usando o seguinte comando -
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
Resultado
Assim que o aplicativo for iniciado, ele produzirá os detalhes completos sobre o processo de inicialização do cluster, processamento de spout e bolt e, finalmente, o processo de desligamento do cluster. Em "CallLogCounterBolt" imprimimos a chamada e seus detalhes de contagem. Essas informações serão exibidas no console da seguinte forma -
1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93
Linguagens não JVM
Topologias Storm são implementadas por interfaces Thrift, o que facilita o envio de topologias em qualquer idioma. Storm suporta Ruby, Python e muitas outras linguagens. Vamos dar uma olhada na vinculação python.
Ligação Python
Python é uma linguagem de programação de alto nível interpretada de propósito geral, interativa, orientada a objetos. Storm suporta Python para implementar sua topologia. Python oferece suporte a operações de emissão, ancoragem, reconhecimento e registro.
Como você sabe, os parafusos podem ser definidos em qualquer idioma. Bolts escritos em outro idioma são executados como subprocessos e Storm se comunica com esses subprocessos com mensagens JSON por stdin / stdout. Primeiro, pegue um parafuso de amostra WordCount que suporte a vinculação Python.
public static class WordCount implements IRichBolt {
public WordSplit() {
super("python", "splitword.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
Aqui a aula WordCount implementa o IRichBoltinterface e executando com a implementação python especificado argumento do super método "splitword.py". Agora crie uma implementação python chamada "splitword.py".
import storm
class WordCountBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
WordCountBolt().run()
Este é o exemplo de implementação para Python que conta as palavras em uma determinada frase. Da mesma forma, você também pode vincular a outras linguagens de suporte.