Tempestade Apache - Tridente

Trident é uma extensão do Storm. Assim como o Storm, o Trident também foi desenvolvido pelo Twitter. A principal razão por trás do desenvolvimento do Trident é fornecer uma abstração de alto nível sobre o Storm, juntamente com processamento de stream com estado e consultas distribuídas de baixa latência.

O Trident usa bico e parafuso, mas esses componentes de baixo nível são gerados automaticamente pelo Trident antes da execução. O Trident possui funções, filtros, junções, agrupamento e agregação.

O Trident processa fluxos como uma série de lotes que são chamados de transações. Geralmente, o tamanho desses pequenos lotes será da ordem de milhares ou milhões de tuplas, dependendo do fluxo de entrada. Dessa forma, o Trident é diferente do Storm, que executa o processamento tupla por tupla.

O conceito de processamento em lote é muito semelhante às transações de banco de dados. Cada transação recebe um ID de transação. A transação é considerada bem sucedida, uma vez que todo o seu processamento seja concluído. No entanto, uma falha no processamento de uma das tuplas da transação fará com que toda a transação seja retransmitida. Para cada lote, o Trident chamará beginCommit no início da transação e fará o commit no final dela.

Topologia Trident

A API Trident expõe uma opção fácil para criar topologia Trident usando a classe “TridentTopology”. Basicamente, a topologia Trident recebe o fluxo de entrada do spout e faz a sequência ordenada de operação (filtro, agregação, agrupamento, etc.) no fluxo. Storm Tuple é substituído por Trident Tuple e Bolts são substituídos por operações. Uma topologia Trident simples pode ser criada da seguinte forma -

TridentTopology topology = new TridentTopology();

Tuplas de tridente

A tupla de tridente é uma lista nomeada de valores. A interface TridentTuple é o modelo de dados de uma topologia Trident. A interface TridentTuple é a unidade básica de dados que pode ser processada por uma topologia Trident.

Bico Trident

O bico Trident é semelhante ao bico Storm, com opções adicionais para usar os recursos do Trident. Na verdade, ainda podemos usar o IRichSpout, que usamos na topologia Storm, mas será de natureza não transacional e não poderemos usar as vantagens fornecidas pelo Trident.

O bico básico com todas as funcionalidades para usar os recursos do Trident é o "ITridentSpout". Ele oferece suporte a semânticas transacionais opacas e transacionais. Os outros spouts são IBatchSpout, IPartitionedTridentSpout e IOpaquePartitionedTridentSpout.

Além desses bicos genéricos, o Trident tem muitos exemplos de implementação de bicos tridentes. Um deles é o spout FeederBatchSpout, que podemos usar para enviar listas nomeadas de tuplas de tridentes facilmente, sem nos preocuparmos com processamento em lote, paralelismo etc.

A criação do FeederBatchSpout e a alimentação de dados podem ser feitas conforme mostrado abaixo -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Operações Trident

O Trident depende da “Operação Trident” para processar o fluxo de entrada das tuplas de tridente. A API Trident tem várias operações embutidas para lidar com o processamento de fluxo simples a complexo. Essas operações variam de validação simples a agrupamento complexo e agregação de tuplas de tridentes. Vamos examinar as operações mais importantes e mais usadas.

Filtro

O filtro é um objeto utilizado para realizar a tarefa de validação de entrada. Um filtro Trident obtém um subconjunto de campos de tupla trident como entrada e retorna verdadeiro ou falso dependendo se certas condições são satisfeitas ou não. Se true for retornado, a tupla será mantida no fluxo de saída; caso contrário, a tupla é removida do fluxo. O filtro basicamente herdará doBaseFilter Classifique e implemente o isKeepmétodo. Aqui está um exemplo de implementação da operação do filtro -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

A função de filtro pode ser chamada na topologia usando o método “each”. A classe “Fields” pode ser usada para especificar a entrada (subconjunto da tupla tridente). O código de amostra é o seguinte -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Função

Functioné um objeto usado para realizar uma operação simples em uma única tupla de tridente. Ele pega um subconjunto de campos de tupla tridente e emite zero ou mais novos campos de tupla tridente.

Function basicamente herda do BaseFunction classe e implementa o executemétodo. Um exemplo de implementação é fornecido abaixo -

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Assim como a operação de filtro, a operação de função pode ser chamada em uma topologia usando o eachmétodo. O código de amostra é o seguinte -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Agregação

Agregação é um objeto usado para executar operações de agregação em um lote de entrada ou partição ou fluxo. O Trident possui três tipos de agregação. Eles são os seguintes -

  • aggregate- Agrega cada lote de tupla de tridente isoladamente. Durante o processo de agregação, as tuplas são inicialmente reparticionadas usando o agrupamento global para combinar todas as partições do mesmo lote em uma única partição.

  • partitionAggregate- Agrega cada partição em vez de todo o lote de tupla tridente. A saída do agregado de partição substitui completamente a tupla de entrada. A saída do agregado de partição contém uma única tupla de campo.

  • persistentaggregate - Agrega em todas as tuplas de tridente em todo o lote e armazena o resultado na memória ou no banco de dados.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

A operação de agregação pode ser criada usando CombinerAggregator, ReducerAggregator ou interface Aggregator genérica. O agregador "count" usado no exemplo acima é um dos agregadores integrados. Ele é implementado usando "CombinerAggregator". A implementação é a seguinte -

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Agrupamento

A operação de agrupamento é uma operação embutida e pode ser chamada pelo groupBymétodo. O método groupBy reparticiona o fluxo fazendo um partitionBy nos campos especificados e, em seguida, em cada partição, ele agrupa tuplas cujos campos de grupo são iguais. Normalmente, usamos “groupBy” junto com “persistentAggregate” para obter a agregação agrupada. O código de amostra é o seguinte -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Mesclar e unir

A fusão e a junção podem ser feitas usando os métodos “merge” e “join” respectivamente. A mesclagem combina um ou mais fluxos. A união é semelhante à fusão, exceto pelo fato de que a união usa um campo de tupla tridente de ambos os lados para verificar e unir dois fluxos. Além disso, a união funcionará apenas no nível de lote. O código de amostra é o seguinte -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Manutenção do Estado

O Trident fornece um mecanismo para manutenção do estado. As informações de estado podem ser armazenadas na própria topologia, caso contrário, você também pode armazená-las em um banco de dados separado. O motivo é manter um estado de que, se alguma tupla falhar durante o processamento, a tupla com falha será repetida. Isso cria um problema ao atualizar o estado, porque você não tem certeza se o estado desta tupla foi atualizado anteriormente ou não. Se a tupla falhou antes de atualizar o estado, tentar novamente a tupla tornará o estado estável. No entanto, se a tupla falhar após a atualização do estado, tentar novamente a mesma tupla aumentará novamente a contagem no banco de dados e tornará o estado instável. É necessário realizar as seguintes etapas para garantir que uma mensagem seja processada apenas uma vez -

  • Processe as tuplas em pequenos lotes.

  • Atribua um ID exclusivo a cada lote. Se o lote for repetido, ele receberá o mesmo ID exclusivo.

  • As atualizações de estado são ordenadas entre lotes. Por exemplo, a atualização de estado do segundo lote não será possível até que a atualização de estado do primeiro lote seja concluída.

RPC distribuído

O RPC distribuído é usado para consultar e recuperar o resultado da topologia Trident. Storm tem um servidor RPC distribuído embutido. O servidor RPC distribuído recebe a solicitação RPC do cliente e a passa para a topologia. A topologia processa a solicitação e envia o resultado para o servidor RPC distribuído, que é redirecionado pelo servidor RPC distribuído para o cliente. A consulta RPC distribuída do Trident é executada como uma consulta RPC normal, exceto pelo fato de que essas consultas são executadas em paralelo.

Quando usar o Trident?

Como em muitos casos de uso, se o requisito é processar uma consulta apenas uma vez, podemos alcançá-lo escrevendo uma topologia no Trident. Por outro lado, será difícil conseguir exatamente um processamento no caso do Storm. Conseqüentemente, o Trident será útil para os casos de uso em que você precisa de exatamente um processamento. O Trident não é para todos os casos de uso, especialmente casos de uso de alto desempenho, porque adiciona complexidade ao Storm e gerencia o estado.

Exemplo de trabalho do tridente

Vamos converter nosso aplicativo analisador de registro de chamadas desenvolvido na seção anterior para a estrutura Trident. O aplicativo Trident será relativamente fácil em comparação com o plain storm, graças à sua API de alto nível. Storm será basicamente necessário para executar qualquer uma das operações de Função, Filtro, Agregação, GroupBy, Join e Merge no Trident. Finalmente, iniciaremos o servidor DRPC usando oLocalDRPC classe e pesquisar alguma palavra-chave usando o execute método da classe LocalDRPC.

Formatando as informações da chamada

O objetivo da classe FormatCall é formatar as informações da chamada compreendendo “Número do chamador” e “Número do receptor”. O código completo do programa é o seguinte -

Codificação: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

O objetivo da classe CSVSplit é dividir a string de entrada com base em “vírgula (,)” e emitir todas as palavras da string. Esta função é usada para analisar o argumento de entrada da consulta distribuída. O código completo é o seguinte -

Codificação: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log Analyzer

Este é o aplicativo principal. Inicialmente, o aplicativo irá inicializar o TridentTopology e alimentar as informações do chamador usandoFeederBatchSpout. O fluxo de topologia Trident pode ser criado usando onewStreammétodo da classe TridentTopology. Da mesma forma, o fluxo DRPC da topologia Trident pode ser criado usando onewDRCPStreammétodo da classe TridentTopology. Um servidor DRCP simples pode ser criado usando a classe LocalDRPC.LocalDRPCtem método execute para pesquisar alguma palavra-chave. O código completo é fornecido abaixo.

Codificação: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Construindo e executando o aplicativo

O aplicativo completo possui três códigos Java. Eles são os seguintes -

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

O aplicativo pode ser construído usando o seguinte comando -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

O aplicativo pode ser executado usando o seguinte comando -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Resultado

Assim que o aplicativo for iniciado, ele produzirá os detalhes completos sobre o processo de inicialização do cluster, processamento de operações, servidor DRPC e informações do cliente e, finalmente, o processo de encerramento do cluster. Esta saída será exibida no console conforme mostrado abaixo.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends