Apache Storm in Twitter
Hier in diesem Kapitel werden wir eine Echtzeitanwendung von Apache Storm diskutieren. Wir werden sehen, wie Storm in Twitter verwendet wird.
Twitter ist ein sozialer Online-Netzwerkdienst, der eine Plattform zum Senden und Empfangen von Benutzer-Tweets bietet. Registrierte Benutzer können Tweets lesen und veröffentlichen, nicht registrierte Benutzer können jedoch nur Tweets lesen. Mit dem Hashtag werden Tweets nach Schlüsselwörtern kategorisiert, indem # vor dem entsprechenden Schlüsselwort angehängt wird. Nehmen wir nun ein Echtzeitszenario, um das am häufigsten verwendete Hashtag pro Thema zu finden.
Auslauferstellung
Der Zweck von Spout ist es, die von Personen eingereichten Tweets so schnell wie möglich zu erhalten. Twitter bietet die „Twitter Streaming API“, ein auf Webdiensten basierendes Tool, mit dem die von Personen übermittelten Tweets in Echtzeit abgerufen werden können. Auf die Twitter Streaming API kann in jeder Programmiersprache zugegriffen werden.
twitter4j ist eine inoffizielle Open-Source-Java-Bibliothek, die ein Java-basiertes Modul für den einfachen Zugriff auf die Twitter-Streaming-API bereitstellt. twitter4jbietet ein Listener-basiertes Framework für den Zugriff auf die Tweets. Um auf die Twitter-Streaming-API zugreifen zu können, müssen wir uns für das Twitter-Entwicklerkonto anmelden und die folgenden OAuth-Authentifizierungsdetails erhalten.
- Customerkey
- CustomerSecret
- AccessToken
- AccessTookenSecret
Storm bietet einen Twitter-Auslauf, TwitterSampleSpout,in seinem Starter-Kit. Wir werden es verwenden, um die Tweets abzurufen. Der Auslauf benötigt OAuth-Authentifizierungsdetails und mindestens ein Schlüsselwort. Der Auslauf sendet Echtzeit-Tweets basierend auf Schlüsselwörtern aus. Der vollständige Programmcode ist unten angegeben.
Codierung: TwitterSampleSpout.java
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;
import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
LinkedBlockingQueue<Status> queue = null;
TwitterStream _twitterStream;
String consumerKey;
String consumerSecret;
String accessToken;
String accessTokenSecret;
String[] keyWords;
public TwitterSampleSpout(String consumerKey, String consumerSecret,
String accessToken, String accessTokenSecret, String[] keyWords) {
this.consumerKey = consumerKey;
this.consumerSecret = consumerSecret;
this.accessToken = accessToken;
this.accessTokenSecret = accessTokenSecret;
this.keyWords = keyWords;
}
public TwitterSampleSpout() {
// TODO Auto-generated constructor stub
}
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
queue = new LinkedBlockingQueue<Status>(1000);
_collector = collector;
StatusListener listener = new StatusListener() {
@Override
public void onStatus(Status status) {
queue.offer(status);
}
@Override
public void onDeletionNotice(StatusDeletionNotice sdn) {}
@Override
public void onTrackLimitationNotice(int i) {}
@Override
public void onScrubGeo(long l, long l1) {}
@Override
public void onException(Exception ex) {}
@Override
public void onStallWarning(StallWarning arg0) {
// TODO Auto-generated method stub
}
};
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey(consumerKey)
.setOAuthConsumerSecret(consumerSecret)
.setOAuthAccessToken(accessToken)
.setOAuthAccessTokenSecret(accessTokenSecret);
_twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
_twitterStream.addListener(listener);
if (keyWords.length == 0) {
_twitterStream.sample();
}else {
FilterQuery query = new FilterQuery().track(keyWords);
_twitterStream.filter(query);
}
}
@Override
public void nextTuple() {
Status ret = queue.poll();
if (ret == null) {
Utils.sleep(50);
} else {
_collector.emit(new Values(ret));
}
}
@Override
public void close() {
_twitterStream.shutdown();
}
@Override
public Map<String, Object> getComponentConfiguration() {
Config ret = new Config();
ret.setMaxTaskParallelism(1);
return ret;
}
@Override
public void ack(Object id) {}
@Override
public void fail(Object id) {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tweet"));
}
}
Hashtag Reader Bolt
Der vom Auslauf ausgestrahlte Tweet wird an weitergeleitet HashtagReaderBolt, der den Tweet verarbeitet und alle verfügbaren Hashtags ausgibt. HashtagReaderBolt verwendetgetHashTagEntitiesMethode von twitter4j zur Verfügung gestellt. getHashTagEntities liest den Tweet und gibt die Liste der Hashtags zurück. Der vollständige Programmcode lautet wie folgt:
Codierung: HashtagReaderBolt.java
import java.util.HashMap;
import java.util.Map;
import twitter4j.*;
import twitter4j.conf.*;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class HashtagReaderBolt implements IRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
Status tweet = (Status) tuple.getValueByField("tweet");
for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
System.out.println("Hashtag: " + hashtage.getText());
this.collector.emit(new Values(hashtage.getText()));
}
}
@Override
public void cleanup() {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("hashtag"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Hashtag Counter Bolt
Das ausgegebene Hashtag wird an weitergeleitet HashtagCounterBolt. Diese Schraube verarbeitet alle Hashtags und speichert jedes einzelne Hashtag und seine Anzahl im Speicher mithilfe des Java Map-Objekts. Der vollständige Programmcode ist unten angegeben.
Codierung: HashtagCounterBolt.java
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class HashtagCounterBolt implements IRichBolt {
Map<String, Integer> counterMap;
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String key = tuple.getString(0);
if(!counterMap.containsKey(key)){
counterMap.put(key, 1);
}else{
Integer c = counterMap.get(key) + 1;
counterMap.put(key, c);
}
collector.ack(tuple);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("hashtag"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Einreichen einer Topologie
Das Einreichen einer Topologie ist die Hauptanwendung. Die Twitter-Topologie besteht ausTwitterSampleSpout, HashtagReaderBolt, und HashtagCounterBolt. Der folgende Programmcode zeigt, wie eine Topologie gesendet wird.
Codierung: TwitterHashtagStorm.java
import java.util.*;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
public class TwitterHashtagStorm {
public static void main(String[] args) throws Exception{
String consumerKey = args[0];
String consumerSecret = args[1];
String accessToken = args[2];
String accessTokenSecret = args[3];
String[] arguments = args.clone();
String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
Config config = new Config();
config.setDebug(true);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
consumerSecret, accessToken, accessTokenSecret, keyWords));
builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
.shuffleGrouping("twitter-spout");
builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
.fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TwitterHashtagStorm", config,
builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
Erstellen und Ausführen der Anwendung
Die vollständige Anwendung verfügt über vier Java-Codes. Sie sind wie folgt -
- TwitterSampleSpout.java
- HashtagReaderBolt.java
- HashtagCounterBolt.java
- TwitterHashtagStorm.java
Sie können die Anwendung mit dem folgenden Befehl kompilieren:
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java
Führen Sie die Anwendung mit den folgenden Befehlen aus:
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.
TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>
Ausgabe
Die Anwendung druckt das aktuell verfügbare Hashtag und dessen Anzahl. Die Ausgabe sollte wie folgt aussehen:
Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1