Apache Storm - Trisula

Trident adalah perpanjangan dari Storm. Seperti Storm, Trident juga dikembangkan oleh Twitter. Alasan utama di balik pengembangan Trident adalah untuk menyediakan abstraksi tingkat tinggi di atas Storm bersama dengan pemrosesan aliran stateful dan kueri terdistribusi latensi rendah.

Trident menggunakan cerat dan baut, tetapi komponen tingkat rendah ini dibuat secara otomatis oleh Trident sebelum dieksekusi. Trident memiliki fungsi, filter, gabungan, pengelompokan, dan agregasi.

Trident memproses aliran sebagai rangkaian batch yang disebut sebagai transaksi. Umumnya ukuran batch kecil itu akan berada di urutan ribuan atau jutaan tupel, tergantung pada aliran input. Dengan cara ini, Trident berbeda dari Storm, yang melakukan pemrosesan tuple-by-tuple.

Konsep pemrosesan batch sangat mirip dengan transaksi database. Setiap transaksi diberi ID transaksi. Transaksi dianggap berhasil, setelah semua prosesnya selesai. Namun, kegagalan dalam memproses salah satu tupel transaksi akan menyebabkan seluruh transaksi dikirim ulang. Untuk setiap batch, Trident akan memanggil beginCommit di awal transaksi, dan berkomitmen di akhir transaksi.

Topologi Trisula

Trident API memperlihatkan opsi mudah untuk membuat topologi Trident menggunakan kelas “TridentTopology”. Pada dasarnya, topologi Trident menerima aliran input dari cerat dan melakukan urutan operasi yang teratur (filter, agregasi, pengelompokan, dll.,) Di aliran. Storm Tuple digantikan oleh Trident Tuple dan Bolts diganti dengan pengoperasian. Topologi Trident sederhana dapat dibuat sebagai berikut -

TridentTopology topology = new TridentTopology();

Trident Tuple

Trident tuple adalah daftar nilai bernama. Antarmuka TridentTuple adalah model data dari topologi Trident. Antarmuka TridentTuple adalah unit dasar data yang dapat diproses oleh topologi Trident.

Trident Spout

Trident spout mirip dengan Storm spout, dengan opsi tambahan untuk menggunakan fitur Trident. Sebenarnya kita masih bisa menggunakan IRichSpout yang sudah kita pakai di topologi Storm, tapi sifatnya non-transaksional dan kita tidak akan bisa menggunakan kelebihan yang diberikan oleh Trident.

Cerat dasar yang memiliki semua fungsi untuk menggunakan fitur Trident adalah "ITridentSpout". Ini mendukung semantik transaksional transaksional dan buram. Spout lainnya adalah IBatchSpout, IPartitionedTridentSpout, dan IOpaquePartitionedTridentSpout.

Selain cerat generik ini, Trident memiliki banyak contoh penerapan cerat trisula. Salah satunya adalah FeederBatchSpout spout, yang dapat kita gunakan untuk mengirim daftar tupel trident dengan mudah tanpa perlu khawatir tentang pemrosesan batch, paralelisme, dll.

Pembuatan FeederBatchSpout dan pengumpanan data dapat dilakukan seperti yang ditunjukkan di bawah ini -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Operasi Trident

Trident mengandalkan "Operasi Trident" untuk memproses aliran input tupel trisula. Trident API memiliki sejumlah operasi built-in untuk menangani pemrosesan streaming yang sederhana hingga kompleks. Operasi ini berkisar dari validasi sederhana hingga pengelompokan kompleks dan agregasi tupel trisula. Mari kita bahas operasi yang paling penting dan sering digunakan.

Saring

Filter adalah sebuah objek yang digunakan untuk melakukan tugas validasi input. Filter Trident mendapatkan subset dari bidang tupel trisula sebagai masukan dan menampilkan benar atau salah bergantung pada apakah kondisi tertentu terpenuhi atau tidak. Jika true dikembalikan, maka tupel disimpan dalam aliran keluaran; jika tidak, tupel akan dihapus dari aliran. Filter pada dasarnya akan mewarisi dariBaseFilter kelas dan menerapkan isKeepmetode. Berikut adalah contoh implementasi operasi filter -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

Fungsi filter dapat dipanggil dalam topologi menggunakan metode “each”. Kelas “Fields” dapat digunakan untuk menentukan input (bagian dari tupel trisula). Kode sampelnya adalah sebagai berikut -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Fungsi

Functionadalah objek yang digunakan untuk melakukan operasi sederhana pada tupel trisula tunggal. Dibutuhkan subset bidang tupel trisula dan memancarkan nol atau lebih bidang tupel trisula baru.

Function pada dasarnya mewarisi dari BaseFunction kelas dan mengimplementasikan executemetode. Contoh implementasi diberikan di bawah ini -

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Sama seperti operasi Filter, operasi Fungsi dapat dipanggil dalam topologi menggunakan eachmetode. Kode sampelnya adalah sebagai berikut -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Pengumpulan

Agregasi adalah objek yang digunakan untuk melakukan operasi agregasi pada batch input atau partisi atau aliran. Trident memiliki tiga jenis agregasi. Mereka adalah sebagai berikut -

  • aggregate- Mengumpulkan setiap batch tupel trisula secara terpisah. Selama proses agregat, tupel awalnya dipartisi ulang menggunakan pengelompokan global untuk menggabungkan semua partisi dari batch yang sama ke dalam satu partisi.

  • partitionAggregate- Mengumpulkan setiap partisi, bukan seluruh batch tupel trisula. Keluaran agregat partisi sepenuhnya menggantikan tupel masukan. Keluaran agregat partisi berisi tupel bidang tunggal.

  • persistentaggregate - Agregat pada semua tupel trisula di semua batch dan menyimpan hasilnya di memori atau database.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Operasi agregasi dapat dibuat menggunakan antarmuka CombinerAggregator, ReducerAggregator, atau generik Agregator. Agregator "count" yang digunakan dalam contoh di atas adalah salah satu agregator build-in. Agregator ini diterapkan menggunakan "CombinerAggregator". Penerapannya adalah sebagai berikut -

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Pengelompokan

Operasi pengelompokan adalah operasi bawaan dan dapat dipanggil oleh groupBymetode. Metode groupBy mempartisi ulang aliran dengan melakukan partisiBy pada bidang yang ditentukan, dan kemudian dalam setiap partisi, ia mengelompokkan tupel bersama yang bidang kelompoknya sama. Biasanya, kami menggunakan "groupBy" bersama dengan "persistentAggregate" untuk mendapatkan agregasi yang dikelompokkan. Kode sampelnya adalah sebagai berikut -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Penggabungan dan Bergabung

Penggabungan dan penggabungan dapat dilakukan dengan menggunakan metode “merge” dan “join”. Penggabungan menggabungkan satu atau lebih aliran. Penggabungan mirip dengan penggabungan, kecuali fakta bahwa penggabungan menggunakan bidang tupel trisula dari kedua sisi untuk memeriksa dan menggabungkan dua aliran. Selain itu, bergabung hanya akan bekerja di bawah tingkat batch. Kode sampelnya adalah sebagai berikut -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Pemeliharaan Negara

Trident menyediakan mekanisme untuk pemeliharaan negara. Informasi status dapat disimpan di topologi itu sendiri, jika tidak, Anda juga dapat menyimpannya di database terpisah. Alasannya adalah untuk mempertahankan keadaan bahwa jika ada tupel yang gagal selama pemrosesan, maka tupel yang gagal akan dicoba kembali. Ini menimbulkan masalah saat memperbarui status karena Anda tidak yakin apakah status tupel ini telah diperbarui sebelumnya atau tidak. Jika tupel gagal sebelum memperbarui status, maka mencoba kembali tupel akan membuat status stabil. Namun, jika tupel gagal setelah memperbarui status, mencoba kembali tupel yang sama akan meningkatkan jumlah dalam database dan membuat status tidak stabil. Seseorang perlu melakukan langkah-langkah berikut untuk memastikan pesan diproses hanya sekali -

  • Proses tupel dalam kelompok kecil.

  • Tetapkan ID unik untuk setiap kelompok. Jika batch dicoba lagi, batch tersebut diberi ID unik yang sama.

  • Pembaruan status dipesan di antara batch. Misalnya, pembaruan status untuk batch kedua tidak akan memungkinkan hingga pembaruan status untuk batch pertama telah selesai.

RPC Terdistribusi

RPC terdistribusi digunakan untuk melakukan kueri dan mengambil hasil dari topologi Trident. Storm memiliki server RPC terdistribusi bawaan. Server RPC terdistribusi menerima permintaan RPC dari klien dan meneruskannya ke topologi. Topologi memproses permintaan dan mengirimkan hasilnya ke server RPC terdistribusi, yang diarahkan oleh server RPC terdistribusi ke klien. Kueri RPC terdistribusi Trident mengeksekusi seperti kueri RPC normal, kecuali fakta bahwa kueri ini dijalankan secara paralel.

Kapan Menggunakan Trident?

Seperti dalam banyak kasus penggunaan, jika persyaratannya adalah memproses kueri hanya sekali, kita bisa mencapainya dengan menulis topologi di Trident. Di sisi lain, akan sulit untuk mencapai proses tepat setelah diproses dalam kasus Storm. Karenanya Trident akan berguna untuk kasus penggunaan di mana Anda memerlukan tepat sekali pemrosesan. Trident tidak untuk semua kasus penggunaan, terutama kasus penggunaan berperforma tinggi karena Trident menambahkan kerumitan pada Storm dan mengelola status.

Contoh Pengerjaan Trident

Kami akan mengonversi aplikasi penganalisis log panggilan kami yang bekerja di bagian sebelumnya ke kerangka Trident. Aplikasi Trident akan relatif lebih mudah dibandingkan dengan badai biasa, berkat API tingkat tinggi. Storm pada dasarnya akan diminta untuk melakukan salah satu dari operasi Fungsi, Filter, Agregat, GroupBy, Gabung, dan Gabung di Trident. Akhirnya kita akan memulai Server DRPC menggunakanLocalDRPC kelas dan cari beberapa kata kunci menggunakan execute metode kelas LocalDRPC.

Memformat informasi panggilan

Tujuan dari kelas FormatCall adalah untuk memformat informasi panggilan yang terdiri dari "Nomor pemanggil" dan "nomor Penerima". Kode program lengkapnya adalah sebagai berikut -

Coding: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

Tujuan dari kelas CSVSplit adalah untuk membagi string input berdasarkan "koma (,)" dan mengeluarkan setiap kata dalam string tersebut. Fungsi ini digunakan untuk mengurai argumen input kueri terdistribusi. Kode lengkapnya adalah sebagai berikut -

Pengkodean: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Penganalisis Log

Ini adalah aplikasi utama. Awalnya, aplikasi akan menginisialisasi TridentTopology dan memberi makan informasi pemanggil menggunakanFeederBatchSpout. Aliran topologi trisula dapat dibuat menggunakannewStreammetode kelas TridentTopology. Demikian pula, aliran DRPC topologi Trident dapat dibuat menggunakannewDRCPStreammetode kelas TridentTopology. Server DRCP sederhana dapat dibuat menggunakan kelas LocalDRPC.LocalDRPCtelah mengeksekusi metode untuk mencari beberapa kata kunci. Kode lengkap diberikan di bawah ini.

Coding: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Membangun dan Menjalankan Aplikasi

Aplikasi lengkapnya memiliki tiga kode Java. Mereka adalah sebagai berikut -

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

Aplikasi dapat dibangun dengan menggunakan perintah berikut -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Aplikasi dapat dijalankan dengan menggunakan perintah berikut -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Keluaran

Setelah aplikasi dimulai, aplikasi akan menampilkan detail lengkap tentang proses startup cluster, pemrosesan operasi, DRPC Server dan informasi klien, dan akhirnya, proses penutupan cluster. Output ini akan ditampilkan di konsol seperti yang ditunjukkan di bawah ini.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends