Apache Storm - Трезубец

Trident - это расширение Storm. Как и Storm, Trident также был разработан Twitter. Основная причина разработки Trident - предоставить высокоуровневую абстракцию поверх Storm вместе с потоковой обработкой с сохранением состояния и распределенными запросами с низкой задержкой.

Trident использует носик и болт, но эти низкоуровневые компоненты автоматически генерируются Trident перед выполнением. Trident имеет функции, фильтры, объединения, группировку и агрегирование.

Trident обрабатывает потоки как серию пакетов, которые называются транзакциями. Обычно размер этих небольших пакетов составляет порядка тысяч или миллионов кортежей, в зависимости от входного потока. Таким образом, Trident отличается от Storm, который выполняет обработку кортежа за кортежем.

Концепция пакетной обработки очень похожа на транзакции базы данных. Каждой транзакции присваивается идентификатор транзакции. Транзакция считается успешной после завершения всей ее обработки. Однако сбой в обработке одного из кортежей транзакции приведет к повторной передаче всей транзакции. Для каждого пакета Trident вызовет beginCommit в начале транзакции и зафиксирует ее в конце.

Топология трезубца

Trident API предоставляет простой вариант создания топологии Trident с использованием класса «TridentTopology». По сути, топология Trident получает входной поток от spout и выполняет упорядоченную последовательность операций (фильтрация, агрегирование, группировка и т. Д.) В потоке. Кортеж бури заменен кортежем трезубца, а болты заменены операциями. Простую топологию Trident можно создать следующим образом:

TridentTopology topology = new TridentTopology();

Кортежи трезубца

Кортеж Trident - это именованный список значений. Интерфейс TridentTuple - это модель данных топологии Trident. Интерфейс TridentTuple - это базовая единица данных, которая может обрабатываться топологией Trident.

Трезубец Носик

Носик Trident похож на носик Storm, но с дополнительными опциями для использования функций Trident. Фактически, мы все еще можем использовать IRichSpout, который мы использовали в топологии Storm, но он будет нетранзакционным по своей природе, и мы не сможем использовать преимущества, предоставляемые Trident.

Базовым изливом, имеющим все функции для использования функций Trident, является «ITridentSpout». Он поддерживает как транзакционную, так и непрозрачную транзакционную семантику. Другие носики - это IBatchSpout, IPartitionedTridentSpout и IOpaquePartitionedTridentSpout.

В дополнение к этим стандартным носикам, Trident предлагает множество примеров реализации носика трезубца. Одним из них является носик FeederBatchSpout, который мы можем использовать для простой отправки именованного списка кортежей трезубца, не беспокоясь о пакетной обработке, параллелизме и т. Д.

Создание FeederBatchSpout и подача данных могут выполняться, как показано ниже -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Трайдент Операции

Trident полагается на «Операцию Trident» для обработки входного потока кортежей трезубца. Trident API имеет ряд встроенных операций для обработки потоков от простого к сложному. Эти операции варьируются от простой проверки до сложной группировки и агрегирования кортежей трезубца. Разберемся с наиболее важными и часто используемыми операциями.

Фильтр

Фильтр - это объект, используемый для выполнения задачи проверки ввода. Фильтр Trident получает в качестве входных данных подмножество полей кортежа трезубца и возвращает либо истину, либо ложь в зависимости от того, выполняются определенные условия или нет. Если возвращается истина, то кортеж сохраняется в потоке вывода; в противном случае кортеж удаляется из потока. Фильтр будет в основном унаследован отBaseFilter класс и реализовать isKeepметод. Вот пример реализации операции фильтра -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

Функцию фильтра можно вызвать в топологии с помощью метода «each». Класс «Поля» может использоваться для указания входных данных (подмножество кортежа трезубца). Пример кода выглядит следующим образом -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Функция

Function- объект, используемый для выполнения простой операции с одним кортежем трезубца. Он принимает подмножество полей кортежа трезубца и генерирует ноль или более новых полей кортежа трезубца.

Function в основном наследуется от BaseFunction класс и реализует executeметод. Пример реализации приведен ниже -

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Так же, как и операция фильтра, операция функции может быть вызвана в топологии с использованием eachметод. Пример кода выглядит следующим образом -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Агрегация

Агрегация - это объект, используемый для выполнения операций агрегирования над входным пакетом, разделом или потоком. Trident имеет три типа агрегации. Они следующие -

  • aggregate- Агрегирует отдельно каждую партию кортежа трезубца. Во время процесса агрегирования кортежи изначально повторно разбиваются на разделы с использованием глобальной группировки для объединения всех разделов одного и того же пакета в один раздел.

  • partitionAggregate- Агрегирует каждый раздел вместо всей партии кортежа трезубца. Выходные данные агрегата раздела полностью заменяют входной кортеж. Выходные данные агрегата раздела содержат кортеж из одного поля.

  • persistentaggregate - Агрегирует по всему кортежу трезубца по всему пакету и сохраняет результат либо в памяти, либо в базе данных.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Операцию агрегирования можно создать с помощью CombinerAggregator, ReducerAggregator или универсального интерфейса Aggregator. Агрегатор «count», используемый в приведенном выше примере, является одним из встроенных агрегаторов. Он реализован с помощью «CombinerAggregator». Реализация выглядит следующим образом:

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Группировка

Операция группировки является встроенной и может быть вызвана groupByметод. Метод groupBy перераспределяет поток, выполняя partitionBy для указанных полей, а затем внутри каждого раздела он группирует кортежи, чьи групповые поля равны. Обычно мы используем groupBy вместе с persistentAggregate, чтобы получить сгруппированное агрегирование. Пример кода выглядит следующим образом -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Слияние и присоединение

Слияние и объединение может быть выполнено с использованием методов «слияния» и «соединения» соответственно. Слияние объединяет один или несколько потоков. Объединение похоже на объединение, за исключением того факта, что объединение использует поле кортежа трезубца с обеих сторон для проверки и объединения двух потоков. Причем присоединение будет работать только на уровне партии. Пример кода выглядит следующим образом -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Государственное обслуживание

Trident предоставляет механизм для поддержания состояния. Информация о состоянии может храниться в самой топологии, в противном случае вы также можете хранить ее в отдельной базе данных. Причина в том, чтобы поддерживать состояние, при котором, если какой-либо кортеж выходит из строя во время обработки, он повторяется. Это создает проблему при обновлении состояния, поскольку вы не уверены, обновлялось ли состояние этого кортежа ранее или нет. Если кортеж не удалось обновить до обновления состояния, то повторная попытка кортежа сделает состояние стабильным. Однако, если кортеж завершился неудачно после обновления состояния, то повторная попытка того же кортежа снова увеличит счетчик в базе данных и сделает состояние нестабильным. Чтобы сообщение было обработано только один раз, необходимо выполнить следующие шаги:

  • Обработайте кортежи небольшими партиями.

  • Присвойте уникальный идентификатор каждой партии. Если пакет повторяется, ему присваивается тот же уникальный идентификатор.

  • Обновления состояния упорядочиваются по партиям. Например, обновление состояния второго пакета будет невозможно, пока не завершится обновление состояния для первого пакета.

Распределенный RPC

Распределенный RPC используется для запроса и получения результата из топологии Trident. Storm имеет встроенный распределенный сервер RPC. Распределенный сервер RPC получает запрос RPC от клиента и передает его в топологию. Топология обрабатывает запрос и отправляет результат распределенному серверу RPC, который перенаправляется распределенным сервером RPC клиенту. Распределенный RPC-запрос Trident выполняется как обычный RPC-запрос, за исключением того факта, что эти запросы выполняются параллельно.

Когда использовать трезубец?

Как и во многих случаях использования, если требуется обработать запрос только один раз, мы можем добиться этого, написав топологию в Trident. С другой стороны, в случае с Storm будет сложно добиться только однократной обработки. Следовательно, Trident будет полезен в тех случаях, когда вам требуется только однократная обработка. Trident подходит не для всех случаев использования, особенно для высокопроизводительных вариантов использования, поскольку он усложняет Storm и управляет состоянием.

Рабочий пример трезубца

Мы собираемся преобразовать наше приложение-анализатор журнала вызовов, разработанное в предыдущем разделе, в платформу Trident. Приложение Trident будет относительно простым по сравнению с обычным штормом благодаря высокоуровневому API. Storm в основном потребуется для выполнения любой из операций Function, Filter, Aggregate, GroupBy, Join и Merge в Trident. Наконец, мы запустим сервер DRPC, используяLocalDRPC class и выполните поиск по ключевому слову, используя execute метод класса LocalDRPC.

Форматирование информации о звонке

Целью класса FormatCall является форматирование информации о вызове, содержащей «номер вызывающего абонента» и «номер получателя». Полный программный код выглядит следующим образом -

Кодирование: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

Целью класса CSVSplit является разделение входной строки на основе «запятой (,)» и выдача каждого слова в строке. Эта функция используется для анализа входного аргумента распределенного запроса. Полный код выглядит следующим образом -

Кодирование: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Анализатор журналов

Это основное приложение. Первоначально приложение инициализирует TridentTopology и передает информацию о вызывающем абоненте, используяFeederBatchSpout. Поток топологии Trident может быть создан с помощьюnewStreamметод класса TridentTopology. Точно так же поток DRPC топологии Trident может быть создан с помощьюnewDRCPStreamметод класса TridentTopology. Простой сервер DRCP можно создать с помощью класса LocalDRPC.LocalDRPCимеет метод выполнения для поиска по ключевому слову. Полный код приведен ниже.

Кодирование: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Сборка и запуск приложения

Полное приложение содержит три кода Java. Они следующие -

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

Приложение может быть создано с помощью следующей команды -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Приложение можно запустить с помощью следующей команды -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Выход

После запуска приложения оно выведет полную информацию о процессе запуска кластера, обработке операций, информации о сервере DRPC и клиенте и, наконец, о процессе завершения работы кластера. Этот вывод будет отображаться на консоли, как показано ниже.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends