Aplikasi Real Time (Twitter)

Mari kita analisis aplikasi waktu nyata untuk mendapatkan feed twitter terbaru dan tagar-nya. Sebelumnya, kami telah melihat integrasi Storm dan Spark dengan Kafka. Dalam kedua skenario, kami membuat Produsen Kafka (menggunakan cli) untuk mengirim pesan ke ekosistem Kafka. Kemudian, badai dan percikan api membaca pesan dengan menggunakan konsumen Kafka dan memasukkannya ke dalam badai dan memicu ekosistem masing-masing. Jadi, secara praktis kita perlu membuat Produser Kafka, yang harus -

  • Baca feed twitter menggunakan "Twitter Streaming API",
  • Proses feed,
  • Ekstrak HashTag dan
  • Kirimkan ke Kafka.

Setelah HashTag diterima oleh Kafka, integrasi Storm / Spark menerima informasi tersebut dan mengirimkannya ke ekosistem Storm / Spark.

API Streaming Twitter

"Twitter Streaming API" dapat diakses dalam bahasa pemrograman apa pun. "Twitter4j" adalah open source, pustaka Java tidak resmi, yang menyediakan modul berbasis Java untuk mengakses "Twitter Streaming API" dengan mudah. The "twitter4j" menyediakan kerangka kerja berbasis pendengar untuk mengakses tweet. Untuk mengakses "Twitter Streaming API", kita perlu masuk ke akun pengembang Twitter dan mendapatkan yang berikut iniOAuth detail otentikasi.

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTookenSecret

Setelah akun pengembang dibuat, unduh file jar “twitter4j” dan letakkan di jalur kelas java.

Pengodean produser Kafka Twitter Lengkap (KafkaTwitterProducer.java) tercantum di bawah -

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);
      
      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key>
            <twitter-consumer-secret> <twitter-access-token>
            <twitter-access-token-secret>
            <topic-name> <twitter-search-keywords>");
         return;
      }
      
      String consumerKey = args[0].toString();
      String consumerSecret = args[1].toString();
      String accessToken = args[2].toString();
      String accessTokenSecret = args[3].toString();
      String topicName = args[4].toString();
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
      StatusListener listener = new StatusListener() {
        
         @Override
         public void onStatus(Status status) {      
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName() 
               + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreen-Name());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }
         
         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
            // System.out.println("Got a status deletion notice id:" 
               + statusDeletionNotice.getStatusId());
         }
         
         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" + 
               num-berOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId + 
            "upToStatusId:" + upToStatusId);
         }      
         
         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }
         
         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);
      
      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);
      
      //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;
      
      while(i < 10) {
         Status ret = queue.poll();
         
         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  top-icName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}

Kompilasi

Kompilasi aplikasi menggunakan perintah berikut -

javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java

Eksekusi

Buka dua konsol. Jalankan aplikasi yang dikompilasi di atas seperti yang ditunjukkan di bawah ini dalam satu konsol.

java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food

Jalankan salah satu dari aplikasi Spark / Storm yang dijelaskan di bab sebelumnya di win-dow lain. Hal utama yang perlu diperhatikan adalah bahwa topik yang digunakan harus sama dalam kedua kasus tersebut. Di sini, kami telah menggunakan "topik-pertama-saya" sebagai nama topik.

Keluaran

Output dari aplikasi ini akan tergantung pada kata kunci dan feed twitter saat ini. Keluaran sampel ditentukan di bawah ini (integrasi badai).

. . .
food : 1
foodie : 2
burger : 1
. . .