ApacheStorm-実例

Apache Stormの主要な技術的詳細を確認しました。次に、いくつかの簡単なシナリオをコーディングします。

シナリオ–モバイル通話ログアナライザー

モバイル通話とその継続時間はApacheStormへの入力として提供され、Stormは同じ発信者と受信者の間の通話と、それらの通話の総数を処理してグループ化します。

注ぎ口の作成

注ぎ口は、データ生成に使用されるコンポーネントです。基本的に、注ぎ口はIRichSpoutインターフェースを実装します。「IRichSpout」インターフェースには、次の重要なメソッドがあります。

  • open−注ぎ口に実行する環境を提供します。エグゼキュータはこのメソッドを実行して、注ぎ口を初期化します。

  • nextTuple −コレクターを介して生成されたデータを出力します。

  • close −このメソッドは、注ぎ口がシャットダウンするときに呼び出されます。

  • declareOutputFields −タプルの出力スキーマを宣言します。

  • ack −特定のタプルが処理されたことを確認します

  • fail −特定のタプルが処理されず、再処理されないことを指定します。

開いた

の署名 open 方法は次のとおりです-

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf −この注ぎ口にストーム構成を提供します。

  • context −トポロジ内の注ぎ口の場所、そのタスクID、入力および出力情報に関する完全な情報を提供します。

  • collector −ボルトで処理されるタプルを放出できるようにします。

nextTuple

の署名 nextTuple 方法は次のとおりです-

nextTuple()

nextTuple()は、ack()およびfail()メソッドと同じループから定期的に呼び出されます。他のメソッドが呼び出される可能性があるように、実行する作業がないときにスレッドの制御を解放する必要があります。したがって、nextTupleの最初の行は、処理が終了したかどうかを確認します。その場合、戻る前にプロセッサの負荷を軽減するために、少なくとも1ミリ秒スリープする必要があります。

閉じる

の署名 close 方法は次のとおりです-

close()

宣言出力フィールド

の署名 declareOutputFields 方法は次のとおりです-

declareOutputFields(OutputFieldsDeclarer declarer)

declarer −出力ストリームID、出力フィールドなどを宣言するために使用されます。

このメソッドは、タプルの出力スキーマを指定するために使用されます。

ack

の署名 ack 方法は次のとおりです-

ack(Object msgId)

このメソッドは、特定のタプルが処理されたことを確認します。

不合格

の署名 nextTuple 方法は次のとおりです-

ack(Object msgId)

このメソッドは、特定のタプルが完全に処理されていないことを通知します。Stormは特定のタプルを再処理します。

FakeCallLogReaderSpout

このシナリオでは、通話ログの詳細を収集する必要があります。通話記録の情報にはが含まれています。

  • 発信者番号
  • レシーバー番号
  • duration

通話記録のリアルタイム情報がないため、偽の通話記録を生成します。偽の情報は、Randomクラスを使用して作成されます。完全なプログラムコードを以下に示します。

コーディング-FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

ボルトの作成

Boltは、タプルを入力として受け取り、タプルを処理し、出力として新しいタプルを生成するコンポーネントです。ボルトは実装しますIRichBoltインターフェース。このプログラムでは、2つのボルトクラスCallLogCreatorBolt そして CallLogCounterBolt 操作を実行するために使用されます。

IRichBoltインターフェースには次のメソッドがあります-

  • prepare−ボルトに実行環境を提供します。エグゼキュータはこのメソッドを実行して、注ぎ口を初期化します。

  • execute −入力の単一タプルを処理します。

  • cleanup −ボルトがシャットダウンするときに呼び出されます。

  • declareOutputFields −タプルの出力スキーマを宣言します。

準備する

の署名 prepare 方法は次のとおりです-

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf −このボルトのストーム構成を提供します。

  • context −トポロジ内のボルトの位置、そのタスクID、入力および出力情報などに関する完全な情報を提供します。

  • collector −処理されたタプルを発行できるようにします。

実行する

の署名 execute 方法は次のとおりです-

execute(Tuple tuple)

ここに tuple 処理される入力タプルです。

ザ・ executeメソッドは、一度に1つのタプルを処理します。タプルデータには、タプルクラスのgetValueメソッドからアクセスできます。入力タプルをすぐに処理する必要はありません。複数のタプルを処理して、単一の出力タプルとして出力できます。処理されたタプルは、OutputCollectorクラスを使用して発行できます。

掃除

の署名 cleanup 方法は次のとおりです-

cleanup()

宣言出力フィールド

の署名 declareOutputFields 方法は次のとおりです-

declareOutputFields(OutputFieldsDeclarer declarer)

ここでパラメータ declarer 出力ストリームID、出力フィールドなどを宣言するために使用されます。

このメソッドは、タプルの出力スキーマを指定するために使用されます

ログクリエーターボルトを呼び出す

通話記録作成ボルトは、通話記録タプルを受け取ります。コールログタプルには、発信者番号、受信者番号、および通話時間が含まれます。このボルトは、発信者番号と受信者番号を組み合わせて新しい値を作成するだけです。新しい値の形式は「発信者番号–受信者番号」であり、新しいフィールド「call」という名前が付けられています。完全なコードを以下に示します。

コーディング-CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

通話記録カウンターボルト

コールログカウンターボルトは、コールとその期間をタプルとして受け取ります。このボルトは、prepareメソッドの辞書(Map)オブジェクトを初期化します。にexecuteメソッドでは、タプルをチェックし、タプル内の新しい「呼び出し」値ごとにディクショナリオブジェクトに新しいエントリを作成し、ディクショナリオブジェクトに値1を設定します。ディクショナリですでに使用可能なエントリの場合、値をインクリメントするだけです。簡単に言うと、このボルトは呼び出しとそのカウントを辞書オブジェクトに保存します。呼び出しとそのカウントを辞書に保存する代わりに、データソースに保存することもできます。完全なプログラムコードは次のとおりです-

コーディング-CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

トポロジの作成

ストームトポロジは基本的にスリフト構造です。TopologyBuilderクラスは、複雑なトポロジを作成するためのシンプルで簡単なメソッドを提供します。TopologyBuilderクラスには、注ぎ口を設定するメソッドがあります(setSpout) とボルトを設定します (setBolt)。最後に、TopologyBuilderにはトポロジを作成するためのcreateTopologyがあります。次のコードスニペットを使用してトポロジを作成します-

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping そして fieldsGrouping メソッドは、注ぎ口とボルトのストリームグループを設定するのに役立ちます。

ローカルクラスター

開発目的では、「LocalCluster」オブジェクトを使用してローカルクラスターを作成し、「LocalCluster」クラスの「submitTopology」メソッドを使用してトポロジを送信できます。「submitTopology」の引数の1つは、「Config」クラスのインスタンスです。「Config」クラスは、トポロジを送信する前に構成オプションを設定するために使用されます。この構成オプションは、実行時にクラスター構成とマージされ、prepareメソッドを使用してすべてのタスク(注ぎ口とボルト)に送信されます。トポロジがクラスターに送信されると、クラスターが送信されたトポロジを計算するまで10秒間待機し、「LocalCluster」の「shutdown」メソッドを使用してクラスターをシャットダウンします。完全なプログラムコードは次のとおりです-

コーディング-LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

アプリケーションの構築と実行

完全なアプリケーションには4つのJavaコードがあります。彼らは-

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

アプリケーションは、次のコマンドを使用して構築できます-

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

アプリケーションは、次のコマンドを使用して実行できます-

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

出力

アプリケーションが開始されると、クラスターの起動プロセス、注ぎ口とボルトの処理、そして最後にクラスターのシャットダウンプロセスに関する完全な詳細が出力されます。「CallLogCounterBolt」では、呼び出しとそのカウントの詳細を出力しました。この情報は、コンソールに次のように表示されます-

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

非JVM言語

ストームトポロジはThriftインターフェイスによって実装されているため、任意の言語でトポロジを簡単に送信できます。Stormは、Ruby、Python、およびその他の多くの言語をサポートしています。Pythonバインディングを見てみましょう。

Pythonバインディング

Pythonは、汎用のインタープリター型、インタラクティブ、オブジェクト指向、および高水準プログラミング言語です。Stormは、Pythonをサポートしてトポロジを実装します。Pythonは、放出、アンカー、確認、およびロギング操作をサポートしています。

ご存知のように、ボルトはどの言語でも定義できます。別の言語で記述されたボルトはサブプロセスとして実行され、Stormはstdin / stdoutを介してJSONメッセージでそれらのサブプロセスと通信します。まず、PythonバインディングをサポートするサンプルボルトWordCountを取得します。

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

ここでクラス WordCount を実装します IRichBoltインターフェイスとPython実装で実行すると、スーパーメソッド引数「splitword.py」が指定されます。次に、「splitword.py」という名前のPython実装を作成します。

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

これは、特定の文の単語をカウントするPythonのサンプル実装です。同様に、他のサポート言語とバインドすることもできます。