Bão Apache - Cây đinh ba

Trident là một phần mở rộng của Storm. Giống như Storm, Trident cũng được phát triển bởi Twitter. Lý do chính đằng sau việc phát triển Trident là cung cấp sự trừu tượng cấp cao trên Storm cùng với xử lý luồng trạng thái và truy vấn phân tán độ trễ thấp.

Trident sử dụng vòi và bu lông, nhưng các thành phần cấp thấp này được Trident tạo tự động trước khi thực thi. Trident có các chức năng, bộ lọc, nối, nhóm và tổng hợp.

Trident xử lý các luồng dưới dạng một loạt các lô được gọi là giao dịch. Nói chung kích thước của các lô nhỏ đó sẽ theo thứ tự hàng nghìn hoặc hàng triệu bộ, tùy thuộc vào luồng đầu vào. Bằng cách này, Trident khác với Storm, nó thực hiện xử lý từng tuple.

Khái niệm xử lý hàng loạt rất giống với các giao dịch cơ sở dữ liệu. Mỗi giao dịch đều được gán một ID giao dịch. Giao dịch được coi là thành công sau khi tất cả quá trình xử lý hoàn tất. Tuy nhiên, việc xử lý một trong các bộ giá trị của giao dịch không thành công sẽ khiến toàn bộ giao dịch được truyền lại. Đối với mỗi đợt, Trident sẽ gọi beginCommit khi bắt đầu giao dịch và cam kết vào cuối đợt đó.

Cấu trúc liên kết Trident

Trident API cho thấy một tùy chọn dễ dàng để tạo cấu trúc liên kết Trident bằng cách sử dụng lớp “TridentTopology”. Về cơ bản, cấu trúc liên kết Trident nhận luồng đầu vào từ vòi và thực hiện trình tự hoạt động có thứ tự (lọc, tổng hợp, nhóm, v.v.) trên luồng. Storm Tuple được thay thế bằng Trident Tuple và Bolts được thay thế bằng hoạt động. Một cấu trúc liên kết Trident đơn giản có thể được tạo như sau:

TridentTopology topology = new TridentTopology();

Trident Tuples

Trident tuple là một danh sách các giá trị được đặt tên. Giao diện TridentTuple là mô hình dữ liệu của cấu trúc liên kết Trident. Giao diện TridentTuple là đơn vị dữ liệu cơ bản có thể được xử lý bởi cấu trúc liên kết Trident.

Vòi đinh ba

Vòi Trident tương tự như vòi Storm, với các tùy chọn bổ sung để sử dụng các tính năng của Trident. Trên thực tế, chúng tôi vẫn có thể sử dụng IRichSpout, mà chúng tôi đã sử dụng trong cấu trúc liên kết Storm, nhưng nó sẽ không mang tính giao dịch và chúng tôi sẽ không thể sử dụng các lợi thế do Trident cung cấp.

Vòi cơ bản có tất cả các chức năng để sử dụng các tính năng của Trident là "ITridentSpout". Nó hỗ trợ cả ngữ nghĩa giao dịch và không rõ ràng. Các vòi khác là IBatchSpout, IPartitionedTridentSpout và IOpaquePartitionedTridentSpout.

Ngoài những vòi chung này, Trident có nhiều mẫu thực hiện vòi đinh ba. Một trong số đó là vòi FeederBatchSpout, chúng ta có thể sử dụng để gửi danh sách các bộ đinh ba có tên một cách dễ dàng mà không cần lo lắng về việc xử lý hàng loạt, song song, v.v.

Việc tạo FeederBatchSpout và cung cấp dữ liệu có thể được thực hiện như hình dưới đây -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Hoạt động Trident

Trident dựa vào “Hoạt động của đinh ba” để xử lý dòng đầu vào của bộ đinh ba. Trident API có một số hoạt động được tích hợp sẵn để xử lý quá trình xử lý luồng từ đơn giản đến phức tạp. Các hoạt động này bao gồm từ xác nhận đơn giản đến phức tạp nhóm và tổng hợp các bộ giá trị đinh ba. Hãy cùng chúng tôi điểm qua các thao tác quan trọng nhất và được sử dụng thường xuyên.

Bộ lọc

Bộ lọc là một đối tượng được sử dụng để thực hiện nhiệm vụ xác nhận đầu vào. Bộ lọc Trident lấy một tập con gồm các trường đinh ba làm đầu vào và trả về true hoặc false tùy thuộc vào việc một số điều kiện nhất định có được thỏa mãn hay không. Nếu trả về true, thì tuple được giữ trong luồng đầu ra; nếu không, tuple sẽ bị xóa khỏi luồng. Bộ lọc về cơ bản sẽ kế thừa từBaseFilter lớp và thực hiện isKeepphương pháp. Đây là một triển khai mẫu của hoạt động bộ lọc -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

Chức năng bộ lọc có thể được gọi trong cấu trúc liên kết bằng cách sử dụng phương thức "mỗi". Lớp “Fields” có thể được sử dụng để chỉ định đầu vào (tập con của bộ trident). Mã mẫu như sau:

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Chức năng

Functionlà một đối tượng được sử dụng để thực hiện một hoạt động đơn giản trên một bộ đinh ba duy nhất. Nó nhận một tập hợp con của các trường tuple đinh ba và phát ra không hoặc nhiều trường tuple mới.

Function về cơ bản kế thừa từ BaseFunction lớp học và thực hiện executephương pháp. Một triển khai mẫu được đưa ra dưới đây:

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Cũng giống như hoạt động Bộ lọc, hoạt động Hàm có thể được gọi trong cấu trúc liên kết bằng cách sử dụng eachphương pháp. Mã mẫu như sau:

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Tổng hợp

Tổng hợp là một đối tượng được sử dụng để thực hiện các hoạt động tổng hợp trên một lô hoặc phân vùng hoặc luồng đầu vào. Trident có ba kiểu tập hợp. Chúng như sau:

  • aggregate- Tổng hợp từng lô củ đinh ba một cách riêng biệt. Trong quá trình tổng hợp, các bộ giá trị ban đầu được phân vùng lại bằng cách sử dụng nhóm toàn cục để kết hợp tất cả các phân vùng của cùng một lô thành một phân vùng duy nhất.

  • partitionAggregate- Tổng hợp từng phân vùng thay vì toàn bộ loạt tuple trident. Đầu ra của tập hợp phân vùng thay thế hoàn toàn bộ dữ liệu đầu vào. Đầu ra của tập hợp phân vùng chứa một bộ trường duy nhất.

  • persistentaggregate - Tổng hợp trên tất cả bộ đinh ba trên tất cả các lô và lưu trữ kết quả trong bộ nhớ hoặc cơ sở dữ liệu.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Hoạt động tổng hợp có thể được tạo bằng cách sử dụng giao diện CombinerAggregator, ReducerAggregator hoặc Aggregator chung. Bộ tổng hợp "số lượng" được sử dụng trong ví dụ trên là một trong các bộ tổng hợp tích hợp. Nó được triển khai bằng cách sử dụng "CombinerAggregator". Việc triển khai như sau:

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Phân nhóm

Hoạt động nhóm là một hoạt động có sẵn và có thể được gọi bởi groupByphương pháp. Phương thức groupBy định hướng lại luồng bằng cách thực hiện phân vùng Bằng các trường được chỉ định và sau đó trong mỗi phân vùng, nó nhóm các bộ dữ liệu lại với nhau có các trường nhóm bằng nhau. Thông thường, chúng tôi sử dụng “groupBy” cùng với “dai dẳngAggregate” để lấy tổng hợp được nhóm lại. Mã mẫu như sau:

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Hợp nhất và Gia nhập

Việc hợp nhất và kết hợp có thể được thực hiện bằng cách sử dụng phương pháp “hợp nhất” và “nối” tương ứng. Hợp nhất kết hợp một hoặc nhiều luồng. Tham gia tương tự như hợp nhất, ngoại trừ thực tế là việc tham gia sử dụng trường đinh ba từ cả hai phía để kiểm tra và tham gia hai luồng. Hơn nữa, việc tham gia sẽ chỉ hoạt động ở cấp độ hàng loạt. Mã mẫu như sau:

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Bảo trì trạng thái

Trident cung cấp một cơ chế để duy trì trạng thái. Thông tin trạng thái có thể được lưu trữ trong chính cấu trúc liên kết, nếu không bạn cũng có thể lưu trữ nó trong một cơ sở dữ liệu riêng biệt. Lý do là để duy trì trạng thái rằng nếu bất kỳ bộ dữ liệu nào bị lỗi trong quá trình xử lý, thì bộ dữ liệu bị lỗi sẽ được thử lại. Điều này tạo ra sự cố trong khi cập nhật trạng thái vì bạn không chắc liệu trạng thái của tuple này đã được cập nhật trước đó hay chưa. Nếu tuple bị lỗi trước khi cập nhật trạng thái, thì việc thử lại tuple sẽ làm cho trạng thái ổn định. Tuy nhiên, nếu tuple bị lỗi sau khi cập nhật trạng thái, thì việc thử lại cùng một tuple sẽ lại làm tăng số lượng trong cơ sở dữ liệu và làm cho trạng thái không ổn định. Người ta cần thực hiện các bước sau để đảm bảo thư chỉ được xử lý một lần -

  • Xử lý các bộ giá trị thành từng đợt nhỏ.

  • Chỉ định một ID duy nhất cho mỗi lô. Nếu lô được thử lại, lô sẽ được cung cấp cùng một ID duy nhất.

  • Các bản cập nhật trạng thái được sắp xếp giữa các đợt. Ví dụ: cập nhật trạng thái của lô thứ hai sẽ không thể thực hiện được cho đến khi hoàn thành cập nhật trạng thái cho lô đầu tiên.

RPC phân tán

RPC phân tán được sử dụng để truy vấn và lấy kết quả từ cấu trúc liên kết Trident. Storm có một máy chủ RPC phân tán sẵn có. Máy chủ RPC phân tán nhận yêu cầu RPC từ máy khách và chuyển nó đến cấu trúc liên kết. Cấu trúc liên kết xử lý yêu cầu và gửi kết quả đến máy chủ RPC phân tán, được máy chủ RPC phân phối chuyển hướng đến máy khách. Truy vấn RPC phân tán của Trident thực thi giống như một truy vấn RPC thông thường, ngoại trừ thực tế là các truy vấn này được chạy song song.

Khi nào thì dùng Trident?

Như trong nhiều trường hợp sử dụng, nếu yêu cầu là xử lý truy vấn chỉ một lần, chúng ta có thể đạt được nó bằng cách viết cấu trúc liên kết trong Trident. Mặt khác, sẽ rất khó để đạt được chính xác một lần xử lý trong trường hợp Storm. Do đó, Trident sẽ hữu ích cho những trường hợp sử dụng mà bạn yêu cầu xử lý chính xác một lần. Trident không dành cho tất cả các trường hợp sử dụng, đặc biệt là các trường hợp sử dụng hiệu suất cao vì nó tăng thêm độ phức tạp cho Storm và quản lý trạng thái.

Ví dụ làm việc của Trident

Chúng tôi sẽ chuyển đổi ứng dụng phân tích nhật ký cuộc gọi của chúng tôi đã được thực hiện trong phần trước sang khuôn khổ Trident. Ứng dụng Trident sẽ tương đối dễ dàng so với bão thông thường, nhờ API cấp cao của nó. Về cơ bản, Storm sẽ được yêu cầu thực hiện bất kỳ một trong các hoạt động Chức năng, Lọc, Tổng hợp, NhómBy, Tham gia và Hợp nhất trong Trident. Cuối cùng, chúng tôi sẽ khởi động Máy chủ DRPC bằng cách sử dụngLocalDRPC lớp và tìm kiếm một số từ khóa bằng cách sử dụng execute phương thức của lớp LocalDRPC.

Định dạng thông tin cuộc gọi

Mục đích của lớp FormatCall là định dạng thông tin cuộc gọi bao gồm "Số người gọi" và "Số người nhận". Mã chương trình hoàn chỉnh như sau:

Mã hóa: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

Mục đích của lớp CSVSplit là tách chuỗi đầu vào dựa trên “dấu phẩy (,)” và phát ra mọi từ trong chuỗi. Hàm này được sử dụng để phân tích cú pháp đối số đầu vào của truy vấn phân tán. Mã hoàn chỉnh như sau:

Mã hóa: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Trình phân tích nhật ký

Đây là ứng dụng chính. Ban đầu, ứng dụng sẽ khởi tạo TridentTopology và thông tin người gọi nguồn cấp dữ liệu bằng cách sử dụngFeederBatchSpout. Luồng cấu trúc liên kết Trident có thể được tạo bằng cách sử dụngnewStreamphương pháp của lớp TridentTopology. Tương tự, luồng DRPC cấu trúc liên kết Trident có thể được tạo bằng cách sử dụngnewDRCPStreamphương pháp của lớp TridentTopology. Một máy chủ DRCP đơn giản có thể được tạo bằng cách sử dụng lớp LocalDRPC.LocalDRPCcó phương thức thực thi để tìm kiếm một số từ khóa. Mã hoàn chỉnh được đưa ra bên dưới.

Mã hóa: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Xây dựng và chạy ứng dụng

Ứng dụng hoàn chỉnh có ba mã Java. Chúng như sau:

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

Ứng dụng có thể được tạo bằng cách sử dụng lệnh sau:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Ứng dụng có thể được chạy bằng cách sử dụng lệnh sau:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Đầu ra

Khi ứng dụng được khởi động, ứng dụng sẽ xuất ra chi tiết đầy đủ về quá trình khởi động cụm, xử lý hoạt động, Máy chủ DRPC và thông tin máy khách, và cuối cùng là quá trình tắt cụm. Đầu ra này sẽ được hiển thị trên bảng điều khiển như hình dưới đây.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends