Apache Storm - Dreizack

Trident ist eine Erweiterung von Storm. Wie Storm wurde auch Trident von Twitter entwickelt. Der Hauptgrund für die Entwicklung von Trident besteht darin, zusätzlich zu Storm eine Abstraktion auf hoher Ebene sowie eine Stateful-Stream-Verarbeitung und verteilte Abfragen mit geringer Latenz bereitzustellen.

Trident verwendet Ausguss und Bolzen, aber diese Komponenten auf niedriger Ebene werden von Trident vor der Ausführung automatisch generiert. Trident verfügt über Funktionen, Filter, Verknüpfungen, Gruppierungen und Aggregationen.

Trident verarbeitet Streams als eine Reihe von Stapeln, die als Transaktionen bezeichnet werden. Im Allgemeinen liegt die Größe dieser kleinen Chargen in der Größenordnung von Tausenden oder Millionen von Tupeln, abhängig vom Eingabestream. Auf diese Weise unterscheidet sich Trident von Storm, das Tupel für Tupel verarbeitet.

Das Stapelverarbeitungskonzept ist Datenbanktransaktionen sehr ähnlich. Jeder Transaktion wird eine Transaktions-ID zugewiesen. Die Transaktion gilt als erfolgreich, sobald die gesamte Verarbeitung abgeschlossen ist. Ein Fehler bei der Verarbeitung eines der Tupel der Transaktion führt jedoch dazu, dass die gesamte Transaktion erneut übertragen wird. Für jeden Stapel ruft Trident zu Beginn der Transaktion beginCommit auf und schreibt am Ende fest.

Dreizack-Topologie

Die Trident-API bietet eine einfache Option zum Erstellen einer Trident-Topologie mithilfe der Klasse "TridentTopology". Grundsätzlich empfängt die Trident-Topologie einen Eingabestream vom Auslauf und führt eine geordnete Abfolge von Operationen (Filter, Aggregation, Gruppierung usw.) für den Stream aus. Storm Tuple wird durch Trident Tuple ersetzt und Bolts werden durch Operationen ersetzt. Eine einfache Trident-Topologie kann wie folgt erstellt werden:

TridentTopology topology = new TridentTopology();

Dreizack-Tupel

Dreizack-Tupel ist eine benannte Liste von Werten. Die TridentTuple-Schnittstelle ist das Datenmodell einer Trident-Topologie. Die TridentTuple-Schnittstelle ist die grundlegende Dateneinheit, die von einer Trident-Topologie verarbeitet werden kann.

Dreizackauslauf

Der Trident-Auslauf ähnelt dem Storm-Auslauf und bietet zusätzliche Optionen zur Verwendung der Funktionen von Trident. Eigentlich können wir weiterhin den IRichSpout verwenden, den wir in der Storm-Topologie verwendet haben, aber er ist nicht transaktionaler Natur und wir können die Vorteile von Trident nicht nutzen.

Der grundlegende Auslauf mit allen Funktionen zur Nutzung der Funktionen von Trident ist "ITridentSpout". Es unterstützt sowohl transaktionale als auch undurchsichtige Transaktionssemantik. Die anderen Ausläufe sind IBatchSpout, IPartitionedTridentSpout und IOpaquePartitionedTridentSpout.

Zusätzlich zu diesen generischen Ausläufen verfügt Trident über zahlreiche Beispielimplementierungen für Dreizackausläufe. Einer davon ist der FeederBatchSpout-Auslauf, mit dem wir benannte Listen von Dreizack-Tupeln einfach senden können, ohne uns um Stapelverarbeitung, Parallelität usw. kümmern zu müssen.

Die Erstellung von FeederBatchSpout und die Datenzufuhr können wie folgt erfolgen:

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Dreizackoperationen

Trident verlässt sich auf die „Trident-Operation“, um den Eingabestrom von Trident-Tupeln zu verarbeiten. Die Trident-API verfügt über eine Reihe integrierter Vorgänge für die einfache bis komplexe Stream-Verarbeitung. Diese Operationen reichen von der einfachen Validierung bis zur komplexen Gruppierung und Aggregation von Dreizack-Tupeln. Lassen Sie uns die wichtigsten und am häufigsten verwendeten Operationen durchgehen.

Filter

Filter ist ein Objekt, mit dem die Aufgabe der Eingabevalidierung ausgeführt wird. Ein Trident-Filter erhält eine Teilmenge von Trident-Tupelfeldern als Eingabe und gibt entweder true oder false zurück, je nachdem, ob bestimmte Bedingungen erfüllt sind oder nicht. Wenn true zurückgegeben wird, bleibt das Tupel im Ausgabestream erhalten. Andernfalls wird das Tupel aus dem Stream entfernt. Filter erbt grundsätzlich von derBaseFilter Klasse und implementieren die isKeepMethode. Hier ist eine Beispielimplementierung der Filteroperation -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

Die Filterfunktion kann in der Topologie mit "jeder" Methode aufgerufen werden. Die Klasse "Felder" kann verwendet werden, um die Eingabe anzugeben (Teilmenge des Dreizack-Tupels). Der Beispielcode lautet wie folgt:

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Funktion

Functionist ein Objekt, mit dem eine einfache Operation an einem einzelnen Dreizack-Tupel ausgeführt wird. Es nimmt eine Teilmenge von Dreizack-Tupelfeldern und gibt null oder mehr neue Dreizack-Tupelfelder aus.

Function erbt grundsätzlich von der BaseFunction Klasse und implementiert die executeMethode. Eine Beispielimplementierung ist unten angegeben -

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Genau wie die Filteroperation kann die Funktionsoperation in einer Topologie mit der aufgerufen werden eachMethode. Der Beispielcode lautet wie folgt:

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Anhäufung

Aggregation ist ein Objekt, mit dem Aggregationsvorgänge für einen Eingabestapel, eine Partition oder einen Stream ausgeführt werden. Trident hat drei Arten der Aggregation. Sie sind wie folgt -

  • aggregate- Aggregiert jede Charge Dreizack-Tupel isoliert. Während des Aggregationsprozesses werden die Tupel zunächst mithilfe der globalen Gruppierung neu partitioniert, um alle Partitionen desselben Stapels in einer einzigen Partition zu kombinieren.

  • partitionAggregate- Aggregiert jede Partition anstelle des gesamten Dreizack-Tupelstapels. Die Ausgabe des Partitionsaggregats ersetzt das Eingabetupel vollständig. Die Ausgabe des Partitionsaggregats enthält ein einzelnes Feldtupel.

  • persistentaggregate - Aggregiert alle Dreizack-Tupel über alle Stapel hinweg und speichert das Ergebnis entweder im Speicher oder in der Datenbank.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Die Aggregationsoperation kann entweder mit CombinerAggregator, ReducerAggregator oder einer generischen Aggregator-Schnittstelle erstellt werden. Der im obigen Beispiel verwendete "count" -Aggregator ist einer der integrierten Aggregatoren. Er wird mit "CombinerAggregator" implementiert. Die Implementierung ist wie folgt:

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Gruppierung

Die Gruppierungsoperation ist eine eingebaute Operation und kann von der aufgerufen werden groupByMethode. Die groupBy-Methode partitioniert den Stream neu, indem sie eine partitionBy für die angegebenen Felder ausführt. Anschließend gruppiert sie innerhalb jeder Partition Tupel, deren Gruppenfelder gleich sind. Normalerweise verwenden wir "groupBy" zusammen mit "persistentAggregate", um die gruppierte Aggregation zu erhalten. Der Beispielcode lautet wie folgt:

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Zusammenführen und Beitreten

Das Zusammenführen und Verbinden kann mithilfe der Methode "Zusammenführen" bzw. "Verbinden" erfolgen. Durch das Zusammenführen werden ein oder mehrere Streams kombiniert. Das Verbinden ähnelt dem Zusammenführen, mit der Ausnahme, dass beim Verbinden das Dreizack-Tupelfeld von beiden Seiten verwendet wird, um zwei Streams zu überprüfen und zu verbinden. Darüber hinaus funktioniert das Verbinden nur auf Chargenebene. Der Beispielcode lautet wie folgt:

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Zustandswartung

Trident bietet einen Mechanismus für die staatliche Aufrechterhaltung. Statusinformationen können in der Topologie selbst gespeichert werden, andernfalls können Sie sie auch in einer separaten Datenbank speichern. Der Grund besteht darin, den Status beizubehalten, dass das fehlgeschlagene Tupel wiederholt wird, wenn ein Tupel während der Verarbeitung ausfällt. Dies führt beim Aktualisieren des Status zu einem Problem, da Sie nicht sicher sind, ob der Status dieses Tupels zuvor aktualisiert wurde oder nicht. Wenn das Tupel vor dem Aktualisieren des Status fehlgeschlagen ist, wird der Status stabil, wenn Sie das Tupel erneut versuchen. Wenn das Tupel jedoch nach dem Aktualisieren des Status fehlgeschlagen ist, wird durch erneutes Versuchen desselben Tupels die Anzahl in der Datenbank erneut erhöht und der Status instabil. Die folgenden Schritte müssen ausgeführt werden, um sicherzustellen, dass eine Nachricht nur einmal verarbeitet wird:

  • Verarbeiten Sie die Tupel in kleinen Mengen.

  • Weisen Sie jeder Charge eine eindeutige ID zu. Wenn der Stapel erneut versucht wird, erhält er dieselbe eindeutige ID.

  • Die Statusaktualisierungen werden nach Stapeln sortiert. Beispielsweise ist die Statusaktualisierung des zweiten Stapels erst möglich, wenn die Statusaktualisierung für den ersten Stapel abgeschlossen ist.

