Apache Storm - Ví dụ làm việc

Chúng ta đã xem qua các chi tiết kỹ thuật cốt lõi của Apache Storm và bây giờ đã đến lúc viết mã một số kịch bản đơn giản.

Kịch bản - Trình phân tích nhật ký cuộc gọi di động

Cuộc gọi di động và thời lượng của nó sẽ được cung cấp làm đầu vào cho Apache Storm và Storm sẽ xử lý và nhóm cuộc gọi giữa người gọi và người nhận giống nhau và tổng số cuộc gọi của họ.

Tạo vòi

Spout là một thành phần được sử dụng để tạo dữ liệu. Về cơ bản, một vòi sẽ triển khai giao diện IRichSpout. Giao diện “IRichSpout” có các phương thức quan trọng sau:

  • open- Cung cấp cho vòi có môi trường để thực thi. Những người thực thi sẽ chạy phương thức này để khởi tạo vòi.

  • nextTuple - Truyền dữ liệu đã tạo thông qua bộ thu thập.

  • close - Phương thức này được gọi khi vòi sắp tắt.

  • declareOutputFields - Khai báo lược đồ đầu ra của bộ tuple.

  • ack - Xác nhận rằng một tuple cụ thể được xử lý

  • fail - Chỉ định rằng một tuple cụ thể không được xử lý và không được xử lý lại.

Mở

Chữ ký của open phương pháp như sau:

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - Cung cấp cấu hình bão cho vòi này.

  • context - Cung cấp thông tin đầy đủ về vị trí vòi trong cấu trúc liên kết, id nhiệm vụ, thông tin đầu vào và đầu ra của nó.

  • collector - Cho phép chúng tôi phát ra tuple sẽ được xử lý bởi các bu lông.

nextTuple

Chữ ký của nextTuple phương pháp như sau:

nextTuple()

nextTuple () được gọi định kỳ từ cùng một vòng lặp với các phương thức ack () và fail (). Nó phải giải phóng quyền kiểm soát luồng khi không có việc gì phải làm, để các phương thức khác có cơ hội được gọi. Vì vậy, dòng đầu tiên của nextTuple kiểm tra xem quá trình xử lý đã hoàn tất chưa. Nếu vậy, nó nên ngủ ít nhất một phần nghìn giây để giảm tải cho bộ xử lý trước khi quay trở lại.

đóng

Chữ ký của close phương pháp như sau:

close()

statementOutputFields

Chữ ký của declareOutputFields phương pháp như sau:

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - Nó được sử dụng để khai báo id luồng đầu ra, trường đầu ra, v.v.

Phương thức này được sử dụng để chỉ định lược đồ đầu ra của bộ tuple.

ack

Chữ ký của ack phương pháp như sau:

ack(Object msgId)

Phương thức này xác nhận rằng một tuple cụ thể đã được xử lý.

Thất bại

Chữ ký của nextTuple phương pháp như sau:

ack(Object msgId)

Phương thức này thông báo rằng một tuple cụ thể chưa được xử lý hoàn toàn. Storm sẽ xử lý lại bộ tuple cụ thể.

FakeCallLogReaderSpout

Trong kịch bản của chúng tôi, chúng tôi cần thu thập chi tiết nhật ký cuộc gọi. Thông tin của nhật ký cuộc gọi chứa.

  • số người gọi
  • số người nhận
  • duration

Vì chúng tôi không có thông tin thời gian thực của nhật ký cuộc gọi, chúng tôi sẽ tạo nhật ký cuộc gọi giả. Thông tin giả sẽ được tạo bằng lớp Random. Mã chương trình hoàn chỉnh được cung cấp bên dưới.

Mã hóa - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Tạo bu lông

Bolt là một thành phần lấy các bộ giá trị làm đầu vào, xử lý bộ giá trị và tạo ra các bộ giá trị mới làm đầu ra. Bu lông sẽ thực hiệnIRichBoltgiao diện. Trong chương trình này, hai lớp bu lôngCallLogCreatorBoltCallLogCounterBolt được sử dụng để thực hiện các hoạt động.

Giao diện IRichBolt có các phương thức sau:

  • prepare- Cung cấp cho bu lông một môi trường để thực thi. Những người thực thi sẽ chạy phương thức này để khởi tạo vòi.

  • execute - Xử lý một bộ dữ liệu đầu vào.

  • cleanup - Được gọi khi một bu lông sắp tắt.

  • declareOutputFields - Khai báo lược đồ đầu ra của bộ tuple.

Chuẩn bị

Chữ ký của prepare phương pháp như sau:

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - Cung cấp cấu hình Storm cho bu lông này.

  • context - Cung cấp thông tin đầy đủ về vị trí bu lông trong cấu trúc liên kết, id nhiệm vụ của nó, thông tin đầu vào và đầu ra, v.v.

  • collector - Cho phép chúng tôi phát ra bộ tuple đã xử lý.

hành hình

Chữ ký của execute phương pháp như sau:

execute(Tuple tuple)

Đây tuple là bộ dữ liệu đầu vào được xử lý.

Các executephương thức xử lý một bộ dữ liệu tại một thời điểm. Dữ liệu tuple có thể được truy cập bằng phương thức getValue của lớp Tuple. Không cần thiết phải xử lý bộ đầu vào ngay lập tức. Nhiều bộ có thể được xử lý và xuất ra như một bộ đầu ra duy nhất. Tuple đã xử lý có thể được phát ra bằng cách sử dụng lớp OutputCollector.

dọn dẹp

Chữ ký của cleanup phương pháp như sau:

cleanup()

statementOutputFields

Chữ ký của declareOutputFields phương pháp như sau:

declareOutputFields(OutputFieldsDeclarer declarer)

Đây là thông số declarer được sử dụng để khai báo id luồng đầu ra, trường đầu ra, v.v.

Phương thức này được sử dụng để chỉ định lược đồ đầu ra của bộ tuple

Nhật ký cuộc gọi Bolt Creator

Chốt tạo nhật ký cuộc gọi nhận bộ ghi nhật ký cuộc gọi. Bộ ghi nhật ký cuộc gọi có số người gọi, số người nhận và thời lượng cuộc gọi. Chốt này chỉ đơn giản là tạo ra một giá trị mới bằng cách kết hợp số người gọi và số người nhận. Định dạng của giá trị mới là "Số người gọi - Số người nhận" và nó được đặt tên là trường mới, "cuộc gọi". Mã hoàn chỉnh được đưa ra bên dưới.

Mã hóa - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Nhật ký cuộc gọi Chốt bộ đếm

Chốt bộ đếm nhật ký cuộc gọi nhận cuộc gọi và thời lượng của nó dưới dạng một bộ. Bu lông này khởi tạo một đối tượng từ điển (Bản đồ) trong phương thức chuẩn bị. Trongexecute, nó sẽ kiểm tra tuple và tạo một mục nhập mới trong đối tượng từ điển cho mọi giá trị "gọi" mới trong bộ tuple và đặt giá trị 1 trong đối tượng từ điển. Đối với mục nhập đã có sẵn trong từ điển, nó chỉ tăng giá trị của nó. Nói một cách dễ hiểu, bu lông này lưu cuộc gọi và số lượng của nó trong đối tượng từ điển. Thay vì lưu cuộc gọi và số lượng của nó trong từ điển, chúng tôi cũng có thể lưu nó vào một nguồn dữ liệu. Mã chương trình hoàn chỉnh như sau:

Mã hóa - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Tạo cấu trúc liên kết

Cấu trúc liên kết Storm về cơ bản là một cấu trúc Tiết kiệm. Lớp TopologyBuilder cung cấp các phương thức đơn giản và dễ dàng để tạo các cấu trúc liên kết phức tạp. Lớp TopologyBuilder có các phương thức để thiết lập vòi(setSpout) và để đặt bu lông (setBolt). Cuối cùng, TopologyBuilder có createTopology để tạo cấu trúc liên kết. Sử dụng đoạn mã sau để tạo cấu trúc liên kết -

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGroupingfieldsGrouping các phương pháp giúp thiết lập nhóm dòng cho vòi và bu lông.

Cụm cục bộ

Đối với mục đích phát triển, chúng ta có thể tạo một cụm cục bộ bằng cách sử dụng đối tượng "LocalCluster" và sau đó gửi cấu trúc liên kết bằng cách sử dụng phương pháp "submitTopology" của lớp "LocalCluster". Một trong những đối số cho "submitTopology" là một thể hiện của lớp "Config". Lớp "Cấu hình" được sử dụng để đặt các tùy chọn cấu hình trước khi gửi cấu trúc liên kết. Tùy chọn cấu hình này sẽ được hợp nhất với cấu hình cụm tại thời điểm chạy và được gửi đến tất cả tác vụ (vòi và chốt) bằng phương thức chuẩn bị. Khi cấu trúc liên kết được gửi đến cụm, chúng tôi sẽ đợi 10 giây để cụm tính toán cấu trúc liên kết đã gửi và sau đó tắt cụm bằng cách sử dụng phương pháp "tắt máy" của "LocalCluster". Mã chương trình hoàn chỉnh như sau:

Mã hóa - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Xây dựng và chạy ứng dụng

Ứng dụng hoàn chỉnh có bốn mã Java. Họ là -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

Ứng dụng có thể được tạo bằng lệnh sau:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Ứng dụng có thể được chạy bằng lệnh sau:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Đầu ra

Khi ứng dụng được khởi động, nó sẽ xuất ra thông tin chi tiết đầy đủ về quy trình khởi động cụm, xử lý vòi và chốt, và cuối cùng là quy trình tắt cụm. Trong "CallLogCounterBolt", chúng tôi đã in chi tiết cuộc gọi và số lượng của nó. Thông tin này sẽ được hiển thị trên bảng điều khiển như sau:

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Ngôn ngữ không phải JVM

Các cấu trúc liên kết Storm được triển khai bởi các giao diện Thrift, giúp dễ dàng gửi cấu trúc liên kết bằng bất kỳ ngôn ngữ nào. Storm hỗ trợ Ruby, Python và nhiều ngôn ngữ khác. Chúng ta hãy xem liên kết python.

Python Binding

Python là một ngôn ngữ lập trình thông dịch, tương tác, hướng đối tượng và cấp cao có mục đích chung. Storm hỗ trợ Python để triển khai cấu trúc liên kết của nó. Python hỗ trợ các hoạt động phát ra, neo, đánh dấu và ghi nhật ký.

Như bạn đã biết, bu lông có thể được định nghĩa bằng bất kỳ ngôn ngữ nào. Các bu lông được viết bằng ngôn ngữ khác được thực thi dưới dạng các quy trình con và Storm giao tiếp với các quy trình con đó bằng các thông báo JSON qua stdin / stdout. Trước tiên, hãy lấy một chốt mẫu WordCount hỗ trợ liên kết python.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

Đây lớp WordCount thực hiện IRichBoltgiao diện và chạy với việc triển khai python đã chỉ định đối số siêu phương thức "splitword.py". Bây giờ, hãy tạo một triển khai python có tên "splitword.py".

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

Đây là triển khai mẫu cho Python để đếm các từ trong một câu nhất định. Tương tự, bạn cũng có thể liên kết với các ngôn ngữ hỗ trợ khác.