Apache Storm - szybki przewodnik

Co to jest Apache Storm?

Apache Storm to rozproszony system przetwarzania dużych zbiorów danych w czasie rzeczywistym. Storm jest zaprojektowany do przetwarzania ogromnych ilości danych w odpornej na błędy i poziomej skalowalnej metodzie. Jest to platforma przesyłania strumieniowego danych, która ma możliwość uzyskania najwyższych współczynników pozyskiwania. Chociaż Storm jest bezstanowy, zarządza środowiskiem rozproszonym i stanem klastra za pośrednictwem Apache ZooKeeper. Jest to proste i można równolegle wykonywać wszelkiego rodzaju manipulacje na danych w czasie rzeczywistym.

Apache Storm nadal jest liderem w dziedzinie analizy danych w czasie rzeczywistym. Storm jest łatwy w konfiguracji, obsłudze i gwarantuje, że każda wiadomość zostanie przynajmniej raz przetworzona w topologii.

Apache Storm vs Hadoop

Zasadniczo platformy Hadoop i Storm są używane do analizowania dużych zbiorów danych. Oba uzupełniają się i różnią w niektórych aspektach. Apache Storm wykonuje wszystkie operacje z wyjątkiem trwałości, podczas gdy Hadoop jest dobry we wszystkim, ale opóźnia się w obliczeniach w czasie rzeczywistym. W poniższej tabeli porównano atrybuty Storm i Hadoop.

Burza Hadoop
Przetwarzanie strumienia w czasie rzeczywistym Przetwarzanie wsadowe
Bezpaństwowcy Stanowy
Architektura Master / Slave z koordynacją opartą na ZooKeeper. Węzeł główny nosi nazwęnimbus i niewolnicy supervisors. Architektura master-slave z / bez koordynacji opartej na ZooKeeper. Węzeł główny tojob tracker a węzeł slave to task tracker.
Proces przesyłania strumieniowego Storm może uzyskać dostęp do dziesiątek tysięcy wiadomości na sekundę w klastrze. Rozproszony system plików Hadoop (HDFS) wykorzystuje strukturę MapReduce do przetwarzania ogromnych ilości danych, które zajmują minuty lub godziny.
Topologia Storm działa do momentu wyłączenia przez użytkownika lub nieoczekiwanej, nieodwracalnej awarii. Zadania MapReduce są wykonywane w kolejności i ostatecznie kończone.
Both are distributed and fault-tolerant
Jeśli nimbus / nadzorca umrze, ponowne uruchomienie sprawia, że ​​będzie kontynuowane od miejsca, w którym zostało zatrzymane, więc nic nie zostanie naruszone. Jeśli JobTracker umrze, wszystkie uruchomione zadania zostaną utracone.

Przypadki użycia Apache Storm

Apache Storm jest bardzo znany z przetwarzania dużych strumieni danych w czasie rzeczywistym. Z tego powodu większość firm używa Storm jako integralnej części swojego systemu. Oto niektóre godne uwagi przykłady -

Twitter- Twitter korzysta z Apache Storm w zakresie „produktów Analytics dla wydawców”. „Produkty analityczne wydawcy” przetwarzają każdy tweet i kliknięcie na platformie Twitter. Apache Storm jest głęboko zintegrowany z infrastrukturą Twittera.

NaviSite- NaviSite używa Storm do monitorowania / audytu dzienników zdarzeń. Wszystkie dzienniki wygenerowane w systemie przejdą przez burzę. Storm porówna wiadomość ze skonfigurowanym zestawem wyrażeń regularnych i jeśli znajdzie dopasowanie, ta konkretna wiadomość zostanie zapisana w bazie danych.

Wego- Wego to wyszukiwarka metasearch zlokalizowana w Singapurze. Dane dotyczące podróży pochodzą z wielu źródeł na całym świecie w różnym czasie. Storm pomaga Wego przeszukiwać dane w czasie rzeczywistym, rozwiązuje problemy z współbieżnością i znajduje najlepsze dopasowanie dla użytkownika końcowego.

Korzyści Apache Storm

Oto lista korzyści, które oferuje Apache Storm -

  • Storm jest oprogramowaniem open source, solidnym i przyjaznym dla użytkownika. Może znaleźć zastosowanie zarówno w małych firmach, jak i dużych korporacjach.

  • Storm jest odporny na błędy, elastyczny, niezawodny i obsługuje każdy język programowania.

  • Umożliwia przetwarzanie strumieniowe w czasie rzeczywistym.

  • Storm jest niewiarygodnie szybki, ponieważ ma ogromną moc przetwarzania danych.

  • Storm może utrzymać wydajność nawet przy rosnącym obciążeniu, liniowo dodając zasoby. Jest wysoce skalowalny.

  • Storm przeprowadza odświeżanie danych i kompleksową odpowiedź w ciągu kilku sekund lub minut, w zależności od problemu. Ma bardzo małe opóźnienie.

  • Storm ma inteligencję operacyjną.

  • Storm zapewnia gwarantowane przetwarzanie danych, nawet jeśli którykolwiek z podłączonych węzłów w klastrze umrze lub wiadomości zostaną utracone.

Apache Storm odczytuje nieprzetworzony strumień danych w czasie rzeczywistym z jednego końca i przekazuje go przez sekwencję małych jednostek przetwarzania, a na drugim końcu wysyła przetworzone / przydatne informacje.

Poniższy diagram przedstawia podstawową koncepcję Apache Storm.

Przyjrzyjmy się teraz bliżej składnikom Apache Storm -

składniki Opis
Tuple Krotka to główna struktura danych w Storm. Jest to lista uporządkowanych elementów. Domyślnie krotka obsługuje wszystkie typy danych. Ogólnie jest modelowany jako zestaw wartości oddzielonych przecinkami i przekazywany do klastra Storm.
Strumień Strumień to nieuporządkowana sekwencja krotek.
Wylewki Źródło strumienia. Ogólnie Storm akceptuje dane wejściowe z surowych źródeł danych, takich jak Twitter Streaming API, kolejka Apache Kafka, kolejka Kestrel itp. W przeciwnym razie możesz pisać spoty, aby odczytywać dane ze źródeł danych. „ISpout” to podstawowy interfejs do implementowania spoutów. Niektóre z konkretnych interfejsów to IRichSpout, BaseRichSpout, KafkaSpout itp.
Śruby Śruby to logiczne jednostki przetwarzające. Wylewki przekazują dane do śrub, a śruby przetwarzają i wytwarzają nowy strumień wyjściowy. Bolts może wykonywać operacje filtrowania, agregacji, łączenia, interakcji ze źródłami danych i bazami danych. Bolt odbiera dane i wysyła je do jednej lub kilku śrub. „IBolt” jest głównym interfejsem do implementacji śrub. Niektóre z typowych interfejsów to IRichBolt, IBasicBolt itp.

Weźmy przykład „analizy Twittera” w czasie rzeczywistym i zobaczmy, jak można go modelować w Apache Storm. Poniższy diagram przedstawia strukturę.

Dane wejściowe do „analizy Twittera” pochodzą z Twitter Streaming API. Spout odczyta tweety użytkowników korzystających z Twitter Streaming API i wyprowadzi jako strumień krotek. Pojedyncza krotka z wylewki będzie miała nazwę użytkownika Twittera i pojedynczy tweet jako wartości oddzielone przecinkami. Następnie ta para krotek zostanie przesłana do Bolta, a Bolt podzieli tweet na poszczególne słowa, obliczy liczbę słów i utrwali informacje w skonfigurowanym źródle danych. Teraz możemy łatwo uzyskać wynik, wysyłając zapytanie do źródła danych.

