ट्विटर में अपाचे स्टॉर्म

यहाँ इस अध्याय में, हम अपाचे स्टॉर्म के एक वास्तविक समय के अनुप्रयोग पर चर्चा करेंगे। हम देखेंगे कि ट्विटर में तूफान का उपयोग कैसे किया जाता है।

ट्विटर

Twitter एक ऑनलाइन सोशल नेटवर्किंग सेवा है जो उपयोगकर्ता के ट्वीट भेजने और प्राप्त करने के लिए एक मंच प्रदान करती है। पंजीकृत उपयोगकर्ता ट्वीट पढ़ और पोस्ट कर सकते हैं, लेकिन अपंजीकृत उपयोगकर्ता केवल ट्वीट पढ़ सकते हैं। हैशटैग का उपयोग प्रासंगिक कीवर्ड से पहले # जोड़कर कीवर्ड द्वारा ट्वीट को वर्गीकृत करने के लिए किया जाता है। अब हम प्रति विषय सबसे अधिक उपयोग किए जाने वाले हैशटैग को खोजने का एक वास्तविक समय परिदृश्य लेते हैं।

सृजन करो

टोंटी का उद्देश्य लोगों द्वारा जल्द से जल्द ट्वीट प्रस्तुत करना है। ट्विटर "ट्विटर स्ट्रीमिंग एपीआई" प्रदान करता है, जो वास्तविक समय में लोगों द्वारा प्रस्तुत किए गए ट्वीट को पुनः प्राप्त करने के लिए एक वेब सेवा आधारित उपकरण है। ट्विटर स्ट्रीमिंग एपीआई को किसी भी प्रोग्रामिंग भाषा में एक्सेस किया जा सकता है।

twitter4j एक खुला स्रोत है, अनौपचारिक जावा पुस्तकालय, जो ट्विटर स्ट्रीमिंग एपीआई तक आसानी से पहुंचने के लिए जावा आधारित मॉड्यूल प्रदान करता है। twitter4jट्वीट्स तक पहुंचने के लिए एक श्रोता-आधारित ढांचा प्रदान करता है। Twitter स्ट्रीमिंग API तक पहुंचने के लिए, हमें Twitter डेवलपर खाते के लिए साइन इन करना होगा और निम्नलिखित OAuth प्रमाणीकरण विवरण प्राप्त करना चाहिए।

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTookenSecret

तूफान एक चहचहाना टोंटी प्रदान करता है, TwitterSampleSpout,इसकी स्टार्टर किट में। हम इसका उपयोग ट्वीट्स को पुनः प्राप्त करने के लिए करेंगे। टोंटी को OAuth प्रमाणीकरण विवरण और कम से कम एक कीवर्ड की आवश्यकता है। टोंटी कीवर्ड के आधार पर वास्तविक समय के ट्वीट्स का उत्सर्जन करेगी। पूरा प्रोग्राम कोड नीचे दिया गया है।

कोडिंग: TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;
		
   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;
		
   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
         this.consumerKey = consumerKey;
         this.consumerSecret = consumerSecret;
         this.accessToken = accessToken;
         this.accessTokenSecret = accessTokenSecret;
         this.keyWords = keyWords;
   }
		
   public TwitterSampleSpout() {
      // TODO Auto-generated constructor stub
   }
		
   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
         queue = new LinkedBlockingQueue<Status>(1000);
         _collector = collector;
         StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
               queue.offer(status);
            }
					
            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}
					
            @Override
            public void onTrackLimitationNotice(int i) {}
					
            @Override
            public void onScrubGeo(long l, long l1) {}
					
            @Override
            public void onException(Exception ex) {}
					
            @Override
            public void onStallWarning(StallWarning arg0) {
               // TODO Auto-generated method stub
            }
         };
				
         ConfigurationBuilder cb = new ConfigurationBuilder();
				
         cb.setDebugEnabled(true)
            .setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);
					
         _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         _twitterStream.addListener(listener);
				
         if (keyWords.length == 0) {
            _twitterStream.sample();
         }else {
            FilterQuery query = new FilterQuery().track(keyWords);
            _twitterStream.filter(query);
         }
   }
			
   @Override
   public void nextTuple() {
      Status ret = queue.poll();
				
      if (ret == null) {
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }
			
   @Override
   public void close() {
      _twitterStream.shutdown();
   }
			
   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }
			
   @Override
   public void ack(Object id) {}
			
   @Override
   public void fail(Object id) {}
			
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}

हैशटैग रीडर बोल्ट

टोंटी द्वारा उत्सर्जित ट्वीट को अग्रेषित किया जाएगा HashtagReaderBolt, जो ट्वीट को संसाधित करेगा और सभी उपलब्ध हैशटैग का उत्सर्जन करेगा। HashtagReaderBolt का उपयोग करता हैgetHashTagEntitiestwitter4j द्वारा प्रदान की गई विधि। getHashTagEntities ने ट्वीट को पढ़ा और हैशटैग की सूची लौटा दी। पूरा कार्यक्रम कोड इस प्रकार है -

कोडिंग: HashtagReaderBolt.java

import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");
      for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtage.getText());
         this.collector.emit(new Values(hashtage.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

हैशटैग काउंटर बोल्ट

उत्सर्जित हैशटैग को अग्रेषित किया जाएगा HashtagCounterBolt। यह बोल्ट सभी हैशटैग को संसाधित करेगा और जावा मैप ऑब्जेक्ट का उपयोग करके प्रत्येक और प्रत्येक हैशटैग और इसकी गिनती को बचाएगा। पूरा प्रोग्राम कोड नीचे दिया गया है।

कोडिंग: हैशटैग एनकाउंटरबोल्ट.जवा

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      if(!counterMap.containsKey(key)){
         counterMap.put(key, 1);
      }else{
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

टोपोलॉजी सबमिट करना

एक टोपोलॉजी जमा करना मुख्य अनुप्रयोग है। ट्विटर टोपोलॉजी के होते हैंTwitterSampleSpout, HashtagReaderBolt, तथा HashtagCounterBolt। निम्न प्रोग्राम कोड दिखाता है कि टोपोलॉजी कैसे जमा करें।

कोडिंग: TwitterHashtagStorm.java

import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception{
      String consumerKey = args[0];
      String consumerSecret = args[1];
		
      String accessToken = args[2];
      String accessTokenSecret = args[3];
		
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
		
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

बिल्डिंग और अनुप्रयोग चल रहा है

पूर्ण आवेदन में चार जावा कोड हैं। वे इस प्रकार हैं -

  • TwitterSampleSpout.java
  • HashtagReaderBolt.java
  • HashtagCounterBolt.java
  • TwitterHashtagStorm.java

आप निम्नलिखित कमांड का उपयोग करके एप्लिकेशन को संकलित कर सकते हैं -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java

निम्नलिखित आदेशों का उपयोग करके आवेदन निष्पादित करें -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.
TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>

उत्पादन

आवेदन वर्तमान उपलब्ध हैशटैग और उसकी गिनती को प्रिंट करेगा। आउटपुट निम्नलिखित के समान होना चाहिए -

Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1