ट्विटर में अपाचे स्टॉर्म
यहाँ इस अध्याय में, हम अपाचे स्टॉर्म के एक वास्तविक समय के अनुप्रयोग पर चर्चा करेंगे। हम देखेंगे कि ट्विटर में तूफान का उपयोग कैसे किया जाता है।
ट्विटर
Twitter एक ऑनलाइन सोशल नेटवर्किंग सेवा है जो उपयोगकर्ता के ट्वीट भेजने और प्राप्त करने के लिए एक मंच प्रदान करती है। पंजीकृत उपयोगकर्ता ट्वीट पढ़ और पोस्ट कर सकते हैं, लेकिन अपंजीकृत उपयोगकर्ता केवल ट्वीट पढ़ सकते हैं। हैशटैग का उपयोग प्रासंगिक कीवर्ड से पहले # जोड़कर कीवर्ड द्वारा ट्वीट को वर्गीकृत करने के लिए किया जाता है। अब हम प्रति विषय सबसे अधिक उपयोग किए जाने वाले हैशटैग को खोजने का एक वास्तविक समय परिदृश्य लेते हैं।
सृजन करो
टोंटी का उद्देश्य लोगों द्वारा जल्द से जल्द ट्वीट प्रस्तुत करना है। ट्विटर "ट्विटर स्ट्रीमिंग एपीआई" प्रदान करता है, जो वास्तविक समय में लोगों द्वारा प्रस्तुत किए गए ट्वीट को पुनः प्राप्त करने के लिए एक वेब सेवा आधारित उपकरण है। ट्विटर स्ट्रीमिंग एपीआई को किसी भी प्रोग्रामिंग भाषा में एक्सेस किया जा सकता है।
twitter4j एक खुला स्रोत है, अनौपचारिक जावा पुस्तकालय, जो ट्विटर स्ट्रीमिंग एपीआई तक आसानी से पहुंचने के लिए जावा आधारित मॉड्यूल प्रदान करता है। twitter4jट्वीट्स तक पहुंचने के लिए एक श्रोता-आधारित ढांचा प्रदान करता है। Twitter स्ट्रीमिंग API तक पहुंचने के लिए, हमें Twitter डेवलपर खाते के लिए साइन इन करना होगा और निम्नलिखित OAuth प्रमाणीकरण विवरण प्राप्त करना चाहिए।
- Customerkey
- CustomerSecret
- AccessToken
- AccessTookenSecret
तूफान एक चहचहाना टोंटी प्रदान करता है, TwitterSampleSpout,इसकी स्टार्टर किट में। हम इसका उपयोग ट्वीट्स को पुनः प्राप्त करने के लिए करेंगे। टोंटी को OAuth प्रमाणीकरण विवरण और कम से कम एक कीवर्ड की आवश्यकता है। टोंटी कीवर्ड के आधार पर वास्तविक समय के ट्वीट्स का उत्सर्जन करेगी। पूरा प्रोग्राम कोड नीचे दिया गया है।
कोडिंग: TwitterSampleSpout.java
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;
import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
LinkedBlockingQueue<Status> queue = null;
TwitterStream _twitterStream;
String consumerKey;
String consumerSecret;
String accessToken;
String accessTokenSecret;
String[] keyWords;
public TwitterSampleSpout(String consumerKey, String consumerSecret,
String accessToken, String accessTokenSecret, String[] keyWords) {
this.consumerKey = consumerKey;
this.consumerSecret = consumerSecret;
this.accessToken = accessToken;
this.accessTokenSecret = accessTokenSecret;
this.keyWords = keyWords;
}
public TwitterSampleSpout() {
// TODO Auto-generated constructor stub
}
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
queue = new LinkedBlockingQueue<Status>(1000);
_collector = collector;
StatusListener listener = new StatusListener() {
@Override
public void onStatus(Status status) {
queue.offer(status);
}
@Override
public void onDeletionNotice(StatusDeletionNotice sdn) {}
@Override
public void onTrackLimitationNotice(int i) {}
@Override
public void onScrubGeo(long l, long l1) {}
@Override
public void onException(Exception ex) {}
@Override
public void onStallWarning(StallWarning arg0) {
// TODO Auto-generated method stub
}
};
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey(consumerKey)
.setOAuthConsumerSecret(consumerSecret)
.setOAuthAccessToken(accessToken)
.setOAuthAccessTokenSecret(accessTokenSecret);
_twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
_twitterStream.addListener(listener);
if (keyWords.length == 0) {
_twitterStream.sample();
}else {
FilterQuery query = new FilterQuery().track(keyWords);
_twitterStream.filter(query);
}
}
@Override
public void nextTuple() {
Status ret = queue.poll();
if (ret == null) {
Utils.sleep(50);
} else {
_collector.emit(new Values(ret));
}
}
@Override
public void close() {
_twitterStream.shutdown();
}
@Override
public Map<String, Object> getComponentConfiguration() {
Config ret = new Config();
ret.setMaxTaskParallelism(1);
return ret;
}
@Override
public void ack(Object id) {}
@Override
public void fail(Object id) {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tweet"));
}
}
हैशटैग रीडर बोल्ट
टोंटी द्वारा उत्सर्जित ट्वीट को अग्रेषित किया जाएगा HashtagReaderBolt, जो ट्वीट को संसाधित करेगा और सभी उपलब्ध हैशटैग का उत्सर्जन करेगा। HashtagReaderBolt का उपयोग करता हैgetHashTagEntitiestwitter4j द्वारा प्रदान की गई विधि। getHashTagEntities ने ट्वीट को पढ़ा और हैशटैग की सूची लौटा दी। पूरा कार्यक्रम कोड इस प्रकार है -
कोडिंग: HashtagReaderBolt.java
import java.util.HashMap;
import java.util.Map;
import twitter4j.*;
import twitter4j.conf.*;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class HashtagReaderBolt implements IRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
Status tweet = (Status) tuple.getValueByField("tweet");
for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
System.out.println("Hashtag: " + hashtage.getText());
this.collector.emit(new Values(hashtage.getText()));
}
}
@Override
public void cleanup() {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("hashtag"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
हैशटैग काउंटर बोल्ट
उत्सर्जित हैशटैग को अग्रेषित किया जाएगा HashtagCounterBolt। यह बोल्ट सभी हैशटैग को संसाधित करेगा और जावा मैप ऑब्जेक्ट का उपयोग करके प्रत्येक और प्रत्येक हैशटैग और इसकी गिनती को बचाएगा। पूरा प्रोग्राम कोड नीचे दिया गया है।
कोडिंग: हैशटैग एनकाउंटरबोल्ट.जवा
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class HashtagCounterBolt implements IRichBolt {
Map<String, Integer> counterMap;
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String key = tuple.getString(0);
if(!counterMap.containsKey(key)){
counterMap.put(key, 1);
}else{
Integer c = counterMap.get(key) + 1;
counterMap.put(key, c);
}
collector.ack(tuple);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("hashtag"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
टोपोलॉजी सबमिट करना
एक टोपोलॉजी जमा करना मुख्य अनुप्रयोग है। ट्विटर टोपोलॉजी के होते हैंTwitterSampleSpout, HashtagReaderBolt, तथा HashtagCounterBolt। निम्न प्रोग्राम कोड दिखाता है कि टोपोलॉजी कैसे जमा करें।
कोडिंग: TwitterHashtagStorm.java
import java.util.*;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
public class TwitterHashtagStorm {
public static void main(String[] args) throws Exception{
String consumerKey = args[0];
String consumerSecret = args[1];
String accessToken = args[2];
String accessTokenSecret = args[3];
String[] arguments = args.clone();
String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
Config config = new Config();
config.setDebug(true);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
consumerSecret, accessToken, accessTokenSecret, keyWords));
builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
.shuffleGrouping("twitter-spout");
builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
.fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TwitterHashtagStorm", config,
builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
बिल्डिंग और अनुप्रयोग चल रहा है
पूर्ण आवेदन में चार जावा कोड हैं। वे इस प्रकार हैं -
- TwitterSampleSpout.java
- HashtagReaderBolt.java
- HashtagCounterBolt.java
- TwitterHashtagStorm.java
आप निम्नलिखित कमांड का उपयोग करके एप्लिकेशन को संकलित कर सकते हैं -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java
निम्नलिखित आदेशों का उपयोग करके आवेदन निष्पादित करें -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.
TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>
उत्पादन
आवेदन वर्तमान उपलब्ध हैशटैग और उसकी गिनती को प्रिंट करेगा। आउटपुट निम्नलिखित के समान होना चाहिए -
Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1