Topologia

Rzygacze i śruby są ze sobą połączone i tworzą topologię. Logika aplikacji czasu rzeczywistego jest określona w topologii Storm. Mówiąc prościej, topologia to skierowany graf, w którym wierzchołki są obliczane, a krawędzie są strumieniem danych.

Prosta topologia zaczyna się od wylotów. Wylewka wysyła dane do jednej lub więcej śrub. Śruba reprezentuje węzeł w topologii, który ma najmniejszą logikę przetwarzania, a dane wyjściowe śruby mogą być wysyłane do innej śruby jako dane wejściowe.

Storm utrzymuje topologię zawsze działającą, dopóki nie zostanie usunięta. Głównym zadaniem Apache Storm jest uruchomienie topologii i uruchomienie dowolnej liczby topologii w danym czasie.

Zadania

Teraz masz podstawowe pojęcie o wylewkach i śrubach. Są najmniejszą jednostką logiczną topologii, a topologia jest budowana przy użyciu jednego wylewki i szeregu śrub. Powinny być wykonane poprawnie w określonej kolejności, aby topologia działała pomyślnie. Wykonanie każdego dziobka i śruby przez Storm nazywane jest „Zadaniami”. W prostych słowach zadanie polega na wykonaniu wylewki lub zasuwy. W danym momencie każdy dziobek i śruba mogą mieć wiele wystąpień działających w wielu oddzielnych gwintach.

Pracownicy

Topologia działa w sposób rozproszony na wielu węzłach roboczych. Storm równomiernie rozkłada zadania na wszystkie węzły robocze. Rolą węzła roboczego jest nasłuchiwanie zadań i uruchamianie lub zatrzymywanie procesów za każdym razem, gdy nadejdzie nowe zadanie.

Grupowanie strumieni

Strumień danych przepływa z wylewek do śrub lub od jednej śruby do drugiej. Grupowanie strumieni kontroluje sposób kierowania krotek w topologii i pomaga nam zrozumieć przepływ krotek w topologii. Istnieją cztery wbudowane grupy, jak wyjaśniono poniżej.

Shuffle Grouping

W przypadku grupowania losowego równa liczba krotek jest rozdzielana losowo na wszystkich pracowników wykonujących śruby. Poniższy diagram przedstawia strukturę.

Grupowanie pól

Pola z takimi samymi wartościami w krotkach są grupowane, a pozostałe krotki trzymane na zewnątrz. Następnie krotki z tymi samymi wartościami pól są przesyłane dalej do tego samego pracownika wykonującego śruby. Na przykład, jeśli strumień jest zgrupowany według pola „słowo”, to krotki z tym samym ciągiem „Hello” zostaną przeniesione do tego samego procesu roboczego. Poniższy diagram przedstawia sposób działania grupowania pól.

Grupowanie globalne

Wszystkie strumienie można grupować i przekazywać do jednej śruby. To grupowanie wysyła krotki wygenerowane przez wszystkie wystąpienia źródła do pojedynczej instancji docelowej (w szczególności wybierz proces roboczy o najniższym identyfikatorze).

Wszystkie grupowanie

All Grouping wysyła jedną kopię każdej krotki do wszystkich wystąpień śruby odbierającej. Ten rodzaj grupowania służy do wysyłania sygnałów do rygli. Całe grupowanie jest przydatne w operacjach łączenia.

Jedną z głównych zalet Apache Storm jest to, że jest to odporna na błędy, szybka i pozbawiona rozproszonych aplikacji typu „Single Point of Failure” (SPOF). Możemy zainstalować Apache Storm w tylu systemach, ile potrzeba, aby zwiększyć pojemność aplikacji.

Przyjrzyjmy się, jak został zaprojektowany klaster Apache Storm i jego wewnętrzna architektura. Poniższy diagram przedstawia projekt klastra.

Apache Storm ma dwa typy węzłów, Nimbus (węzeł główny) i Supervisor(węzeł roboczy). Nimbus to centralny składnik Apache Storm. Głównym zadaniem Nimbusa jest uruchomienie topologii Storm. Nimbus analizuje topologię i zbiera zadanie do wykonania. Następnie przydzieli zadanie do dostępnego przełożonego.

Przełożony będzie miał jeden lub więcej procesów roboczych. Przełożony przekaże zadania do procesów roboczych. Proces roboczy utworzy tyle programów wykonawczych, ile potrzeba, i uruchomi zadanie. Apache Storm wykorzystuje wewnętrzny rozproszony system przesyłania wiadomości do komunikacji między nimbusem a przełożonymi.

składniki Opis
Chmura Nimbus to główny węzeł klastra Storm. Wszystkie inne węzły w klastrze nazywane są jakoworker nodes. Węzeł główny jest odpowiedzialny za dystrybucję danych do wszystkich węzłów roboczych, przydzielanie zadań do węzłów roboczych i monitorowanie awarii.
Kierownik Węzły, które wykonują instrukcje podane przez nimbus, nazywane są nadzorcami. ZAsupervisor ma wiele procesów roboczych i zarządza procesami roboczymi w celu wykonania zadań przypisanych przez nimbus.
Proces roboczy Proces roboczy będzie wykonywał zadania związane z określoną topologią. Proces roboczy nie uruchomi zadania samodzielnie, ale je utworzyexecutorsi prosi ich o wykonanie określonego zadania. Proces roboczy będzie miał wielu wykonawców.
Wykonawca Executor to nic innego jak pojedynczy wątek spawnowany przez proces roboczy. Wykonawca uruchamia jedno lub więcej zadań, ale tylko dla określonej wylewki lub śruby.
Zadanie Zadanie wykonuje faktyczne przetwarzanie danych. Jest to więc wylewka lub śruba.
Framework ZooKeeper

Apache ZooKeeper to usługa używana przez klaster (grupę węzłów) do koordynowania między sobą i utrzymywania współdzielonych danych za pomocą zaawansowanych technik synchronizacji. Nimbus jest bezstanowy, więc monitorowanie stanu węzła roboczego zależy od ZooKeepera.

ZooKeeper pomaga przełożonemu w interakcji z nimbem. Jest odpowiedzialny za utrzymywanie stanu nimbusa i nadzorcy.

Storm ma charakter bezpaństwowy. Mimo że bezstanowa natura ma swoje wady, w rzeczywistości pomaga Storm przetwarzać dane w czasie rzeczywistym w najlepszy możliwy i najszybszy sposób.

Storm nie jest jednak całkowicie bezpaństwowy. Przechowuje swój stan w Apache ZooKeeper. Ponieważ stan jest dostępny w Apache ZooKeeper, uszkodzony nimbus można uruchomić ponownie i rozpocząć pracę od miejsca, w którym został. Zwykle narzędzia do monitorowania usług, takie jakmonit będzie monitorować Nimbus i ponownie go uruchamiać, jeśli wystąpi jakakolwiek awaria.

Apache Storm ma również zaawansowaną topologię o nazwie Trident Topologyz utrzymaniem stanu, a także zapewnia interfejs API wysokiego poziomu, taki jak Pig. Omówimy wszystkie te funkcje w następnych rozdziałach.

Działający klaster Storm powinien mieć jeden nimbus i co najmniej jednego nadzorcę. Innym ważnym węzłem jest Apache ZooKeeper, który będzie używany do koordynacji między nimbusem a nadzorcami.

Przyjrzyjmy się teraz bliżej przepływowi pracy Apache Storm -

  • Początkowo nimbus będzie czekał na przesłanie do niego „Topologii burzy”.

  • Po przesłaniu topologii, przetwarza topologię i zbiera wszystkie zadania, które mają być wykonane, oraz kolejność, w jakiej zadanie ma zostać wykonane.

  • Następnie nimbus równomiernie rozdzieli zadania między wszystkich dostępnych przełożonych.

  • W określonym przedziale czasu wszyscy nadzorcy będą wysyłali bicie serca do nimbusa, aby poinformować, że nadal żyją.

  • Kiedy przełożony umiera i nie wysyła bicia serca do nimbu, wtedy nimbus przydziela zadania innemu nadzorcy.

  • Kiedy sam nimb umrze, przełożeni będą bez problemu pracować nad już przydzielonym zadaniem.

  • Po wykonaniu wszystkich zadań przełożony będzie czekał na nadejście nowego zadania.

  • W międzyczasie martwy nimbus zostanie automatycznie ponownie uruchomiony przez narzędzia do monitorowania usług.

  • Zrestartowany nimb będzie kontynuowany od miejsca, w którym został zatrzymany. Podobnie martwego nadzorcy można również uruchomić ponownie automatycznie. Ponieważ zarówno nimbus, jak i nadzorca mogą zostać ponownie uruchomione automatycznie i oba będą kontynuowane jak poprzednio, Storm ma gwarancję, że przetworzy całe zadanie przynajmniej raz.

  • Po przetworzeniu wszystkich topologii nimbus czeka na nadejście nowej topologii i podobnie nadzorca czeka na nowe zadania.

Domyślnie w klastrze Storm są dwa tryby -

  • Local mode- Ten tryb jest używany do programowania, testowania i debugowania, ponieważ jest to najłatwiejszy sposób, aby zobaczyć wszystkie komponenty topologii współpracujące. W tym trybie możemy dostosować parametry, które pozwolą nam zobaczyć, jak nasza topologia działa w różnych środowiskach konfiguracyjnych Storm. W trybie lokalnym topologie burzy działają na komputerze lokalnym w jednej maszynie JVM.

  • Production mode- W tym trybie przekazujemy naszą topologię do działającego klastra burzowego, który składa się z wielu procesów, zwykle działających na różnych maszynach. Jak omówiono w przepływie pracy burzy, działający klaster będzie działał przez czas nieokreślony, dopóki nie zostanie zamknięty.

Apache Storm przetwarza dane w czasie rzeczywistym, a dane wejściowe zwykle pochodzą z systemu kolejkowania wiadomości. Zewnętrzny rozproszony system przesyłania wiadomości zapewni dane wejściowe niezbędne do obliczeń w czasie rzeczywistym. Spout odczyta dane z systemu przesyłania wiadomości i przekształci je w krotki i wprowadzi do Apache Storm. Ciekawostką jest to, że Apache Storm wykorzystuje swój własny rozproszony system przesyłania wiadomości wewnętrznie do komunikacji między nimbusem a przełożonym.

Co to jest rozproszony system przesyłania wiadomości?

Rozproszone przesyłanie wiadomości opiera się na koncepcji niezawodnego kolejkowania wiadomości. Wiadomości są kolejkowane asynchronicznie między aplikacjami klienckimi a systemami przesyłania wiadomości. Rozproszony system obsługi wiadomości zapewnia korzyści w postaci niezawodności, skalowalności i trwałości.

Większość wzorców przesyłania wiadomości jest zgodna z publish-subscribe model (po prostu Pub-Sub), gdzie są wywoływani nadawcy wiadomości publishers i ci, którzy chcą otrzymywać orędzia, są wezwani subscribers.

Po opublikowaniu wiadomości przez nadawcę abonenci mogą odebrać wybraną wiadomość za pomocą opcji filtrowania. Zwykle mamy dwa rodzaje filtrowania, jeden totopic-based filtering a inny jest content-based filtering.

Należy pamiętać, że model pub-sub może komunikować się tylko za pośrednictwem wiadomości. Jest to bardzo luźno związana architektura; nawet nadawcy nie wiedzą, kim są ich subskrybenci. Wiele wzorców komunikatów umożliwia brokerowi komunikatów wymianę komunikatów publikowania w celu uzyskania szybkiego dostępu dla wielu subskrybentów. Przykładem z życia jest Dish TV, które publikuje różne kanały, takie jak sport, filmy, muzyka itp., A każdy może subskrybować własny zestaw kanałów i pobierać je, gdy tylko dostępne są jego kanały.

W poniższej tabeli opisano niektóre popularne systemy przesyłania wiadomości o dużej przepustowości -

Rozproszony system przesyłania wiadomości Opis
Apache Kafka Kafka powstał w korporacji LinkedIn, a później stał się podprojektem Apache. Apache Kafka jest oparty na trwałym, rozproszonym modelu publikowania i subskrybowania, z obsługą brokera. Kafka jest szybka, skalowalna i bardzo wydajna.
RabbitMQ RabbitMQ to niezawodna, rozproszona aplikacja do przesyłania wiadomości typu open source. Jest łatwy w użyciu i działa na wszystkich platformach.
JMS (Java Message Service) JMS to interfejs API typu open source, który obsługuje tworzenie, odczytywanie i wysyłanie wiadomości z jednej aplikacji do drugiej. Zapewnia gwarantowane dostarczanie wiadomości i jest zgodny z modelem publikowania i subskrybowania.
ActiveMQ System przesyłania wiadomości ActiveMQ jest interfejsem API typu open source dla JMS.
ZeroMQ ZeroMQ to przetwarzanie komunikatów peer-peer bez brokera. Zapewnia wzorce komunikatów typu push-pull, router-dealer.
Pustułka Kestrel to szybka, niezawodna i prosta rozproszona kolejka komunikatów.

Protokół oszczędności

Thrift został stworzony na Facebooku w celu rozwoju usług międzyjęzykowych i zdalnego wywoływania procedur (RPC). Później stał się projektem Open Source Apache. Apache Thrift toInterface Definition Language i pozwala w łatwy sposób definiować nowe typy danych i wdrażanie usług na podstawie zdefiniowanych typów danych.

Apache Thrift to także platforma komunikacyjna obsługująca systemy wbudowane, aplikacje mobilne, aplikacje internetowe i wiele innych języków programowania. Niektóre z kluczowych funkcji związanych z Apache Thrift to jego modułowość, elastyczność i wysoka wydajność. Ponadto umożliwia przesyłanie strumieniowe, przesyłanie wiadomości i RPC w aplikacjach rozproszonych.

Storm intensywnie używa protokołu Thrift do wewnętrznej komunikacji i definiowania danych. Topologia burzy jest prostaThrift Structs. Storm Nimbus obsługujący topologię w Apache Storm to plikThrift service.

Zobaczmy teraz, jak zainstalować framework Apache Storm na twoim komputerze. Tutaj są trzy główne kroki -

  • Zainstaluj Javę w swoim systemie, jeśli jeszcze jej nie masz.
  • Zainstaluj framework ZooKeeper.
  • Zainstaluj framework Apache Storm.

Krok 1 - weryfikacja instalacji Java

Użyj następującego polecenia, aby sprawdzić, czy w systemie jest już zainstalowana Java.

$ java -version

Jeśli Java już tam jest, zobaczysz jej numer wersji. W przeciwnym razie pobierz najnowszą wersję JDK.

Krok 1.1 - Pobierz JDK

Pobierz najnowszą wersję JDK, korzystając z następującego łącza - www.oracle.com

Najnowsza wersja to JDK 8u 60, a plik to “jdk-8u60-linux-x64.tar.gz”. Pobierz plik na swój komputer.

Krok 1.2 - Rozpakuj pliki

Zwykle pliki są pobierane do downloadsteczka. Wyodrębnij ustawienia tar za pomocą następujących poleceń.

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

Krok 1.3 - Przejdź do katalogu opt

Aby udostępnić środowisko Java wszystkim użytkownikom, przenieś wyodrębnioną zawartość Java do folderu „/ usr / local / java”.

$ su
password: (type password of root user)
$ mkdir /opt/jdk $ mv jdk-1.8.0_60 /opt/jdk/

Krok 1.4 - Ustaw ścieżkę

Aby ustawić ścieżkę i zmienne JAVA_HOME, dodaj następujące polecenia do pliku ~ / .bashrc.

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

Teraz zastosuj wszystkie zmiany w aktualnie działającym systemie.

$ source ~/.bashrc

Krok 1.5 - Alternatywy dla języka Java

Użyj następującego polecenia, aby zmienić alternatywy Java.

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

Krok 1.6

Teraz zweryfikuj instalację Java za pomocą polecenia weryfikacji (java -version) wyjaśnione w kroku 1.

Krok 2 - Instalacja oprogramowania ZooKeeper Framework

Krok 2.1 - Pobierz ZooKeeper

Aby zainstalować środowisko ZooKeeper na swoim komputerze, kliknij poniższe łącze i pobierz najnowszą wersję ZooKeeper http://zookeeper.apache.org/releases.html

W chwili obecnej najnowsza wersja ZooKeeper to 3.4.6 (ZooKeeper-3.4.6.tar.gz).

Krok 2.2 - Wypakuj plik tar

Wyodrębnij plik tar, używając następujących poleceń -

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz $ cd zookeeper-3.4.6
$ mkdir data

Krok 2.3 - Utwórz plik konfiguracyjny

Otwórz plik konfiguracyjny o nazwie „conf / zoo.cfg”, używając polecenia „vi conf / zoo.cfg” i ustawiając wszystkie poniższe parametry jako punkt początkowy.

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

Po pomyślnym zapisaniu pliku konfiguracyjnego możesz uruchomić serwer ZooKeeper.

Krok 2.4 - Uruchom serwer ZooKeeper

Użyj następującego polecenia, aby uruchomić serwer ZooKeeper.

$ bin/zkServer.sh start

Po wykonaniu tego polecenia otrzymasz następującą odpowiedź -

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg $ Starting zookeeper ... STARTED

Krok 2.5 - Uruchom CLI

Użyj następującego polecenia, aby uruchomić CLI.

$ bin/zkCli.sh

Po wykonaniu powyższego polecenia zostaniesz połączony z serwerem ZooKeeper i otrzymasz następującą odpowiedź.

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

Krok 2.6 - Zatrzymaj serwer ZooKeeper

Po podłączeniu serwera i wykonaniu wszystkich operacji możesz zatrzymać serwer ZooKeeper za pomocą następującego polecenia.

bin/zkServer.sh stop

Pomyślnie zainstalowałeś Javę i ZooKeepera na swoim komputerze. Zobaczmy teraz, jak zainstalować framework Apache Storm.

Krok 3 - Instalacja Apache Storm Framework

Krok 3.1 Pobierz Storm

Aby zainstalować platformę Storm na swoim komputerze, odwiedź poniższy link i pobierz najnowszą wersję Storm http://storm.apache.org/downloads.html

Obecnie najnowszą wersją Storm jest „apache-storm-0.9.5.tar.gz”.

Krok 3.2 - Wypakuj plik tar

Wyodrębnij plik tar, używając następujących poleceń -

$ cd opt/
$ tar -zxf apache-storm-0.9.5.tar.gz $ cd apache-storm-0.9.5
$ mkdir data

Krok 3.3 - Otwórz plik konfiguracyjny

Bieżąca wersja Storm zawiera plik w „conf / storm.yaml”, który konfiguruje demony Storm. Dodaj następujące informacje do tego pliku.

$ vi conf/storm.yaml
storm.zookeeper.servers:
 - "localhost"
storm.local.dir: “/path/to/storm/data(any path)”
nimbus.host: "localhost"
supervisor.slots.ports:
 - 6700
 - 6701
 - 6702
 - 6703

Po zastosowaniu wszystkich zmian zapisz i wróć do terminala.

Krok 3.4 - Uruchom Nimbus

$ bin/storm nimbus

Krok 3.5 - Uruchom Supervisora

$ bin/storm supervisor

Krok 3.6 Uruchom interfejs użytkownika

$ bin/storm ui

Po uruchomieniu aplikacji interfejsu użytkownika Storm wpisz adres URL http://localhost:8080w ulubionej przeglądarce i możesz zobaczyć informacje o klastrze Storm i jego działającą topologię. Strona powinna wyglądać podobnie do poniższego zrzutu ekranu.

Zapoznaliśmy się z podstawowymi szczegółami technicznymi Apache Storm, a teraz nadszedł czas, aby zakodować kilka prostych scenariuszy.

Scenariusz - mobilny analizator dziennika połączeń

Połączenie mobilne i czas jego trwania zostaną podane jako dane wejściowe do Apache Storm, a Storm przetworzy i pogrupuje połączenie między tego samego dzwoniącego i odbierającego oraz ich całkowitą liczbę połączeń.

Tworzenie wylewki

Wylewka to element służący do generowania danych. Zasadniczo wylewka będzie implementowała interfejs IRichSpout. Interfejs „IRichSpout” ma następujące ważne metody -

  • open- Zapewnia wylewce środowisko do wykonania. Wykonawcy uruchomią tę metodę w celu zainicjowania wylewki.

  • nextTuple - Emituje wygenerowane dane za pośrednictwem kolektora.

  • close - Ta metoda jest wywoływana, gdy wylewka będzie się wyłączać.

  • declareOutputFields - Deklaruje schemat wyjściowy krotki.

  • ack - potwierdza, że ​​przetwarzana jest konkretna krotka

  • fail - określa, że ​​określona krotka nie jest przetwarzana i nie ma być ponownie przetwarzana.

otwarty

Podpis open metoda jest następująca -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - Zapewnia konfigurację burzową dla tej wylewki.

  • context - Zapewnia pełne informacje o miejscu wylewki w topologii, jego identyfikatorze zadania, danych wejściowych i wyjściowych.

  • collector - Umożliwia nam emitowanie krotki, która będzie przetwarzana przez śruby.

nextTuple

Podpis nextTuple metoda jest następująca -

nextTuple()

nextTuple () jest wywoływana okresowo z tej samej pętli, co metody ACK () i Fail (). Musi zwolnić kontrolę nad wątkiem, gdy nie ma pracy do wykonania, aby inne metody miały szansę zostać wywołane. Zatem pierwsza linia nextTuple sprawdza, czy przetwarzanie zostało zakończone. Jeśli tak, powinien spać przez co najmniej jedną milisekundę, aby zmniejszyć obciążenie procesora przed powrotem.

blisko

Podpis close metoda jest następująca -

close()

deklarujOutputFields

Podpis declareOutputFields metoda jest następująca -

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - Służy do deklarowania identyfikatorów strumieni wyjściowych, pól wyjściowych itp.

Ta metoda służy do określania schematu wyjściowego spójnej kolekcji.

ACK

Podpis ack metoda jest następująca -

ack(Object msgId)

Ta metoda potwierdza, że ​​została przetworzona konkretna krotka.

zawieść

Podpis nextTuple metoda jest następująca -

ack(Object msgId)

Ta metoda informuje, że określona krotka nie została w pełni przetworzona. Storm ponownie przetworzy określoną krotkę.

FakeCallLogReaderSpout

W naszym scenariuszu musimy zebrać szczegóły rejestru połączeń. Zawiera informacje z rejestru połączeń.

  • numer dzwoniącego
  • numer odbiorcy
  • duration

Ponieważ nie mamy informacji o dziennikach połączeń w czasie rzeczywistym, będziemy generować fałszywe dzienniki połączeń. Fałszywe informacje zostaną utworzone za pomocą klasy Random. Pełny kod programu znajduje się poniżej.

Kodowanie - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Tworzenie śrub

Bolt to komponent, który pobiera krotki jako dane wejściowe, przetwarza krotkę i generuje nowe krotki jako dane wyjściowe. Śruby będą realizowaćIRichBoltberło. W tym programie dwie klasy śrubCallLogCreatorBolt i CallLogCounterBolt służą do wykonywania operacji.

Interfejs IRichBolt ma następujące metody -

  • prepare- Zapewnia śrubie środowisko do wykonania. Wykonawcy uruchomią tę metodę w celu zainicjowania wylewki.

  • execute - Przetwarzaj pojedynczą krotkę danych wejściowych.

  • cleanup - Wezwany, gdy śruba się wyłączy.

  • declareOutputFields - Deklaruje schemat wyjściowy krotki.

Przygotować

Podpis prepare metoda jest następująca -

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - Zapewnia konfigurację burzy dla tej śruby.

  • context - Zapewnia pełne informacje o położeniu śruby w topologii, identyfikatorze zadania, danych wejściowych i wyjściowych itp.

  • collector - Umożliwia nam emitowanie przetworzonej krotki.

wykonać

Podpis execute metoda jest następująca -

execute(Tuple tuple)

Tutaj tuple jest krotką wejściową do przetworzenia.

Plik executemetoda przetwarza pojedynczą krotkę naraz. Dostęp do danych krotki można uzyskać za pomocą metody getValue klasy Tuple. Nie jest konieczne natychmiastowe przetwarzanie krotki wejściowej. Wiele krotek może być przetwarzanych i wyprowadzanych jako jedna krotka wyjściowa. Przetworzoną krotkę można wyemitować przy użyciu klasy OutputCollector.

sprzątać

Podpis cleanup metoda jest następująca -

cleanup()

deklarujOutputFields

Podpis declareOutputFields metoda jest następująca -

declareOutputFields(OutputFieldsDeclarer declarer)

Tutaj parametr declarer służy do deklarowania identyfikatorów strumieni wyjściowych, pól wyjściowych itp.

Ta metoda służy do określania schematu wyjściowego spójnej kolekcji

Twórca rejestru połączeń Bolt

Bolt kreatora rejestru połączeń odbiera krotkę rejestru połączeń. Krotka rejestru połączeń zawiera numer dzwoniącego, numer odbiorcy i czas trwania połączenia. Ta śruba po prostu tworzy nową wartość, łącząc numer dzwoniącego i numer odbiorcy. Format nowej wartości to „Numer dzwoniącego - Numer odbiorcy” i nazywa się ją nowym polem „Zadzwoń”. Pełny kod podano poniżej.

Kodowanie - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Rejestr połączeń Bolt licznika

Blokada rejestru połączeń odbiera połączenie i jego czas trwania jako krotkę. Ta śruba inicjuje obiekt Dictionary (Map) w metodzie przygotowania. Wexecutesprawdza krotkę i tworzy nowy wpis w obiekcie słownika dla każdej nowej wartości „wywołania” w krotce i ustawia wartość 1 w obiekcie słownika. Dla już dostępnego wpisu w słowniku po prostu zwiększa jego wartość. Mówiąc prościej, ten rygiel zapisuje wywołanie i jego liczbę w obiekcie słownika. Zamiast zapisywać wywołanie i jego liczbę w słowniku, możemy również zapisać je w źródle danych. Pełny kod programu jest następujący -

Kodowanie - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Tworzenie topologii

Topologia Storm jest w zasadzie strukturą Thrift. Klasa TopologyBuilder udostępnia proste i łatwe metody tworzenia złożonych topologii. Klasa TopologyBuilder zawiera metody ustawiania wylewu(setSpout) i ustawić rygiel (setBolt). Wreszcie, TopologyBuilder ma createTopology do tworzenia topologii. Użyj poniższego fragmentu kodu, aby utworzyć topologię -

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping i fieldsGrouping metody pomagają ustawić grupowanie strumieni dla wylewek i śrub.

Klaster lokalny

Dla celów programistycznych możemy stworzyć klaster lokalny za pomocą obiektu „LocalCluster”, a następnie przesłać topologię metodą „submitTopology” klasy „LocalCluster”. Jednym z argumentów argumentu „submitTopology” jest instancja klasy „Config”. Klasa „Config” służy do ustawiania opcji konfiguracyjnych przed przesłaniem topologii. Ta opcja konfiguracji zostanie scalona z konfiguracją klastra w czasie wykonywania i wysłana do wszystkich zadań (wylewki i śruby) metodą przygotowania. Po przesłaniu topologii do klastra będziemy czekać 10 sekund, aż klaster obliczy przesłaną topologię, a następnie zamknie klaster przy użyciu metody „shutdown” „LocalCluster”. Pełny kod programu jest następujący -

Kodowanie - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Tworzenie i uruchamianie aplikacji

Cała aplikacja zawiera cztery kody Java. Oni są -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

Aplikację można zbudować za pomocą następującego polecenia -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Aplikację można uruchomić za pomocą następującego polecenia -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Wynik

Po uruchomieniu aplikacja wyświetli szczegółowe informacje o procesie uruchamiania klastra, przetwarzaniu spout i śruby, a na końcu o procesie zamykania klastra. W „CallLogCounterBolt” wydrukowaliśmy wywołanie i jego liczbę. Te informacje zostaną wyświetlone na konsoli w następujący sposób -

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Języki spoza JVM

Topologie Storm są implementowane przez interfejsy Thrift, co ułatwia przesyłanie topologii w dowolnym języku. Storm obsługuje Ruby, Python i wiele innych języków. Rzućmy okiem na powiązanie Pythona.

Python Binding

Python to interpretowany, interaktywny, zorientowany obiektowo język programowania wysokiego poziomu ogólnego przeznaczenia. Storm obsługuje Pythona w celu implementacji swojej topologii. Python obsługuje operacje emitowania, zakotwiczania, potwierdzania i rejestrowania.

Jak wiesz, śruby można definiować w dowolnym języku. Śruby napisane w innym języku są wykonywane jako podprocesy, a Storm komunikuje się z tymi podprocesami za pomocą komunikatów JSON przez stdin / stdout. Najpierw weź przykładową śrubę WordCount, która obsługuje powiązanie języka Python.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

Tutaj klasa WordCount implementuje IRichBoltinterfejs i działa z implementacją Pythona z określonym argumentem super metody "splitword.py". Teraz utwórz implementację Pythona o nazwie „splitword.py”.

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

To jest przykładowa implementacja dla Pythona, która liczy słowa w danym zdaniu. Podobnie możesz również łączyć się z innymi językami pomocniczymi.

Trident to rozszerzenie Storm. Podobnie jak Storm, Trident również został opracowany przez Twittera. Głównym powodem rozwoju Trident jest zapewnienie wysokiego poziomu abstrakcji na szczycie Storm, wraz z przetwarzaniem strumieniowym i rozproszonymi zapytaniami o niskim opóźnieniu.

Trident używa dziobka i śruby, ale te komponenty niskiego poziomu są automatycznie generowane przez Trident przed wykonaniem. Trident ma funkcje, filtry, łączenia, grupowanie i agregację.

Trident przetwarza strumienie jako serie partii, które nazywane są transakcjami. Ogólnie rzecz biorąc, wielkość tych małych partii będzie rzędu tysięcy lub milionów krotek, w zależności od strumienia wejściowego. W ten sposób Trident różni się od Storm, który przetwarza krotkę po krotce.

Koncepcja przetwarzania wsadowego jest bardzo podobna do transakcji w bazie danych. Każda transakcja ma przypisany identyfikator transakcji. Transakcja jest uważana za udaną, gdy wszystkie jej przetwarzanie zostaną zakończone. Jednak niepowodzenie w przetwarzaniu jednej z krotek transakcji spowoduje retransmisję całej transakcji. Dla każdej partii Trident wywoła na początku transakcji beginCommit i zatwierdzi na końcu.

Topologia Trident

Trident API udostępnia łatwą opcję tworzenia topologii Trident przy użyciu klasy „TridentTopology”. Zasadniczo topologia Trident odbiera strumień wejściowy z wylewki i wykonuje uporządkowaną sekwencję operacji (filtrowanie, agregacja, grupowanie itp.) Na strumieniu. Krąg burzy zostaje zastąpiony krotką z trójzębem, a śruby - operacjami. Prostą topologię Trident można utworzyć w następujący sposób -

TridentTopology topology = new TridentTopology();

Krotki trójzębne

Krotka trójzębna to nazwana lista wartości. Interfejs TridentTuple jest modelem danych topologii Trident. Interfejs TridentTuple jest podstawową jednostką danych, które mogą być przetwarzane przez topologię Trident.

Wylewka Trident

Wylewka Trident jest podobna do wylewki Storm, z dodatkowymi opcjami wykorzystania funkcji Trident. Właściwie nadal możemy korzystać z IRichSpout, którego używaliśmy w topologii Storm, ale będzie on miał charakter nietransakcyjny i nie będziemy mogli korzystać z zalet oferowanych przez Trident.

Podstawową wylewką posiadającą wszystkie funkcje potrzebne do korzystania z funkcji Trident jest „ITridentSpout”. Obsługuje zarówno transakcyjną, jak i nieprzejrzystą semantykę transakcyjną. Pozostałe wylewki to IBatchSpout, IPartitionedTridentSpout i IOpaquePartitionedTridentSpout.

Oprócz tych typowych wylewek, Trident ma wiele przykładowych realizacji wylewki Trident. Jednym z nich jest wylewka FeederBatchSpout, za pomocą której możemy łatwo wysłać nazwaną listę krotek trójzębnych, nie martwiąc się o przetwarzanie wsadowe, równoległość itp.

Tworzenie FeederBatchSpout i podawanie danych można wykonać w sposób pokazany poniżej -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Operacje Trident

Trident wykorzystuje „Operację Trident” do przetwarzania strumienia wejściowego krotek trójzębu. Trident API ma wiele wbudowanych operacji do obsługi prostego do złożonego przetwarzania strumieniowego. Operacje te obejmują zakres od prostej walidacji po złożone grupowanie i agregację krotek trójzębnych. Przejdźmy przez najważniejsze i najczęściej używane operacje.

Filtr

Filtr to obiekt używany do wykonywania zadania sprawdzania poprawności danych wejściowych. Filtr Trident pobiera podzbiór trójzębowych pól krotek jako dane wejściowe i zwraca wartość true lub false w zależności od tego, czy określone warunki są spełnione, czy nie. Jeśli zwracana jest wartość true, krotka jest przechowywana w strumieniu wyjściowym; w przeciwnym razie krotka zostanie usunięta ze strumienia. Filtr w zasadzie odziedziczy poBaseFilter klasę i zaimplementuj isKeepmetoda. Oto przykładowa implementacja operacji filtra -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

Funkcję filtru można wywołać w topologii za pomocą metody „each”. Do określenia danych wejściowych można użyć klasy „Fields” (podzbiór krotki trójzębnej). Przykładowy kod jest następujący -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Funkcjonować

Functionjest obiektem używanym do wykonania prostej operacji na pojedynczej krotce trójzębu. Pobiera podzbiór pól krotek trójzębnych i emituje zero lub więcej nowych pól krotek trójzębnych.

Function zasadniczo dziedziczy z BaseFunction i implementuje executemetoda. Przykładowa implementacja jest podana poniżej -

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Podobnie jak operacja filtru, operację funkcji można wywołać w topologii przy użyciu rozszerzenia eachmetoda. Przykładowy kod jest następujący -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Zbiór

Agregacja to obiekt używany do wykonywania operacji agregacji na wejściowym wsadzie, partycji lub strumieniu. Trident ma trzy typy agregacji. Są następujące -

  • aggregate- Agreguje każdą partię krotki trójzębu oddzielnie. Podczas procesu agregacji krotki są początkowo ponownie partycjonowane przy użyciu grupowania globalnego w celu połączenia wszystkich partycji tej samej partii w jedną partycję.

  • partitionAggregate- Agreguje każdą partycję zamiast całej partii trójzębnej krotki. Dane wyjściowe agregacji partycji całkowicie zastępują krotkę wejściową. Dane wyjściowe agregatu partycji zawierają krotkę pojedynczego pola.

  • persistentaggregate - Agreguje wszystkie krotki trójzębne we wszystkich wsadach i zapisuje wynik w pamięci lub w bazie danych.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Operację agregacji można utworzyć przy użyciu CombinerAggregator, ReducerAggregator lub ogólnego interfejsu Aggregator. Agregator „count” użyty w powyższym przykładzie jest jednym z wbudowanych agregatorów. Jest zaimplementowany przy użyciu narzędzia „CombinerAggregator”. Implementacja jest następująca -

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Grupowanie

Operacja grupowania jest operacją wbudowaną i może być wywoływana przez groupBymetoda. Metoda groupBy dzieli strumień na partycje, wykonując partycjonowanie na określonych polach, a następnie w ramach każdej partycji grupuje krotki, których pola grup są równe. Zwykle używamy „groupBy” razem z „persistentAggregate”, aby uzyskać zgrupowaną agregację. Przykładowy kod jest następujący -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Scalanie i łączenie

Scalanie i łączenie można wykonać odpowiednio metodą „merge” i „join”. Scalanie łączy jeden lub więcej strumieni. Łączenie jest podobne do scalania, z wyjątkiem faktu, że łączenie wykorzystuje trójzębne pole krotki z obu stron do sprawdzania i łączenia dwóch strumieni. Co więcej, łączenie będzie działać tylko na poziomie partii. Przykładowy kod jest następujący -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Utrzymanie stanu

Trident zapewnia mechanizm utrzymania stanu. Informacje o stanie mogą być przechowywane w samej topologii, w przeciwnym razie można je również przechowywać w oddzielnej bazie danych. Powodem jest utrzymanie stanu, w którym jeśli jakakolwiek krotka ulegnie awarii podczas przetwarzania, zostanie ponowiona próba wykonania błędnej krotki. Stwarza to problem podczas aktualizowania stanu, ponieważ nie masz pewności, czy stan tej krotki został zaktualizowany wcześniej, czy nie. Jeśli krotka nie powiodła się przed zaktualizowaniem stanu, ponowienie próby spowoduje, że stan będzie stabilny. Jeśli jednak krotka zakończyła się niepowodzeniem po zaktualizowaniu stanu, ponowna próba wykonania tej samej krotki ponownie zwiększy liczbę w bazie danych i spowoduje niestabilność stanu. Aby wiadomość została przetworzona tylko raz, należy wykonać następujące kroki -

  • Przetwarzaj krotki w małych partiach.

  • Przypisz unikalny identyfikator do każdej partii. Jeśli partia zostanie ponowiona, otrzyma ten sam unikalny identyfikator.

  • Aktualizacje stanu są uporządkowane w partiach. Na przykład aktualizacja stanu drugiej partii nie będzie możliwa do czasu zakończenia aktualizacji stanu dla pierwszej partii.

Rozproszone RPC

Rozproszone RPC służy do wykonywania zapytań i pobierania wyników z topologii Trident. Storm ma wbudowany rozproszony serwer RPC. Rozproszony serwer RPC odbiera żądanie RPC od klienta i przekazuje je do topologii. Topologia przetwarza żądanie i wysyła wynik do rozproszonego serwera RPC, który jest przekierowywany przez rozproszony serwer RPC do klienta. Rozproszone zapytanie RPC Trident jest wykonywane jak zwykłe zapytanie RPC, z wyjątkiem faktu, że zapytania te są wykonywane równolegle.

Kiedy używać Trident?

Podobnie jak w wielu przypadkach użycia, jeśli wymaganiem jest przetworzenie zapytania tylko raz, możemy to osiągnąć, pisząc topologię w Trident. Z drugiej strony w przypadku Storma trudno będzie uzyskać dokładnie raz przetworzenie. Dlatego Trident będzie przydatny w tych przypadkach użycia, w których potrzebujesz dokładnie raz przetworzyć. Trident nie jest przeznaczony dla wszystkich przypadków użycia, zwłaszcza przypadków użycia o wysokiej wydajności, ponieważ zwiększa złożoność Storm i zarządza stanem.

Roboczy przykład Trident

Zamierzamy przekonwertować naszą aplikację do analizy dzienników połączeń opracowaną w poprzedniej sekcji do frameworka Trident. Aplikacja Trident będzie stosunkowo łatwa w porównaniu do zwykłej burzy, dzięki wysokopoziomowemu API. Storm będzie zasadniczo wymagany do wykonania dowolnej z operacji Funkcji, Filtruj, Agreguj, Grupuj według, Połącz i Połącz w Trident. Na koniec uruchomimy serwer DRPC przy użyciu rozszerzeniaLocalDRPC class i wyszukaj słowo kluczowe za pomocą execute metoda klasy LocalDRPC.

Formatowanie informacji o połączeniu

Zadaniem klasy FormatCall jest formatowanie informacji o połączeniu zawierającej „Numer dzwoniącego” i „Numer odbiorcy”. Pełny kod programu jest następujący -

Kodowanie: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

Celem klasy CSVSplit jest podzielenie ciągu wejściowego na podstawie „przecinka (,)” i wyemitowanie każdego słowa w ciągu. Ta funkcja jest używana do analizowania argumentu wejściowego zapytań rozproszonych. Kompletny kod wygląda następująco -

Kodowanie: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Analizator logów

To jest główna aplikacja. Początkowo aplikacja zainicjuje TridentTopology i przekaże informacje o dzwoniącym przy użyciuFeederBatchSpout. Strumień topologii Trident można utworzyć za pomocąnewStreammetoda klasy TridentTopology. Podobnie, strumień DRPC topologii Trident można utworzyć przy użyciunewDRCPStreammetoda klasy TridentTopology. Prosty serwer DRCP można utworzyć za pomocą klasy LocalDRPC.LocalDRPCma metodę execute, aby wyszukać jakieś słowo kluczowe. Pełny kod podano poniżej.

Kodowanie: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Tworzenie i uruchamianie aplikacji

Cała aplikacja ma trzy kody Java. Są następujące -

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

Aplikację można zbudować za pomocą następującego polecenia -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Aplikację można uruchomić za pomocą następującego polecenia -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Wynik

Po uruchomieniu aplikacji aplikacja wyświetli szczegółowe informacje o procesie uruchamiania klastra, przetwarzaniu operacji, serwerze DRPC i kliencie, a na końcu o procesie zamykania klastra. Te dane wyjściowe zostaną wyświetlone na konsoli, jak pokazano poniżej.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends

W tym rozdziale omówimy zastosowanie Apache Storm w czasie rzeczywistym. Zobaczymy, jak Storm jest używany na Twitterze.

Świergot

Twitter to serwis społecznościowy online, który zapewnia platformę do wysyłania i odbierania tweetów użytkowników. Zarejestrowani użytkownicy mogą czytać i wysyłać tweety, ale niezarejestrowani użytkownicy mogą czytać tylko tweety. Hashtag służy do kategoryzowania tweetów według słów kluczowych poprzez dodanie znaku # przed odpowiednim słowem kluczowym. Teraz przyjrzyjmy się scenariuszowi w czasie rzeczywistym znajdowania najczęściej używanego hashtagu na temat.

Tworzenie wylewki

Celem spouta jest jak najszybsze otrzymywanie tweetów przesłanych przez ludzi. Twitter zapewnia „Twitter Streaming API”, narzędzie oparte na usłudze internetowej do pobierania tweetów przesłanych przez ludzi w czasie rzeczywistym. Dostęp do interfejsu API przesyłania strumieniowego na Twitterze można uzyskać w dowolnym języku programowania.

twitter4j to nieoficjalna biblioteka Java o otwartym kodzie źródłowym, która zapewnia oparty na Javie moduł umożliwiający łatwy dostęp do interfejsu API przesyłania strumieniowego na Twitterze. twitter4judostępnia strukturę opartą na odbiornikach umożliwiającą dostęp do tweetów. Aby uzyskać dostęp do Twitter Streaming API, musimy zalogować się na konto programisty Twittera i powinniśmy uzyskać następujące dane uwierzytelniania OAuth.

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTookenSecret

Storm zapewnia wylewkę twittera, TwitterSampleSpout,w zestawie startowym. Będziemy go używać do pobierania tweetów. Dziobek wymaga szczegółów uwierzytelniania OAuth i przynajmniej słowa kluczowego. Dziobek będzie emitował tweety w czasie rzeczywistym na podstawie słów kluczowych. Pełny kod programu znajduje się poniżej.

Kodowanie: TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;
		
   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;
		
   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
         this.consumerKey = consumerKey;
         this.consumerSecret = consumerSecret;
         this.accessToken = accessToken;
         this.accessTokenSecret = accessTokenSecret;
         this.keyWords = keyWords;
   }
		
   public TwitterSampleSpout() {
      // TODO Auto-generated constructor stub
   }
		
   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
         queue = new LinkedBlockingQueue<Status>(1000);
         _collector = collector;
         StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
               queue.offer(status);
            }
					
            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}
					
            @Override
            public void onTrackLimitationNotice(int i) {}
					
            @Override
            public void onScrubGeo(long l, long l1) {}
					
            @Override
            public void onException(Exception ex) {}
					
            @Override
            public void onStallWarning(StallWarning arg0) {
               // TODO Auto-generated method stub
            }
         };
				
         ConfigurationBuilder cb = new ConfigurationBuilder();
				
         cb.setDebugEnabled(true)
            .setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);
					
         _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         _twitterStream.addListener(listener);
				
         if (keyWords.length == 0) {
            _twitterStream.sample();
         }else {
            FilterQuery query = new FilterQuery().track(keyWords);
            _twitterStream.filter(query);
         }
   }
			
   @Override
   public void nextTuple() {
      Status ret = queue.poll();
				
      if (ret == null) {
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }
			
   @Override
   public void close() {
      _twitterStream.shutdown();
   }
			
   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }
			
   @Override
   public void ack(Object id) {}
			
   @Override
   public void fail(Object id) {}
			
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}

Hashtag Reader Bolt

Tweet wyemitowany przez dziobek zostanie przekazany do HashtagReaderBolt, który przetworzy tweet i wyemituje wszystkie dostępne hashtagi. HashtagReaderBolt używagetHashTagEntitiesmetoda dostarczona przez twitter4j. getHashTagEntities odczytuje tweet i zwraca listę hashtagów. Pełny kod programu jest następujący -

Kodowanie: HashtagReaderBolt.java

import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");
      for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtage.getText());
         this.collector.emit(new Values(hashtage.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Hashtag Counter Bolt

Wyemitowany hashtag zostanie przekazany do HashtagCounterBolt. Ten rygiel przetworzy wszystkie hashtagi i zapisze każdy hashtag i jego liczbę w pamięci za pomocą obiektu Java Map. Pełny kod programu znajduje się poniżej.

Kodowanie: HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      if(!counterMap.containsKey(key)){
         counterMap.put(key, 1);
      }else{
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Przesyłanie topologii

Główną aplikacją jest przesłanie topologii. Topologia Twittera składa się zTwitterSampleSpout, HashtagReaderBolt, i HashtagCounterBolt. Poniższy kod programu pokazuje, jak przesłać topologię.

Kodowanie: TwitterHashtagStorm.java

import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception{
      String consumerKey = args[0];
      String consumerSecret = args[1];
		
      String accessToken = args[2];
      String accessTokenSecret = args[3];
		
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
		
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Tworzenie i uruchamianie aplikacji

Cała aplikacja zawiera cztery kody Java. Są następujące -

  • TwitterSampleSpout.java
  • HashtagReaderBolt.java
  • HashtagCounterBolt.java
  • TwitterHashtagStorm.java

Możesz skompilować aplikację za pomocą następującego polecenia -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java

Uruchom aplikację za pomocą następujących poleceń -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.
TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>

Wynik

Aplikacja wydrukuje aktualny dostępny hashtag i jego liczbę. Wynik powinien być podobny do następującego -

Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1

Wieśniak! Finanse to wiodąca witryna internetowa zawierająca wiadomości biznesowe i dane finansowe. Jest częścią Yahoo! i zawiera informacje o nowościach finansowych, statystykach rynkowych, danych z rynków międzynarodowych i inne informacje o zasobach finansowych, do których każdy ma dostęp.

Jeśli jesteś zarejestrowanym użytkownikiem Yahoo! użytkownika, możesz dostosować Yahoo! Finanse, aby skorzystać z niektórych ofert. Wieśniak! Finance API służy do wykonywania zapytań dotyczących danych finansowych z Yahoo!

Ten interfejs API wyświetla dane opóźnione o 15 minut od czasu rzeczywistego i aktualizuje swoją bazę danych co 1 minutę, aby uzyskać dostęp do aktualnych informacji dotyczących zapasów. Przyjrzyjmy się teraz scenariuszowi firmy w czasie rzeczywistym i zobaczmy, jak zgłosić alert, gdy jej wartość akcji spadnie poniżej 100.

Tworzenie wylewki

Zadaniem wylewki jest poznanie danych firmy i przekazanie ceny do śrub. Możesz użyć następującego kodu programu, aby utworzyć wylewkę.

Kodowanie: YahooFinanceSpout.java

import java.util.*;
import java.io.*;
import java.math.BigDecimal;

//import yahoofinace packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

public class YahooFinanceSpout implements IRichSpout {
   private SpoutOutputCollector collector;
   private boolean completed = false;
   private TopologyContext context;
	
   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      try {
         Stock stock = YahooFinance.get("INTC");
         BigDecimal price = stock.getQuote().getPrice();

         this.collector.emit(new Values("INTC", price.doubleValue()));
         stock = YahooFinance.get("GOOGL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("GOOGL", price.doubleValue()));
         stock = YahooFinance.get("AAPL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("AAPL", price.doubleValue()));
      } catch(Exception e) {}
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("company", "price"));
   }

   @Override
   public void close() {}
	
   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Tworzenie śrub

Tutaj celem bolt jest przetworzenie cen danej firmy, gdy ceny spadną poniżej 100. Używa obiektu Java Map do ustawienia ostrzeżenia o granicznej cenie jako truegdy ceny akcji spadną poniżej 100; inaczej fałszywe. Pełny kod programu jest następujący -

Kodowanie: PriceCutOffBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

public class PriceCutOffBolt implements IRichBolt {
   Map<String, Integer> cutOffMap;
   Map<String, Boolean> resultMap;
	
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.cutOffMap = new HashMap <String, Integer>();
      this.cutOffMap.put("INTC", 100);
      this.cutOffMap.put("AAPL", 100);
      this.cutOffMap.put("GOOGL", 100);

      this.resultMap = new HashMap<String, Boolean>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String company = tuple.getString(0);
      Double price = tuple.getDouble(1);

      if(this.cutOffMap.containsKey(company)){
         Integer cutOffPrice = this.cutOffMap.get(company);

         if(price < cutOffPrice) {
            this.resultMap.put(company, true);
         } else {
            this.resultMap.put(company, false);
         }
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("cut_off_price"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Przesyłanie topologii

Jest to główna aplikacja, w której YahooFinanceSpout.java i PriceCutOffBolt.java są ze sobą połączone i tworzą topologię. Poniższy kod programu przedstawia, w jaki sposób można przesłać topologię.

Kodowanie: YahooFinanceStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class YahooFinanceStorm {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());

      builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
         .fieldsGrouping("yahoo-finance-spout", new Fields("company"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Tworzenie i uruchamianie aplikacji

Cała aplikacja ma trzy kody Java. Są następujące -

  • YahooFinanceSpout.java
  • PriceCutOffBolt.java
  • YahooFinanceStorm.java

Aplikację można zbudować za pomocą następującego polecenia -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java

Aplikację można uruchomić za pomocą następującego polecenia -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:.
YahooFinanceStorm

Wynik

Wynik będzie podobny do następującego -

GOOGL : false
AAPL : false
INTC : true

Framework Apache Storm obsługuje wiele z najlepszych współczesnych aplikacji przemysłowych. W tym rozdziale przedstawimy bardzo krótki przegląd niektórych z najważniejszych zastosowań Storm.

Klout

Klout to aplikacja, która wykorzystuje analitykę mediów społecznościowych do oceniania swoich użytkowników w oparciu o wpływy społeczne online Klout Score, która jest wartością liczbową od 1 do 100. Klout używa wbudowanej abstrakcji Trident Apache Storm do tworzenia złożonych topologii, które przesyłają strumieniowo dane.

Kanał pogodowy

Weather Channel wykorzystuje topologie Storm do pozyskiwania danych pogodowych. Połączył się z Twitterem, aby umożliwić wyświetlanie reklam opartych na pogodzie na Twitterze i w aplikacjach mobilnych.OpenSignal to firma specjalizująca się w mapowaniu zasięgu sieci bezprzewodowej. StormTag i WeatherSignalto projekty oparte na pogodzie stworzone przez OpenSignal. StormTag to stacja pogodowa Bluetooth podłączana do pęku kluczy. Dane pogodowe zebrane przez urządzenie są przesyłane do aplikacji WeatherSignal i serwerów OpenSignal.

Przemysł telekomunikacyjny

Dostawcy usług telekomunikacyjnych obsługują miliony połączeń telefonicznych na sekundę. Przeprowadzają badania kryminalistyczne dotyczące zerwanych połączeń i słabej jakości dźwięku. Rekordy szczegółów połączeń napływają z szybkością milionów na sekundę, a Apache Storm przetwarza je w czasie rzeczywistym i identyfikuje wszelkie niepokojące wzorce. Analiza burzy może służyć do ciągłej poprawy jakości połączeń.