Aplikacja czasu rzeczywistego (Twitter)

Przeanalizujmy aplikację działającą w czasie rzeczywistym, aby uzyskać najnowsze kanały z Twittera i jej hashtagi. Wcześniej widzieliśmy integrację Storm i Spark z Kafką. W obu scenariuszach stworzyliśmy Kafka Producer (używając cli) do wysyłania wiadomości do ekosystemu Kafka. Następnie integracja burzy i iskier odczytuje wiadomości za pomocą konsumenta Kafki i wstrzykuje je odpowiednio do ekosystemu burzy i iskier. Więc praktycznie musimy stworzyć Kafka Producer, który powinien -

  • Czytaj kanały z Twittera za pomocą „Twitter Streaming API”,
  • Przetwarzaj pasze,
  • Wyodrębnij HashTags i
  • Wyślij to do Kafki.

Gdy Kafka otrzyma HashTagi , integracja Storm / Spark otrzyma informacje i wyśle ​​je do ekosystemu Storm / Spark.

Twitter Streaming API

Dostęp do „Twitter Streaming API” można uzyskać w dowolnym języku programowania. „Twitter4j” to nieoficjalna biblioteka Java o otwartym kodzie źródłowym, która udostępnia oparty na Javie moduł umożliwiający łatwy dostęp do „Twitter Streaming API”. „Twitter4j” udostępnia strukturę opartą na odbiorniku umożliwiającą dostęp do tweetów. Aby uzyskać dostęp do „Twitter Streaming API”, musimy zalogować się na konto programisty na Twitterze i otrzymać następujące informacjeOAuth szczegóły uwierzytelniania.

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTookenSecret

Po utworzeniu konta programisty pobierz pliki jar „twitter4j” i umieść je w ścieżce klas Java.

Pełne kodowanie producenta na Twitterze Kafka (KafkaTwitterProducer.java) jest wymienione poniżej -

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);
      
      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key>
            <twitter-consumer-secret> <twitter-access-token>
            <twitter-access-token-secret>
            <topic-name> <twitter-search-keywords>");
         return;
      }
      
      String consumerKey = args[0].toString();
      String consumerSecret = args[1].toString();
      String accessToken = args[2].toString();
      String accessTokenSecret = args[3].toString();
      String topicName = args[4].toString();
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
      StatusListener listener = new StatusListener() {
        
         @Override
         public void onStatus(Status status) {      
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName() 
               + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreen-Name());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }
         
         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
            // System.out.println("Got a status deletion notice id:" 
               + statusDeletionNotice.getStatusId());
         }
         
         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" + 
               num-berOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId + 
            "upToStatusId:" + upToStatusId);
         }      
         
         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }
         
         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);
      
      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);
      
      //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;
      
      while(i < 10) {
         Status ret = queue.poll();
         
         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  top-icName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}

Kompilacja

Skompiluj aplikację za pomocą następującego polecenia -

javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java

Wykonanie

Otwórz dwie konsole. Uruchom powyższą skompilowaną aplikację, jak pokazano poniżej, w jednej konsoli.

java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food

Uruchom dowolną aplikację Spark / Storm opisaną w poprzednim rozdziale w innym oknie. Należy przede wszystkim zauważyć, że zastosowany temat powinien być taki sam w obu przypadkach. Tutaj użyliśmy „mój-pierwszy-temat” jako nazwy tematu.

Wynik

Wynik tej aplikacji będzie zależał od słów kluczowych i aktualnego kanału na Twitterze. Przykładowe dane wyjściowe podano poniżej (integracja z burzą).

. . .
food : 1
foodie : 2
burger : 1
. . .