Apache Storm ใน Twitter

ในบทนี้เราจะพูดถึงการประยุกต์ใช้ Apache Storm แบบเรียลไทม์ เราจะมาดูกันว่า Storm ใช้อย่างไรใน Twitter

ทวิตเตอร์

Twitter เป็นบริการเครือข่ายสังคมออนไลน์ที่มีแพลตฟอร์มในการส่งและรับทวีตของผู้ใช้ ผู้ใช้ที่ลงทะเบียนสามารถอ่านและโพสต์ทวีตได้ แต่ผู้ใช้ที่ไม่ได้ลงทะเบียนสามารถอ่านทวีตได้เท่านั้น แฮชแท็กใช้ในการจัดหมวดหมู่ทวีตตามคีย์เวิร์ดโดยต่อท้าย # ก่อนคีย์เวิร์ดที่เกี่ยวข้อง ตอนนี้ให้เราใช้สถานการณ์แบบเรียลไทม์เพื่อค้นหาแฮชแท็กที่ใช้บ่อยที่สุดตามหัวข้อ

การสร้างรางน้ำ

จุดประสงค์ของพวยกาคือการรับทวีตที่ส่งโดยผู้คนโดยเร็วที่สุด Twitter มี“ Twitter Streaming API” ซึ่งเป็นเครื่องมือที่ใช้บริการบนเว็บเพื่อดึงทวีตที่ส่งโดยผู้คนแบบเรียลไทม์ Twitter Streaming API สามารถเข้าถึงได้ในภาษาโปรแกรมใดก็ได้

twitter4j เป็นไลบรารี Java แบบโอเพ่นซอร์สที่ไม่เป็นทางการซึ่งมีโมดูลที่ใช้ Java เพื่อเข้าถึง Twitter Streaming API ได้อย่างง่ายดาย twitter4jจัดเตรียมเฟรมเวิร์กสำหรับผู้ฟังเพื่อเข้าถึงทวีต ในการเข้าถึง Twitter Streaming API เราต้องลงชื่อเข้าใช้บัญชีผู้พัฒนา Twitter และควรได้รับรายละเอียดการตรวจสอบสิทธิ์ OAuth ดังต่อไปนี้

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTookenSecret

Storm ให้ Twitter พวยกา TwitterSampleSpout,ในชุดเริ่มต้น เราจะใช้มันเพื่อดึงทวีต พวยกาต้องการรายละเอียดการตรวจสอบสิทธิ์ OAuth และอย่างน้อยคำหลัก พวยกาจะส่งเสียงทวีตตามเวลาจริงตามคำสำคัญ รหัสโปรแกรมที่สมบูรณ์จะได้รับด้านล่าง

การเข้ารหัส: TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;
		
   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;
		
   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
         this.consumerKey = consumerKey;
         this.consumerSecret = consumerSecret;
         this.accessToken = accessToken;
         this.accessTokenSecret = accessTokenSecret;
         this.keyWords = keyWords;
   }
		
   public TwitterSampleSpout() {
      // TODO Auto-generated constructor stub
   }
		
   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
         queue = new LinkedBlockingQueue<Status>(1000);
         _collector = collector;
         StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
               queue.offer(status);
            }
					
            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}
					
            @Override
            public void onTrackLimitationNotice(int i) {}
					
            @Override
            public void onScrubGeo(long l, long l1) {}
					
            @Override
            public void onException(Exception ex) {}
					
            @Override
            public void onStallWarning(StallWarning arg0) {
               // TODO Auto-generated method stub
            }
         };
				
         ConfigurationBuilder cb = new ConfigurationBuilder();
				
         cb.setDebugEnabled(true)
            .setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);
					
         _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         _twitterStream.addListener(listener);
				
         if (keyWords.length == 0) {
            _twitterStream.sample();
         }else {
            FilterQuery query = new FilterQuery().track(keyWords);
            _twitterStream.filter(query);
         }
   }
			
   @Override
   public void nextTuple() {
      Status ret = queue.poll();
				
      if (ret == null) {
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }
			
   @Override
   public void close() {
      _twitterStream.shutdown();
   }
			
   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }
			
   @Override
   public void ack(Object id) {}
			
   @Override
   public void fail(Object id) {}
			
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}

Hashtag Reader Bolt

ทวีตที่ปล่อยออกมาโดยพวยกาจะถูกส่งต่อไปยัง HashtagReaderBoltซึ่งจะประมวลผลทวีตและปล่อยแฮชแท็กทั้งหมดที่มี HashtagReaderBolt ใช้getHashTagEntitiesวิธีการให้โดย twitter4j getHashTagEntities อ่านทวีตและส่งคืนรายการแฮชแท็ก รหัสโปรแกรมที่สมบูรณ์มีดังนี้ -

การเข้ารหัส: HashtagReaderBolt.java

import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");
      for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtage.getText());
         this.collector.emit(new Values(hashtage.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Hashtag Counter Bolt

แฮชแท็กที่ปล่อยออกมาจะถูกส่งต่อไปยัง HashtagCounterBolt. สายฟ้านี้จะประมวลผลแฮชแท็กทั้งหมดและบันทึกแต่ละแฮชแท็กและจำนวนของมันในหน่วยความจำโดยใช้วัตถุ Java Map รหัสโปรแกรมที่สมบูรณ์จะได้รับด้านล่าง

การเข้ารหัส: HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      if(!counterMap.containsKey(key)){
         counterMap.put(key, 1);
      }else{
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

การส่งโทโพโลยี

การส่งโทโพโลยีเป็นแอปพลิเคชันหลัก โทโพโลยีของ Twitter ประกอบด้วยTwitterSampleSpout, HashtagReaderBoltและ HashtagCounterBolt. รหัสโปรแกรมต่อไปนี้แสดงวิธีการส่งโทโพโลยี

การเข้ารหัส: TwitterHashtagStorm.java

import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception{
      String consumerKey = args[0];
      String consumerSecret = args[1];
		
      String accessToken = args[2];
      String accessTokenSecret = args[3];
		
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
		
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

การสร้างและเรียกใช้แอปพลิเคชัน

แอปพลิเคชันที่สมบูรณ์มีรหัส Java สี่ตัว มีดังนี้ -

  • TwitterSampleSpout.java
  • HashtagReaderBolt.java
  • HashtagCounterBolt.java
  • TwitterHashtagStorm.java

คุณสามารถคอมไพล์แอปพลิเคชันโดยใช้คำสั่งต่อไปนี้ -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java

ดำเนินการแอปพลิเคชันโดยใช้คำสั่งต่อไปนี้ -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.
TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>

เอาต์พุต

แอปพลิเคชันจะพิมพ์แฮชแท็กที่มีอยู่ในปัจจุบันและจำนวนของมัน ผลลัพธ์ควรคล้ายกับสิ่งต่อไปนี้ -

Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1