Verteilter RPC

Verteilter RPC wird verwendet, um das Ergebnis aus der Trident-Topologie abzufragen und abzurufen. Storm verfügt über einen eingebauten verteilten RPC-Server. Der verteilte RPC-Server empfängt die RPC-Anforderung vom Client und übergibt sie an die Topologie. Die Topologie verarbeitet die Anforderung und sendet das Ergebnis an den verteilten RPC-Server, der vom verteilten RPC-Server an den Client umgeleitet wird. Die verteilte RPC-Abfrage von Trident wird wie eine normale RPC-Abfrage ausgeführt, mit der Ausnahme, dass diese Abfragen parallel ausgeführt werden.

Wann sollte man Trident verwenden?

Wie in vielen Anwendungsfällen können wir eine Abfrage nur einmal verarbeiten, indem wir eine Topologie in Trident schreiben. Andererseits wird es im Fall von Storm schwierig sein, genau eine einmalige Verarbeitung zu erreichen. Daher ist Trident für Anwendungsfälle nützlich, in denen Sie genau einmal eine Verarbeitung benötigen. Trident ist nicht für alle Anwendungsfälle geeignet, insbesondere für Hochleistungsanwendungsfälle, da es Storm komplexer macht und den Status verwaltet.

Arbeitsbeispiel für Dreizack

Wir werden unsere im vorherigen Abschnitt erarbeitete Call Log Analyzer-Anwendung in das Trident-Framework konvertieren. Die Anwendung von Trident ist dank der High-Level-API im Vergleich zu normalem Sturm relativ einfach. Storm ist grundsätzlich erforderlich, um eine der Funktionen, Filter, Aggregate, GroupBy, Join und Merge in Trident auszuführen. Zum Schluss starten wir den DRPC Server mit demLocalDRPC Klasse und suchen Sie ein Schlüsselwort mit dem execute Methode der LocalDRPC-Klasse.

Formatieren der Anrufinformationen

Der Zweck der FormatCall-Klasse besteht darin, die Anrufinformationen zu formatieren, die "Anrufernummer" und "Empfängernummer" umfassen. Der vollständige Programmcode lautet wie folgt:

Codierung: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

Der Zweck der CSVSplit-Klasse besteht darin, die Eingabezeichenfolge basierend auf "Komma (,)" zu teilen und jedes Wort in der Zeichenfolge auszugeben. Diese Funktion wird verwendet, um das Eingabeargument der verteilten Abfrage zu analysieren. Der vollständige Code lautet wie folgt:

Codierung: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log Analyzer

Dies ist die Hauptanwendung. Zunächst initialisiert die Anwendung die TridentTopology und füttert Anruferinformationen mitFeederBatchSpout. Trident-Topologiestream kann mit dem erstellt werdennewStreamMethode der TridentTopology-Klasse. In ähnlicher Weise kann ein DRPC-Stream mit Trident-Topologie mithilfe von erstellt werdennewDRCPStreamMethode der TridentTopology-Klasse. Ein einfacher DRCP-Server kann mit der LocalDRPC-Klasse erstellt werden.LocalDRPChat eine Ausführungsmethode zum Suchen eines Schlüsselworts. Der vollständige Code ist unten angegeben.

Codierung: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Erstellen und Ausführen der Anwendung

Die vollständige Anwendung verfügt über drei Java-Codes. Sie sind wie folgt -

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

Die Anwendung kann mit dem folgenden Befehl erstellt werden:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Die Anwendung kann mit dem folgenden Befehl ausgeführt werden:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Ausgabe

Sobald die Anwendung gestartet ist, gibt die Anwendung die vollständigen Details zum Cluster-Startprozess, zur Betriebsverarbeitung, zu DRPC-Server- und Client-Informationen und schließlich zum Cluster-Herunterfahren aus. Diese Ausgabe wird wie unten gezeigt auf der Konsole angezeigt.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends