अपाचे स्टॉर्म - वर्किंग उदाहरण

हम अपाचे स्टॉर्म के मुख्य तकनीकी विवरण से गुजरे हैं और अब कुछ सरल परिदृश्यों को कोड करने का समय आ गया है।

परिदृश्य - मोबाइल कॉल लॉग विश्लेषक

मोबाइल कॉल और इसकी अवधि अपाचे स्टॉर्म के इनपुट के रूप में दी जाएगी और स्टॉर्म एक ही कॉलर और रिसीवर और उनके कुल कॉल के बीच कॉल को संसाधित और समूह करेगा।

सृजन करो

टोंटी एक घटक है जो डेटा पीढ़ी के लिए उपयोग किया जाता है। मूल रूप से, एक टोंटी एक IRichSpout इंटरफ़ेस को लागू करेगा। "IRichSpout" इंटरफ़ेस में निम्नलिखित महत्वपूर्ण विधियाँ हैं -

  • open- निष्पादित करने के लिए वातावरण के साथ टोंटी प्रदान करता है। जल्लाद को इनिशियलाइज़ करने के लिए एग्जिक्यूटर्स इस विधि को चलाएंगे।

  • nextTuple - कलेक्टर के माध्यम से उत्पन्न डेटा का उत्सर्जन करता है।

  • close - इस पद्धति को कहा जाता है जब एक टोंटी बंद करने जा रही है।

  • declareOutputFields - टपल के आउटपुट स्कीमा की घोषणा करता है।

  • ack - यह स्वीकार करता है कि एक विशिष्ट टपल संसाधित है

  • fail - निर्दिष्ट करता है कि एक विशिष्ट टपल को संसाधित नहीं किया जाता है और न ही पुन: संसाधित किया जाता है।

खुला हुआ

के हस्ताक्षर open विधि इस प्रकार है -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - इस टोंटी के लिए तूफान विन्यास प्रदान करता है।

  • context - टोपोलॉजी के बारे में पूरी जानकारी, उसकी टास्क आईडी, इनपुट और आउटपुट की जानकारी।

  • collector - बोल्ट द्वारा संसाधित किए जाने वाले टपल का उत्सर्जन करने में हमें सक्षम बनाता है।

nextTuple

के हस्ताक्षर nextTuple विधि इस प्रकार है -

nextTuple()

अगली ट्यूपल () को समय-समय पर एक ही लूप से ack () और फेल () विधियों के रूप में कहा जाता है। यह थ्रेड का नियंत्रण जारी करना चाहिए जब कोई काम नहीं करना है, ताकि अन्य तरीकों को कॉल करने का मौका मिले। तो अगली लाइन की अगली पंक्ति यह देखने के लिए जांचती है कि प्रसंस्करण समाप्त हो गया है या नहीं। यदि हां, तो लौटने से पहले प्रोसेसर पर लोड को कम करने के लिए कम से कम एक मिलीसेकंड के लिए सोना चाहिए।

बंद करे

के हस्ताक्षर close विधि इस प्रकार है -

close()

declareOutputFields

के हस्ताक्षर declareOutputFields विधि इस प्रकार है -

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - इसका उपयोग आउटपुट स्ट्रीम आईडी, आउटपुट फ़ील्ड आदि घोषित करने के लिए किया जाता है।

इस पद्धति का उपयोग टुपल के आउटपुट स्कीमा को निर्दिष्ट करने के लिए किया जाता है।

एसीके

के हस्ताक्षर ack विधि इस प्रकार है -

ack(Object msgId)

यह विधि स्वीकार करती है कि एक विशिष्ट टपल संसाधित किया गया है।

विफल

के हस्ताक्षर nextTuple विधि इस प्रकार है -

ack(Object msgId)

यह विधि बताती है कि एक विशिष्ट टपल को पूरी तरह से संसाधित नहीं किया गया है। तूफान विशिष्ट टपल को पुन: उत्पन्न करेगा।

FakeCallLogReaderSpout

हमारे परिदृश्य में, हमें कॉल लॉग विवरण एकत्र करने की आवश्यकता है। कॉल लॉग की जानकारी शामिल है।

  • कॉलर नंबर
  • रिसीवर संख्या
  • duration

चूंकि, हमारे पास कॉल लॉग्स की वास्तविक समय की जानकारी नहीं है, हम नकली कॉल लॉग्स उत्पन्न करेंगे। फर्जी सूचना का उपयोग रैंडम क्लास का उपयोग करके बनाया जाएगा। पूरा प्रोग्राम कोड नीचे दिया गया है।

कोडिंग - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

बोल्ट निर्माण

बोल्ट एक घटक है जो ट्यूपल्स को इनपुट के रूप में लेता है, टपल को संसाधित करता है, और आउटपुट के रूप में नए ट्यूपल्स का उत्पादन करता है। बोल्ट लागू करेंगेIRichBoltइंटरफेस। इस कार्यक्रम में, दो बोल्ट कक्षाएंCallLogCreatorBolt तथा CallLogCounterBolt संचालन करने के लिए उपयोग किया जाता है।

IRichBolt इंटरफ़ेस के निम्नलिखित तरीके हैं -

  • prepare- निष्पादित करने के लिए पर्यावरण के साथ बोल्ट प्रदान करता है। जल्लाद को इनिशियलाइज़ करने के लिए एग्जिक्यूटर्स इस विधि को चलाएंगे।

  • execute - इनपुट के एकल टपल की प्रक्रिया करें।

  • cleanup - जब बोल्ट बंद करने जा रहा हो तो कॉल किया जाता है।

  • declareOutputFields - टपल के आउटपुट स्कीमा की घोषणा करता है।

तैयार

के हस्ताक्षर prepare विधि इस प्रकार है -

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - इस बोल्ट के लिए तूफान विन्यास प्रदान करता है।

  • context - टोपोलॉजी के भीतर बोल्ट की जगह, उसकी टास्क आईडी, इनपुट और आउटपुट की जानकारी आदि के बारे में पूरी जानकारी उपलब्ध कराता है।

  • collector - हमें प्रोसेस्ड ट्यूपल का उत्सर्जन करने में सक्षम बनाता है।

निष्पादित

के हस्ताक्षर execute विधि इस प्रकार है -

execute(Tuple tuple)

यहाँ tuple संसाधित करने के लिए इनपुट टपल है।

executeविधि एक बार में एक ही नलिका बनाती है। टपल डेटा को टपल क्लास के गेटवैल्यू विधि द्वारा एक्सेस किया जा सकता है। इनपुट टपल को तुरंत संसाधित करना आवश्यक नहीं है। एकाधिक ट्यूपल को एकल आउटपुट टपल के रूप में संसाधित और आउटपुट किया जा सकता है। आउटपुट ट्यूपल को आउटपुटकॉल्टर क्लास का उपयोग करके उत्सर्जित किया जा सकता है।

साफ - सफाई

के हस्ताक्षर cleanup विधि इस प्रकार है -

cleanup()

declareOutputFields

के हस्ताक्षर declareOutputFields विधि इस प्रकार है -

declareOutputFields(OutputFieldsDeclarer declarer)

यहाँ पैरामीटर declarer is used to declare output stream ids, output fields, etc.

This method is used to specify the output schema of the tuple

Call log Creator Bolt

Call log creator bolt receives the call log tuple. The call log tuple has caller number, receiver number, and call duration. This bolt simply creates a new value by combining the caller number and the receiver number. The format of the new value is "Caller number – Receiver number" and it is named as new field, "call". The complete code is given below.

Coding − CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Call log Counter Bolt

Call log counter bolt receives call and its duration as a tuple. This bolt initializes a dictionary (Map) object in the prepare method. In execute method, it checks the tuple and creates a new entry in the dictionary object for every new “call” value in the tuple and sets a value 1 in the dictionary object. For the already available entry in the dictionary, it just increment its value. In simple terms, this bolt saves the call and its count in the dictionary object. Instead of saving the call and its count in the dictionary, we can also save it to a datasource. The complete program code is as follows −

Coding − CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Creating Topology

The Storm topology is basically a Thrift structure. TopologyBuilder class provides simple and easy methods to create complex topologies. The TopologyBuilder class has methods to set spout (setSpout) and to set bolt (setBolt). Finally, TopologyBuilder has createTopology to create topology. Use the following code snippet to create a topology −

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping and fieldsGrouping methods help to set stream grouping for spout and bolts.

Local Cluster

For development purpose, we can create a local cluster using "LocalCluster" object and then submit the topology using "submitTopology" method of "LocalCluster" class. One of the arguments for "submitTopology" is an instance of "Config" class. The "Config" class is used to set configuration options before submitting the topology. This configuration option will be merged with the cluster configuration at run time and sent to all task (spout and bolt) with the prepare method. Once topology is submitted to the cluster, we will wait 10 seconds for the cluster to compute the submitted topology and then shutdown the cluster using “shutdown” method of "LocalCluster". The complete program code is as follows −

Coding − LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Building and Running the Application

The complete application has four Java codes. They are −

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

The application can be built using the following command −

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

The application can be run using the following command −

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Output

Once the application is started, it will output the complete details about the cluster startup process, spout and bolt processing, and finally, the cluster shutdown process. In "CallLogCounterBolt", we have printed the call and its count details. This information will be displayed on the console as follows −

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Non-JVM languages

Storm topologies are implemented by Thrift interfaces which makes it easy to submit topologies in any language. Storm supports Ruby, Python and many other languages. Let’s take a look at python binding.

Python Binding

Python is a general-purpose interpreted, interactive, object-oriented, and high-level programming language. Storm supports Python to implement its topology. Python supports emitting, anchoring, acking, and logging operations.

As you know, bolts can be defined in any language. Bolts written in another language are executed as sub-processes, and Storm communicates with those sub-processes with JSON messages over stdin/stdout. First take a sample bolt WordCount that supports python binding.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

Here the class WordCount implements the IRichBolt interface and running with python implementation specified super method argument "splitword.py". Now create a python implementation named "splitword.py".

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

This is the sample implementation for Python that counts the words in a given sentence. Similarly you can bind with other supporting languages as well.