Apache Storm - Hızlı Kılavuz

Apache Storm nedir?

Apache Storm, dağıtılmış gerçek zamanlı bir büyük veri işleme sistemidir. Storm, büyük miktarda veriyi hataya dayanıklı ve yatay ölçeklenebilir bir yöntemle işlemek için tasarlanmıştır. En yüksek alım oranlarına sahip olan bir akış veri çerçevesidir. Storm devletsiz olsa da, dağıtılmış ortamı ve küme durumunu Apache ZooKeeper aracılığıyla yönetir. Bu basittir ve gerçek zamanlı veriler üzerinde her türlü değişikliği paralel olarak gerçekleştirebilirsiniz.

Apache Storm, gerçek zamanlı veri analitiğinde lider olmaya devam ediyor. Storm'un kurulumu, çalıştırılması kolaydır ve her mesajın en az bir kez topoloji aracılığıyla işleneceğini garanti eder.

Apache Storm ve Hadoop

Temel olarak Hadoop ve Storm çerçeveleri, büyük verileri analiz etmek için kullanılır. İkisi de birbirini tamamlar ve bazı açılardan farklılık gösterir. Apache Storm, kalıcılık dışındaki tüm işlemleri gerçekleştirirken, Hadoop her şeyde iyidir ancak gerçek zamanlı hesaplamada gecikir. Aşağıdaki tablo Storm ve Hadoop'un özelliklerini karşılaştırmaktadır.

Fırtına Hadoop
Gerçek zamanlı akış işleme Toplu işlem
Vatansız Durum bilgili
ZooKeeper tabanlı koordinasyon ile Master / Slave mimarisi. Ana düğüm olarak adlandırılırnimbus ve köleler supervisors. ZooKeeper tabanlı koordinasyon ile / olmadan master-slave mimarisi. Ana düğümjob tracker ve köle düğümü task tracker.
Bir Storm akış işlemi, kümede saniyede on binlerce mesaja erişebilir. Hadoop Dağıtılmış Dosya Sistemi (HDFS), dakikalar veya saatler süren büyük miktarda veriyi işlemek için MapReduce çerçevesini kullanır.
Fırtına topolojisi, kullanıcı tarafından kapatılıncaya veya beklenmeyen, kurtarılamayan bir arızaya kadar çalışır. MapReduce işleri sırayla yürütülür ve sonunda tamamlanır.
Both are distributed and fault-tolerant
Eğer nimbus / süpervizör ölürse, yeniden başlatma, durduğu yerden devam etmesini sağlar, dolayısıyla hiçbir şey etkilenmez. JobTracker ölürse, tüm çalışan işler kaybolur.

Apache Storm Kullanım Durumları

Apache Storm, gerçek zamanlı büyük veri akışı işlemesiyle çok ünlüdür. Bu nedenle çoğu şirket Storm'u sistemlerinin ayrılmaz bir parçası olarak kullanıyor. Bazı önemli örnekler aşağıdaki gibidir -

Twitter- Twitter, "Yayıncı Analizi ürünleri" yelpazesi için Apache Storm kullanıyor. "Yayıncı Analiz Ürünleri", Twitter Platformundaki her bir tweet'i ve tıklamayı işler. Apache Storm, Twitter altyapısıyla derinlemesine entegredir.

NaviSite- NaviSite, Olay günlüğü izleme / denetleme sistemi için Storm kullanıyor. Sistemde oluşturulan her günlük Fırtınadan geçecek. Storm, mesajı yapılandırılmış normal ifade kümesine göre kontrol edecek ve bir eşleşme varsa, bu belirli mesaj veritabanına kaydedilecektir.

Wego- Wego, Singapur'da bulunan bir seyahat meta arama motorudur. Seyahatle ilgili veriler, dünyanın her yerinden farklı zamanlamalara sahip birçok kaynaktan gelir. Storm, Wego'nun gerçek zamanlı verileri aramasına, eşzamanlılık sorunlarını çözmesine ve son kullanıcı için en iyi eşleşmeyi bulmasına yardımcı olur.

Apache Storm Faydaları

İşte Apache Storm'un sunduğu avantajların bir listesi -

  • Storm açık kaynak kodlu, sağlam ve kullanıcı dostudur. Küçük şirketlerde olduğu kadar büyük şirketlerde de kullanılabilir.

  • Storm hataya dayanıklı, esnek, güvenilirdir ve herhangi bir programlama dilini destekler.

  • Gerçek zamanlı akış işlemeye izin verir.

  • Fırtına inanılmaz derecede hızlı çünkü veriyi işlemede muazzam bir güce sahip.

  • Fırtına, kaynakları doğrusal olarak ekleyerek artan yük altında bile performansı artırabilir. Oldukça ölçeklenebilir.

  • Storm veri yenilemesini gerçekleştirir ve uçtan uca teslim yanıtı saniyeler veya dakikalar içinde soruna bağlıdır. Çok düşük gecikme süresine sahiptir.

  • Storm'un operasyonel zekası var.

  • Storm, kümedeki bağlı düğümlerden herhangi biri ölse veya mesajlar kaybolsa bile garantili veri işleme sağlar.

Apache Storm, gerçek zamanlı verilerin ham akışını bir uçtan okur ve bunu bir dizi küçük işlem biriminden geçirir ve diğer uçta işlenmiş / yararlı bilgileri verir.

Aşağıdaki diyagram, Apache Storm'un temel konseptini tasvir etmektedir.

Şimdi Apache Storm'un bileşenlerine daha yakından bakalım -

Bileşenler Açıklama
Tuple Tuple, Storm'daki ana veri yapısıdır. Sıralı elemanların bir listesidir. Varsayılan olarak, bir Tuple tüm veri türlerini destekler. Genellikle, virgülle ayrılmış değerler kümesi olarak modellenir ve bir Storm kümesine aktarılır.
Akış Akış, sıralanmamış bir demet dizisidir.
Musluklar Akışın kaynağı. Storm genel olarak Twitter Akış API'si, Apache Kafka kuyruğu, Kestrel kuyruğu vb. Gibi ham veri kaynaklarından giriş verilerini kabul eder. Aksi takdirde veri kaynaklarından veri okumak için spout'lar yazabilirsiniz. "ISpout", ağları uygulamak için temel arabirimdir. Belirli arabirimlerden bazıları IRichSpout, BaseRichSpout, KafkaSpout vb.
Cıvatalar Cıvatalar mantıksal işlem birimleridir. Borular, verileri cıvatalara ve cıvatalara aktarır ve yeni bir çıktı akışı üretir. Cıvatalar, veri kaynakları ve veritabanları ile filtreleme, toplama, birleştirme, etkileşimde bulunma işlemlerini gerçekleştirebilir. Bolt verileri alır ve bir veya daha fazla cıvataya yayar. "IBolt", cıvataların uygulanması için temel arayüzdür. Ortak arayüzlerden bazıları IRichBolt, IBasicBolt, vs.'dir.

Gerçek zamanlı bir "Twitter Analizi" örneğini ele alalım ve Apache Storm'da nasıl modellenebileceğini görelim. Aşağıdaki şema yapıyı göstermektedir.

"Twitter Analizi" girdisi, Twitter Akış API'sinden gelir. Spout, Twitter Streaming API kullanan kullanıcıların tweet'lerini okuyacak ve bir demet akışı olarak çıktı alacaktır. Musluktan tek bir demet, virgülle ayrılmış değerler olarak bir twitter kullanıcı adına ve tek bir tweet'e sahip olacaktır. Ardından, bu tuple buharı Bolt'a iletilecek ve Bolt, tweet'i ayrı kelimeye bölecek, kelime sayısını hesaplayacak ve bilgileri yapılandırılmış bir veri kaynağında saklayacaktır. Artık veri kaynağını sorgulayarak sonucu kolayca alabiliyoruz.

Topoloji

Musluklar ve cıvatalar birbirine bağlanır ve bir topoloji oluştururlar. Gerçek zamanlı uygulama mantığı, Storm topolojisinde belirtilir. Basit bir deyişle, bir topoloji, köşelerin hesaplama ve kenarların veri akışı olduğu yönlendirilmiş bir grafiktir.

Basit bir topoloji musluklarla başlar. Spout, verileri bir veya daha fazla cıvataya yayar. Cıvata, topolojide en küçük işleme mantığına sahip bir düğümü temsil eder ve bir cıvatanın çıkışı, girdi olarak başka bir cıvataya gönderilebilir.

Fırtına, siz topolojiyi yok edene kadar topolojiyi sürekli çalışır durumda tutar. Apache Storm'un ana görevi topolojiyi çalıştırmaktır ve belirli bir zamanda herhangi bir sayıda topolojiyi çalıştıracaktır.

Görevler

Artık musluklar ve cıvatalar hakkında temel bir fikriniz var. Topolojinin en küçük mantıksal birimleridir ve tek bir ağız ve bir dizi cıvata kullanılarak bir topoloji oluşturulur. Topolojinin başarılı bir şekilde çalışması için belirli bir sırada düzgün bir şekilde yürütülmeleri gerekir. Her bir musluğun ve cıvatanın Storm tarafından yürütülmesine "Görevler" denir. Basit bir deyişle, bir görev, bir musluğun veya bir cıvatanın yürütülmesidir. Belirli bir zamanda, her ağız ve cıvata, birden çok ayrı dişte çalışan birden çok örneğe sahip olabilir.

İşçiler

Bir topoloji, birden çok çalışan düğümde dağıtılmış bir şekilde çalışır. Storm, görevleri tüm çalışan düğümlerine eşit olarak yayar. Çalışan düğümünün rolü, işleri dinlemek ve yeni bir iş geldiğinde işlemleri başlatmak veya durdurmaktır.

Akış Gruplaması

Veri akışı, ağızlıklardan cıvatalara veya bir cıvatadan başka bir cıvataya akar. Akış gruplama, tupleların topolojide nasıl yönlendirildiğini kontrol eder ve topolojideki tuple akışını anlamamıza yardımcı olur. Aşağıda açıklandığı gibi dört yerleşik grup vardır.

Karışık Gruplama

Karışık gruplamada, eşit sayıda tuple, cıvataları çalıştıran tüm çalışanlara rastgele dağıtılır. Aşağıdaki şema yapıyı göstermektedir.

Alan Gruplaması

Demetlerdeki aynı değerlere sahip alanlar birlikte gruplanır ve kalan demetler dışarıda tutulur. Daha sonra, aynı alan değerlerine sahip demetler, cıvataları uygulayan aynı işçiye iletilir. Örneğin, akış "kelime" alanına göre gruplandırılmışsa, aynı dizeye sahip tuples, "Merhaba" aynı işçiye taşınacaktır. Aşağıdaki diyagram, Alan Gruplamasının nasıl çalıştığını gösterir.

Küresel Gruplama

Tüm akışlar gruplanabilir ve tek bir cıvataya iletilebilir. Bu gruplama, kaynağın tüm örnekleri tarafından oluşturulan demetleri tek bir hedef örneğe gönderir (özellikle, en düşük kimliğe sahip çalışanı seçin).

Tüm Gruplamalar

Tüm Gruplama, her demetin tek bir kopyasını, alıcı cıvatanın tüm örneklerine gönderir. Bu tür bir gruplama, cıvatalara sinyal göndermek için kullanılır. Tüm gruplama, birleştirme işlemleri için kullanışlıdır.

Apache Storm'un en önemli özelliklerinden biri, hataya dayanıklı olması ve "Tek Nokta Arıza" (SPOF) dağıtılmış uygulaması olmadan hızlı olmasıdır. Apache Storm'u uygulamanın kapasitesini artırmak için gerektiği kadar sisteme kurabiliriz.

Apache Storm kümesinin nasıl tasarlandığına ve iç mimarisine bir göz atalım. Aşağıdaki şema küme tasarımını göstermektedir.

Apache Storm'un iki tür düğümü vardır, Nimbus (ana düğüm) ve Supervisor(çalışan düğümü). Nimbus, Apache Storm'un ana bileşenidir. Nimbus'ın ana görevi Storm topolojisini çalıştırmaktır. Nimbus, topolojiyi analiz eder ve yürütülecek görevi toplar. Ardından, görevi uygun bir amirine dağıtır.

Bir süpervizörün bir veya daha fazla çalışan süreci olacaktır. Süpervizör, görevleri işçi süreçlerine devredecektir. İşçi süreci gerektiği kadar uygulayıcı üretecek ve görevi çalıştıracaktır. Apache Storm, nimbus ve denetleyiciler arasındaki iletişim için dahili bir dağıtılmış mesajlaşma sistemi kullanır.

Bileşenler Açıklama
Nimbus Nimbus, Storm kümesinin ana düğümüdür. Kümedeki diğer tüm düğümler şöyle adlandırılırworker nodes. Ana düğüm, verilerin tüm çalışan düğümler arasında dağıtılmasından, çalışan düğümlerine görevlerin atanmasından ve arızaların izlenmesinden sorumludur.
Süpervizör Nimbus tarafından verilen talimatları izleyen düğümler, Süpervizörler olarak adlandırılır. Birsupervisor birden çok çalışan sürecine sahiptir ve nimbus tarafından atanan görevleri tamamlamak için çalışan işlemlerini yönetir.
İşçi süreci Bir çalışan süreç, belirli bir topolojiyle ilgili görevleri yürütür. Bir çalışan süreç bir görevi kendi başına çalıştırmaz, onun yerine oluştururexecutorsve onlardan belirli bir görevi yerine getirmelerini ister. Bir çalışan işlemin birden fazla yürütücüsü olacaktır.
Cellat Bir yürütücü, bir işçi süreci tarafından ortaya çıkan tek bir iş parçacığından başka bir şey değildir. Bir uygulayıcı, bir veya daha fazla görevi çalıştırır, ancak yalnızca belirli bir ağız veya cıvata için.
Görev Bir görev, gerçek veri işlemeyi gerçekleştirir. Yani, bir ağızlık veya bir cıvata.
ZooKeeper çerçevesi

Apache ZooKeeper, bir küme (düğüm grubu) tarafından kendi aralarında koordinasyon sağlamak ve paylaşılan verileri sağlam senkronizasyon teknikleriyle sürdürmek için kullanılan bir hizmettir. Nimbus durum bilgisizdir, bu nedenle çalışan düğüm durumunu izlemesi ZooKeeper'a bağlıdır.

ZooKeeper, süpervizörün nimbus ile etkileşime girmesine yardımcı olur. Nimbus ve süpervizörün durumunu korumaktan sorumludur.

Fırtına, doğası gereği vatansızdır. Vatansız doğanın kendi dezavantajları olsa da, aslında Storm'un gerçek zamanlı verileri mümkün olan en iyi ve en hızlı şekilde işlemesine yardımcı oluyor.

Yine de fırtına tamamen devletsiz değil . Durumunu Apache ZooKeeper'da saklar. Durum Apache ZooKeeper'da mevcut olduğundan, arızalı bir nimbus yeniden başlatılabilir ve kaldığı yerden çalışması sağlanabilir. Genellikle, aşağıdaki gibi hizmet izleme araçlarımonit Nimbus'ı izleyecek ve herhangi bir arıza varsa onu yeniden başlatacaktır.

Apache Storm ayrıca gelişmiş bir topolojiye sahiptir: Trident Topologydurum bakımı ile ve ayrıca Pig gibi yüksek seviyeli bir API sağlar. Tüm bu özellikleri ilerleyen bölümlerde tartışacağız.

Çalışan bir Storm kümesinde bir nimbus ve bir veya daha fazla denetçi bulunmalıdır. Bir diğer önemli düğüm, nimbus ve denetleyiciler arasındaki koordinasyon için kullanılacak olan Apache ZooKeeper'dır.

Şimdi Apache Storm'un iş akışına yakından bakalım -

  • Başlangıçta nimbus, "Fırtına Topolojisi" nin kendisine sunulmasını bekleyecektir.

  • Bir topoloji gönderildikten sonra, topolojiyi işleyecek ve gerçekleştirilecek tüm görevleri ve görevin yürütüleceği sırayı toplayacaktır.

  • Daha sonra, nimbus görevleri mevcut tüm süpervizörlere eşit olarak dağıtacaktır.

  • Belirli bir zaman aralığında, tüm denetleyiciler hala hayatta olduklarını bildirmek için nimbus'a kalp atışları göndereceklerdir.

  • Bir süpervizör öldüğünde ve nimbus'a bir kalp atışı göndermediğinde, nimbus görevleri başka bir süpervizöre atar.

  • Nimbus öldüğünde, süpervizörler herhangi bir sorun olmadan önceden atanmış görev üzerinde çalışacaklardır.

  • Tüm görevler tamamlandığında, süpervizör yeni bir görevin gelmesini bekleyecektir.

  • Bu arada, ölü nimbus servis izleme araçları tarafından otomatik olarak yeniden başlatılacaktır.

  • Yeniden başlatılan nimbus durduğu yerden devam edecektir. Benzer şekilde, ölü gözetmen de otomatik olarak yeniden başlatılabilir. Hem nimbus hem de süpervizör otomatik olarak yeniden başlatılabildiğinden ve her ikisi de eskisi gibi devam edeceğinden, Storm'un tüm görevi en az bir kez işlemesi garanti edilir.

  • Tüm topolojiler işlendikten sonra, nimbus yeni bir topolojinin gelmesini bekler ve benzer şekilde süpervizör yeni görevler için bekler.

Varsayılan olarak, bir Fırtına kümesinde iki mod vardır -

  • Local mode- Bu mod geliştirme, test etme ve hata ayıklama için kullanılır çünkü birlikte çalışan tüm topoloji bileşenlerini görmenin en kolay yolu budur. Bu modda, topolojimizin farklı Storm yapılandırma ortamlarında nasıl çalıştığını görmemizi sağlayan parametreleri ayarlayabiliriz. Yerel modda, fırtına topolojileri yerel makinede tek bir JVM'de çalışır.

  • Production mode- Bu modda topolojimizi, genellikle farklı makinelerde çalışan birçok işlemden oluşan çalışma fırtınası kümesine sunuyoruz. Fırtınanın iş akışında tartışıldığı gibi, çalışan bir küme kapatılana kadar süresiz olarak çalışacaktır.

Apache Storm gerçek zamanlı verileri işler ve girdi normalde bir ileti sıralama sisteminden gelir. Harici bir dağıtılmış mesajlaşma sistemi, gerçek zamanlı hesaplama için gerekli girdiyi sağlayacaktır. Spout, mesajlaşma sistemindeki verileri okuyacak ve bunları demetlere dönüştürüp Apache Storm'a girecektir. İlginç gerçek, Apache Storm'un kendi nimbus ve yöneticisi arasındaki iletişim için dahili olarak kendi dağıtılmış mesajlaşma sistemini kullanmasıdır.

Dağıtılmış Mesajlaşma Sistemi nedir?

Dağıtılmış mesajlaşma, güvenilir mesaj kuyruğu kavramına dayanır. Mesajlar, istemci uygulamaları ve mesajlaşma sistemleri arasında eşzamansız olarak sıraya alınır. Dağıtılmış bir mesajlaşma sistemi, güvenilirlik, ölçeklenebilirlik ve kalıcılığın avantajlarını sağlar.

Mesajlaşma modellerinin çoğu, publish-subscribe model (basitçe Pub-Sub) mesajları gönderenlerin arandığı yer publishers ve mesajları almak isteyenler aranır subscribers.

Mesaj gönderen tarafından yayınlandıktan sonra, aboneler bir filtreleme seçeneği yardımıyla seçilen mesajı alabilirler. Genellikle iki tür filtrelememiz vardır, biritopic-based filtering ve diğeri content-based filtering.

Pub-sub modelinin yalnızca mesajlar aracılığıyla iletişim kurabildiğini unutmayın. Çok gevşek bağlı bir mimaridir; gönderenler bile abonelerinin kim olduğunu bilmiyor. Mesaj modellerinin çoğu, birçok abonenin zamanında erişmesi için mesaj aracısının yayın mesajlarını değiş tokuş etmesini sağlar. Gerçek hayattan bir örnek, spor, film, müzik vb. Gibi farklı kanalları yayınlayan Dish TV'dir ve herkes kendi kanallarına abone olabilir ve abone olduğu kanalları kullanılabilir olduğunda bunları alabilir.

Aşağıdaki tablo, popüler yüksek verimli mesajlaşma sistemlerinden bazılarını açıklamaktadır -

Dağıtılmış mesajlaşma sistemi Açıklama
Apache Kafka Kafka LinkedIn şirketinde geliştirildi ve daha sonra Apache'nin bir alt projesi oldu. Apache Kafka, aracı tarafından etkinleştirilen, kalıcı, dağıtılmış yayınlama-abone olma modeline dayanmaktadır. Kafka hızlı, ölçeklenebilir ve oldukça verimlidir.
RabbitMQ RabbitMQ, açık kaynak kodlu, dağıtılmış sağlam bir mesajlaşma uygulamasıdır. Tüm platformlarda kullanımı kolaydır ve çalışır.
JMS (Java Mesaj Servisi) JMS, bir uygulamadan diğerine mesaj oluşturmayı, okumayı ve göndermeyi destekleyen açık kaynaklı bir API'dir. Garantili mesaj teslimi sağlar ve yayınlama-abone olma modelini takip eder.
ActiveMQ ActiveMQ mesajlaşma sistemi, JMS'nin açık kaynaklı bir API'sidir.
ZeroMQ ZeroMQ aracısız eşler arası mesaj işlemedir. Push-pull, yönlendirici-bayi mesaj modelleri sağlar.
Kerkenez Kestrel, hızlı, güvenilir ve basit bir dağıtılmış mesaj kuyruğudur.

Tasarruf Protokolü

Thrift, diller arası hizmet geliştirme ve uzaktan yordam çağrısı (RPC) için Facebook'ta oluşturuldu. Daha sonra açık kaynaklı bir Apache projesi haline geldi. Apache Thrift birInterface Definition Language tanımlanmış veri türleri üzerine yeni veri türleri ve hizmet uygulamalarının kolay bir şekilde tanımlanmasına izin verir.

Apache Thrift ayrıca gömülü sistemleri, mobil uygulamaları, web uygulamalarını ve diğer birçok programlama dilini destekleyen bir iletişim çerçevesidir. Apache Thrift ile ilişkili temel özelliklerden bazıları modülerliği, esnekliği ve yüksek performansıdır. Ek olarak, dağıtılmış uygulamalarda akış, mesajlaşma ve RPC gerçekleştirebilir.

Storm, dahili iletişimi ve veri tanımı için kapsamlı bir şekilde Thrift Protokolünü kullanır. Fırtına topolojisi basitçeThrift Structs. Apache Storm'da topolojiyi çalıştıran Storm Nimbus,Thrift service.

Şimdi makinenize Apache Storm çerçevesinin nasıl kurulacağını görelim. Burada üç majo adım var -

  • Zaten sahip değilseniz, sisteminize Java yükleyin.
  • ZooKeeper çerçevesini yükleyin.
  • Apache Storm çerçevesini yükleyin.

Adım 1 - Java Kurulumunu Doğrulama

Java'nın sisteminizde zaten yüklü olup olmadığını kontrol etmek için aşağıdaki komutu kullanın.

$ java -version

Java zaten oradaysa, sürüm numarasını görürsünüz. Aksi takdirde, JDK'nın en son sürümünü indirin.

Adım 1.1 - JDK'yı İndirin

Aşağıdaki bağlantıyı kullanarak JDK'nın en son sürümünü indirin - www.oracle.com

En son sürüm JDK 8u 60'tır ve dosya “jdk-8u60-linux-x64.tar.gz”. Dosyayı makinenize indirin.

Adım 1.2 - Dosyaları çıkarın

Genellikle dosyalar downloadsKlasör. Aşağıdaki komutları kullanarak tar kurulumunu çıkarın.

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

Adım 1.3 - Tercihler dizinine gidin

Java'yı tüm kullanıcılar için kullanılabilir hale getirmek için, çıkarılan java içeriğini "/ usr / local / java" klasörüne taşıyın.

$ su
password: (type password of root user)
$ mkdir /opt/jdk
$ mv jdk-1.8.0_60 /opt/jdk/

Adım 1.4 - Yolu ayarlayın

Yol ve JAVA_HOME değişkenlerini ayarlamak için aşağıdaki komutları ~ / .bashrc dosyasına ekleyin.

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

Şimdi tüm değişiklikleri mevcut çalışan sisteme uygulayın.

$ source ~/.bashrc

Adım 1.5 - Java Alternatifleri

Java alternatiflerini değiştirmek için aşağıdaki komutu kullanın.

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

Adım 1.6

Şimdi doğrulama komutunu kullanarak Java kurulumunu doğrulayın (java -version) Adım 1'de açıklanmıştır.

Adım 2 - ZooKeeper Framework Kurulumu

Adım 2.1 - ZooKeeper'ı İndirin

ZooKeeper çerçevesini makinenize kurmak için aşağıdaki bağlantıyı ziyaret edin ve ZooKeeper'ın en son sürümünü indirin http://zookeeper.apache.org/releases.html

Şu an itibariyle, ZooKeeper'ın en son sürümü 3.4.6'dır (ZooKeeper-3.4.6.tar.gz).

Adım 2.2 - tar dosyasını çıkartın

Tar dosyasını aşağıdaki komutları kullanarak çıkarın -

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

Adım 2.3 - Yapılandırma dosyası oluşturun

"Vi conf / zoo.cfg" komutunu kullanarak "conf / zoo.cfg" adlı yapılandırma dosyasını açın ve aşağıdaki tüm parametreleri başlangıç ​​noktası olarak ayarlayın.

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

Yapılandırma dosyası başarıyla kaydedildikten sonra, ZooKeeper sunucusunu başlatabilirsiniz.

Adım 2.4 - ZooKeeper Sunucusunu Başlatın

ZooKeeper sunucusunu başlatmak için aşağıdaki komutu kullanın.

$ bin/zkServer.sh start

Bu komutu çalıştırdıktan sonra, aşağıdaki gibi bir yanıt alacaksınız -

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

Adım 2.5 - CLI'yi başlatın

CLI'yi başlatmak için aşağıdaki komutu kullanın.

$ bin/zkCli.sh

Yukarıdaki komutu yürüttükten sonra ZooKeeper sunucusuna bağlanacak ve aşağıdaki yanıtı alacaksınız.

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

Adım 2.6 - ZooKeeper Sunucusunu Durdurun

Sunucuyu bağladıktan ve tüm işlemleri gerçekleştirdikten sonra, aşağıdaki komutu kullanarak ZooKeeper sunucusunu durdurabilirsiniz.

bin/zkServer.sh stop

Makinenize Java ve ZooKeeper'ı başarıyla yüklediniz. Şimdi Apache Storm çerçevesini kurma adımlarını görelim.

Adım 3 - Apache Storm Framework Kurulumu

Adım 3.1 Storm'u İndirin

Storm çerçevesini makinenize kurmak için aşağıdaki bağlantıyı ziyaret edin ve Storm'un en son sürümünü indirin http://storm.apache.org/downloads.html

Şu an itibariyle, Storm'un en son sürümü "apache-storm-0.9.5.tar.gz" dir.

Adım 3.2 - tar dosyasını çıkartın

Tar dosyasını aşağıdaki komutları kullanarak çıkarın -

$ cd opt/
$ tar -zxf apache-storm-0.9.5.tar.gz
$ cd apache-storm-0.9.5
$ mkdir data

Adım 3.3 - Yapılandırma dosyasını açın

Storm'un şu anki sürümü "conf / storm.yaml" adresinde Storm arka plan yordamlarını yapılandıran bir dosya içeriyor. Aşağıdaki bilgileri bu dosyaya ekleyin.

$ vi conf/storm.yaml
storm.zookeeper.servers:
 - "localhost"
storm.local.dir: “/path/to/storm/data(any path)”
nimbus.host: "localhost"
supervisor.slots.ports:
 - 6700
 - 6701
 - 6702
 - 6703

Tüm değişiklikleri uyguladıktan sonra kaydedin ve terminale geri dönün.

Adım 3.4 - Nimbus'ı başlatın

$ bin/storm nimbus

Adım 3.5 - Süpervizörü Başlatın

$ bin/storm supervisor

Adım 3.6 Kullanıcı Arayüzünü Başlatın

$ bin/storm ui

Storm kullanıcı arayüzü uygulamasını başlattıktan sonra URL'yi yazın http://localhost:8080favori tarayıcınızda ve Storm küme bilgilerini ve çalışan topolojisini görebilirsiniz. Sayfa, aşağıdaki ekran görüntüsüne benzer görünmelidir.

Apache Storm'un temel teknik ayrıntılarını inceledik ve şimdi bazı basit senaryoları kodlamanın zamanı geldi.

Senaryo - Mobil Arama Kaydı Analizcisi

Mobil arama ve süresi, Apache Storm'a girdi olarak verilecek ve Fırtına, aramayı aynı arayan ve alıcı ile toplam arama sayısı arasında işleyecek ve gruplayacaktır.

Musluk Oluşturma

Spout, veri üretimi için kullanılan bir bileşendir. Temel olarak, bir ağzı bir IRichSpout arabirimi uygulayacaktır. "IRichSpout" arayüzü aşağıdaki önemli yöntemlere sahiptir -

  • open- Musluğun çalıştırılması için bir ortam sağlar. Uygulayıcılar, musluğu başlatmak için bu yöntemi çalıştıracaktır.

  • nextTuple - Oluşturulan verileri toplayıcı aracılığıyla yayar.

  • close - Bu yöntem, bir çıkış ağzı kapanacağı zaman çağrılır.

  • declareOutputFields - Demetin çıkış şemasını bildirir.

  • ack - Belirli bir demetin işlendiğini onaylar

  • fail - Belirli bir başlığın işlenmediğini ve yeniden işlenmeyeceğini belirtir.

Açık

İmzası open yöntem aşağıdaki gibidir -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - Bu çıkış ağzı için fırtına yapılandırması sağlar.

  • context - Topoloji içindeki çıkış yeri, görev kimliği, giriş ve çıkış bilgileri hakkında eksiksiz bilgi sağlar.

  • collector - Cıvatalarla işlenecek demeti yaymamızı sağlar.

nextTuple

İmzası nextTuple yöntem aşağıdaki gibidir -

nextTuple()

nextTuple (), ack () ve fail () yöntemleriyle aynı döngüden periyodik olarak çağrılır. Diğer yöntemlerin çağrılma şansı olması için, yapılacak iş olmadığında iş parçacığının denetimini bırakması gerekir. Bu nedenle nextTuple'ın ilk satırı işlemenin bitip bitmediğini kontrol eder. Öyleyse, geri dönmeden önce işlemci üzerindeki yükü azaltmak için en az bir milisaniye uyuması gerekir.

kapat

İmzası close yöntem aşağıdaki gibidir -

close()

declareOutputFields

İmzası declareOutputFields yöntem aşağıdaki gibidir -

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - Çıkış akış kimliklerini, çıktı alanlarını vb. Bildirmek için kullanılır.

Bu yöntem, başlığın çıktı şemasını belirtmek için kullanılır.

ack

İmzası ack yöntem aşağıdaki gibidir -

ack(Object msgId)

Bu yöntem, belirli bir demetin işlendiğini kabul eder.

başarısız

İmzası nextTuple yöntem aşağıdaki gibidir -

ack(Object msgId)

Bu yöntem, belirli bir demetin tam olarak işlenmediğini bildirir. Fırtına, belirli bir grubu yeniden işleyecektir.

FakeCallLogReaderSpout

Senaryomuzda, arama günlüğü ayrıntılarını toplamamız gerekiyor. Çağrı kaydı bilgileri içerir.

  • arayan numarası
  • alıcı numarası
  • duration

Gerçek zamanlı arama kayıtları bilgisine sahip olmadığımız için sahte arama kayıtları oluşturacağız. Sahte bilgiler Random sınıfı kullanılarak oluşturulacaktır. Tam program kodu aşağıda verilmiştir.

Kodlama - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Bolt Oluşturma

Bolt, tuple'ları girdi olarak alan, tuple'ı işleyen ve çıktı olarak yeni tuples üreten bir bileşendir. Cıvatalar uygulayacakIRichBoltarayüz. Bu programda iki cıvata sınıfıCallLogCreatorBolt ve CallLogCounterBolt işlemleri gerçekleştirmek için kullanılır.

IRichBolt arayüzü aşağıdaki yöntemlere sahiptir -

  • prepare- Cıvataya uygulanacak bir ortam sağlar. Uygulayıcılar, musluğu başlatmak için bu yöntemi çalıştıracaktır.

  • execute - Tek bir giriş demeti işleyin.

  • cleanup - Bir cıvata kapanacağı zaman çağrılır.

  • declareOutputFields - Demetin çıkış şemasını bildirir.

Hazırlamak

İmzası prepare yöntem aşağıdaki gibidir -

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - Bu cıvata için Storm yapılandırması sağlar.

  • context - Topoloji içindeki cıvata yeri, görev kimliği, giriş ve çıkış bilgileri vb. Hakkında eksiksiz bilgi sağlar.

  • collector - İşlenmiş demeti yaymamızı sağlar.

yürütmek

İmzası execute yöntem aşağıdaki gibidir -

execute(Tuple tuple)

Buraya tuple işlenecek girdi demetidir.

executeyöntem bir seferde tek bir demeti işler. Tuple verilerine Tuple sınıfının getValue yöntemi ile erişilebilir. Giriş demetini hemen işlemek gerekli değildir. Birden çok demet işlenebilir ve tek bir çıktı demeti olarak çıkarılabilir. İşlenen tuple, OutputCollector sınıfı kullanılarak yayınlanabilir.

Temizlemek

İmzası cleanup yöntem aşağıdaki gibidir -

cleanup()

declareOutputFields

İmzası declareOutputFields yöntem aşağıdaki gibidir -

declareOutputFields(OutputFieldsDeclarer declarer)

İşte parametre declarer çıktı akış kimliklerini, çıktı alanlarını vb. bildirmek için kullanılır.

Bu yöntem, tuple'ın çıktı şemasını belirtmek için kullanılır.

Çağrı kaydı Oluşturucu Bolt

Arama günlüğü oluşturucu cıvatası, arama günlüğü grubunu alır. Arama kaydı grubu, arayan numarası, alıcı numarası ve arama süresine sahiptir. Bu cıvata, arayan numarasını ve alıcı numarasını birleştirerek yeni bir değer yaratır. Yeni değerin biçimi "Arayan numarası - Alıcı numarası" şeklindedir ve yeni alan "ara" olarak adlandırılır. Kodun tamamı aşağıda verilmiştir.

Kodlama - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Çağrı kaydı Sayaç Cıvatası

Arama günlüğü sayaç cıvatası, aramayı ve süresini bir tuple olarak alır. Bu cıvata, hazırlama yönteminde bir sözlük (Harita) nesnesini başlatır. İçindeexecuteyöntem, demeti kontrol eder ve demetteki her yeni "çağrı" değeri için sözlük nesnesinde yeni bir giriş oluşturur ve sözlük nesnesinde bir 1 değeri ayarlar. Sözlükte zaten mevcut olan giriş için, yalnızca değerini artırır. Basit bir ifadeyle, bu cıvata, aramayı ve onun sayısını sözlük nesnesine kaydeder. Çağrıyı ve sayısını sözlüğe kaydetmek yerine, bir veri kaynağına da kaydedebiliriz. Tam program kodu aşağıdaki gibidir -

Kodlama - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Topoloji Oluşturma

Fırtına topolojisi temelde bir Thrift yapısıdır. TopologyBuilder sınıfı, karmaşık topolojiler oluşturmak için basit ve kolay yöntemler sağlar. TopologyBuilder sınıfı, musluğu ayarlamak için yöntemler içerir(setSpout) ve cıvatayı ayarlamak için (setBolt). Son olarak, TopologyBuilder topoloji oluşturmak için createTopology'ye sahiptir. Bir topoloji oluşturmak için aşağıdaki kod parçacığını kullanın -

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping ve fieldsGrouping yöntemler, ağız ve cıvatalar için akış gruplamasının ayarlanmasına yardımcı olur.

Yerel Küme

Geliştirme amacıyla, "LocalCluster" nesnesini kullanarak yerel bir küme oluşturabilir ve ardından "LocalCluster" sınıfının "submitTopology" yöntemini kullanarak topolojiyi sunabiliriz. "SubmitTopology" argümanlarından biri "Config" sınıfının bir örneğidir. "Config" sınıfı, topolojiyi göndermeden önce yapılandırma seçeneklerini ayarlamak için kullanılır. Bu yapılandırma seçeneği, çalışma zamanında küme yapılandırmasıyla birleştirilecek ve hazırlama yöntemiyle tüm göreve (ağız ve cıvata) gönderilecektir. Topoloji kümeye gönderildikten sonra, kümenin gönderilen topolojiyi hesaplaması için 10 saniye bekleyeceğiz ve ardından "LocalCluster" ın "kapatma" yöntemini kullanarak kümeyi kapatacağız. Tam program kodu aşağıdaki gibidir -

Kodlama - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

Uygulamayı Oluşturma ve Çalıştırma

Tam uygulamanın dört Java kodu vardır. Onlar -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

Uygulama aşağıdaki komut kullanılarak oluşturulabilir -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Uygulama aşağıdaki komut kullanılarak çalıştırılabilir -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

Çıktı

Uygulama başlatıldıktan sonra, küme başlatma süreci, ağızlık ve cıvata işleme ve son olarak küme kapatma işlemiyle ilgili tüm ayrıntıları verir. "CallLogCounterBolt" da, çağrıyı ve sayım ayrıntılarını yazdırdık. Bu bilgiler konsolda aşağıdaki şekilde görüntülenecektir -

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

JVM olmayan diller

Fırtına topolojileri, herhangi bir dilde topolojilerin gönderilmesini kolaylaştıran Thrift arayüzleri tarafından uygulanır. Storm, Ruby, Python ve diğer birçok dili destekler. Python bağlamasına bir göz atalım.

Python Bağlama

Python, genel amaçlı yorumlanmış, etkileşimli, nesne yönelimli ve üst düzey bir programlama dilidir. Storm, topolojisini uygulamak için Python'u destekler. Python, yayma, bağlama, onaylama ve günlüğe kaydetme işlemlerini destekler.

Bildiğiniz gibi cıvatalar herhangi bir dilde tanımlanabilir. Başka bir dilde yazılmış cıvatalar alt süreçler olarak yürütülür ve Storm bu alt süreçlerle stdin / stdout üzerinden JSON mesajlarıyla iletişim kurar. Öncelikle python bağlamayı destekleyen örnek bir WordCount cıvatası alın.

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

İşte sınıf WordCount uygular IRichBoltarayüz ve python uygulamasıyla çalışan süper yöntem bağımsız değişkeni "splitword.py" ile çalışıyor. Şimdi "splitword.py" adlı bir python uygulaması oluşturun.

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

Bu, belirli bir cümledeki kelimeleri sayan Python için örnek uygulama. Benzer şekilde, diğer destekleyici dillerle de bağlanabilirsiniz.

Trident, Storm'un bir uzantısıdır. Storm gibi Trident da Twitter tarafından geliştirildi. Trident'i geliştirmenin arkasındaki ana neden, durum bilgili akış işleme ve düşük gecikmeli dağıtılmış sorgulama ile birlikte Storm'un üzerinde yüksek düzeyde bir soyutlama sağlamaktır.

Trident, ağızlık ve cıvata kullanır, ancak bu düşük seviyeli bileşenler, yürütülmeden önce Trident tarafından otomatik olarak oluşturulur. Trident'in işlevleri, filtreleri, birleştirmeleri, gruplamaları ve kümelenmesi vardır.

Trident, akışları işlemler olarak adlandırılan bir dizi parti olarak işler. Genel olarak, bu küçük grupların boyutu, giriş akışına bağlı olarak binlerce veya milyonlarca tuple düzeninde olacaktır. Bu şekilde, Trident, tek tek işlemeyi gerçekleştiren Storm'dan farklıdır.

Toplu işlem kavramı, veritabanı işlemlerine çok benzer. Her işleme bir işlem kimliği atanır. İşlem tamamlandıktan sonra işlem başarılı kabul edilir. Bununla birlikte, işlemin demetlerinden birinin işlenmesindeki bir başarısızlık, tüm işlemin yeniden iletilmesine neden olacaktır. Her parti için Trident, işlemin başında beginCommit'i çağırır ve sonunda taahhüt eder.

Trident Topolojisi

Trident API, "TridentTopology" sınıfını kullanarak Trident topolojisi oluşturmak için kolay bir seçenek sunar. Temel olarak, Trident topolojisi çıkış ağzından giriş akışını alır ve akışta sıralı işlem (filtre, toplama, gruplama vb.) Gerçekleştirir. Storm Tuple, Trident Tuple ile değiştirilir ve Cıvatalar operasyonlarla değiştirilir. Aşağıdaki gibi basit bir Trident topolojisi oluşturulabilir -

TridentTopology topology = new TridentTopology();

Trident Tuples

Trident tuple, adlandırılmış bir değerler listesidir. TridentTuple arayüzü, bir Trident topolojisinin veri modelidir. TridentTuple arayüzü, bir Trident topolojisi tarafından işlenebilen temel veri birimidir.

Trident Emzik

Trident ağzı, Trident'in özelliklerini kullanmak için ek seçeneklerle birlikte Fırtına musluğuna benzer. Aslında Storm topolojisinde kullandığımız IRichSpout'u hala kullanabiliriz, ancak doğası gereği işlem dışı olacak ve Trident'in sağladığı avantajları kullanamayacağız.

Trident'in özelliklerini kullanmak için tüm işlevselliğe sahip temel ağızlık "ITridentSpout" dur. Hem işlemsel hem de opak işlemsel semantiği destekler. Diğer musluklar IBatchSpout, IPartitionedTridentSpout ve IOpaquePartitionedTridentSpout'tur.

Bu genel ağızlıklara ek olarak, Trident, alabalık ağızlığın birçok örnek uygulamasına sahiptir. Bunlardan biri, toplu işlem, paralellik vb. Hakkında endişelenmeden, adlandırılmış trident demetler listesini kolayca göndermek için kullanabileceğimiz FeederBatchSpout musluğu.

FeederBatchSpout oluşturma ve veri besleme aşağıda gösterildiği gibi yapılabilir -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident Operasyonları

Trident, üç dişli dizilerin giriş akışını işlemek için "Üç Dişli Mızrak Operasyonu" na güvenir. Trident API, basitten karmaşığa akış işlemeyi idare etmek için bir dizi yerleşik işleme sahiptir. Bu işlemler, basit doğrulamadan karmaşık gruplamaya ve trident tuple'ların toplanmasına kadar çeşitlilik gösterir. En önemli ve sık kullanılan işlemlerden geçelim.

Filtrele

Filtre, giriş doğrulama görevini gerçekleştirmek için kullanılan bir nesnedir. Bir Trident filtresi, giriş olarak trident tuple alanlarının bir alt kümesini alır ve belirli koşulların karşılanıp karşılanmadığına bağlı olarak doğru veya yanlış döndürür. True döndürülürse, tuple çıktı akışında tutulur; aksi takdirde, demet akıştan kaldırılır. Filtre temeldeBaseFilter sınıflayın ve uygulayın isKeepyöntem. İşte filtre işleminin örnek bir uygulaması -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

Filtre işlevi, "her" yöntem kullanılarak topolojide çağrılabilir. "Alanlar" sınıfı, girişi belirtmek için kullanılabilir (üç dişli dizinin alt kümesi). Örnek kod aşağıdaki gibidir -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

Fonksiyon

Functiontek bir trident demet üzerinde basit bir işlem gerçekleştirmek için kullanılan bir nesnedir. Trident tuple alanlarının bir alt kümesini alır ve sıfır veya daha fazla trident demet alanı yayar.

Function temelde miras alır BaseFunction sınıf ve uygular executeyöntem. Aşağıda örnek bir uygulama verilmiştir -

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Tıpkı Filtre işlemi gibi, İşlev işlemi de bir topolojide çağrılabilir. eachyöntem. Örnek kod aşağıdaki gibidir -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

Toplama

Toplama, bir girdi grubu, bölüm veya akış üzerinde toplama işlemleri gerçekleştirmek için kullanılan bir nesnedir. Trident'in üç tür toplama vardır. Bunlar aşağıdaki gibidir -

  • aggregate- Her bir üç dişli demet grubunu ayrı ayrı toplar. Birleştirme işlemi sırasında, gruplar başlangıçta aynı partinin tüm bölümlerini tek bir bölümde birleştirmek için genel gruplandırma kullanılarak yeniden bölümlenir.

  • partitionAggregate- Üç dişli dizinin tamamı yerine her bölümü toplar. Bölüm toplamının çıktısı, giriş demetinin tamamen yerini alır. Bölüm toplamının çıktısı tek bir alan demeti içerir.

  • persistentaggregate - Tüm toplu işteki tüm trident tuple üzerinde toplanır ve sonucu bellek veya veri tabanında depolar.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Toplama işlemi, CombinerAggregator, ReducerAggregator veya jenerik Toplayıcı arabirimi kullanılarak oluşturulabilir. Yukarıdaki örnekte kullanılan "sayım" toplayıcı, yerleşik toplayıcılardan biridir. "CombinerAggregator" kullanılarak uygulanır. Uygulama aşağıdaki gibidir -

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Gruplama

Gruplama işlemi dahili bir işlemdir ve groupByyöntem. GroupBy yöntemi, belirtilen alanlarda bir partitionBy yaparak akışı yeniden bölümler ve ardından her bölüm içinde, grup alanları eşit olan grupları bir arada gruplar. Normalde, gruplanmış toplamayı elde etmek için "persistentAggregate" ile birlikte "groupBy" kullanırız. Örnek kod aşağıdaki gibidir -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Birleştirme ve Birleştirme

Birleştirme ve birleştirme sırasıyla “birleştirme” ve “birleştirme” yöntemi kullanılarak yapılabilir. Birleştirme, bir veya daha fazla akışı birleştirir. Birleştirme, birleştirmenin iki akışı kontrol etmek ve birleştirmek için her iki taraftan trident tuple alanını kullanması dışında birleştirmeye benzer. Dahası, birleştirme yalnızca toplu iş düzeyinde çalışacaktır. Örnek kod aşağıdaki gibidir -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

Durum Bakımı

Trident, durum bakımı için bir mekanizma sağlar. Durum bilgisi topolojinin kendisinde saklanabilir, aksi takdirde ayrı bir veritabanında da saklayabilirsiniz. Bunun nedeni, işleme sırasında herhangi bir başlığın başarısız olması durumunda, başarısız olan başlığın yeniden deneneceği bir durumu sürdürmektir. Bu, durumu güncellerken bir sorun yaratır çünkü bu demet durumunun daha önce güncellenip güncellenmediğinden emin değilsiniz. Tuple durumu güncellemeden önce başarısız olduysa, demeti yeniden denemek durumu kararlı hale getirecektir. Ancak, durum güncellendikten sonra demet başarısız olursa, aynı demeti yeniden denemek veritabanındaki sayımı tekrar artıracak ve durumu kararsız hale getirecektir. Bir mesajın yalnızca bir kez işlenmesini sağlamak için aşağıdaki adımları gerçekleştirmeniz gerekir -

  • Demetleri küçük gruplar halinde işleyin.

  • Her partiye benzersiz bir kimlik atayın. Parti yeniden denenirse, aynı benzersiz kimlik verilir.

  • Durum güncellemeleri gruplar arasında sıralanır. Örneğin, ikinci partinin durum güncellemesi, birinci partinin durum güncellemesi tamamlanana kadar mümkün olmayacaktır.

Dağıtılmış RPC

Dağıtılmış RPC, sonucu sorgulamak ve Trident topolojisinden almak için kullanılır. Storm'un dahili dağıtılmış bir RPC sunucusu vardır. Dağıtılmış RPC sunucusu istemciden RPC talebini alır ve bunu topolojiye iletir. Topoloji, isteği işler ve sonucu, dağıtılmış RPC sunucusu tarafından istemciye yeniden yönlendirilen dağıtılmış RPC sunucusuna gönderir. Trident'in dağıtılmış RPC sorgusu, bu sorguların paralel olarak çalıştırılması dışında normal bir RPC sorgusu gibi yürütülür.

Trident Ne Zaman Kullanılır?

Çoğu kullanım durumunda olduğu gibi, gereksinim bir sorguyu yalnızca bir kez işleme koymaksa, bunu Trident'te bir topoloji yazarak gerçekleştirebiliriz. Öte yandan, Storm durumunda tam olarak bir kez işlendiğinde elde etmek zor olacaktır. Bu nedenle Trident, tam olarak bir kez işlemeye ihtiyaç duyduğunuz kullanım durumları için faydalı olacaktır. Trident, Storm'a karmaşıklık kattığı ve durumu yönettiği için tüm kullanım durumları, özellikle yüksek performanslı kullanım durumları için değildir.

Trident'in Çalışma Örneği

Önceki bölümde oluşturulan arama günlüğü analizörü uygulamamızı Trident çerçevesine dönüştüreceğiz. Trident uygulaması, üst düzey API'si sayesinde düz fırtınaya kıyasla nispeten kolay olacaktır. Storm'un temelde Trident'teki Function, Filter, Aggregate, GroupBy, Join ve Merge işlemlerinden herhangi birini gerçekleştirmesi gerekecek. Son olarak, DRPC SunucusunuLocalDRPC sınıfına girin ve bazı anahtar kelimeleri arayın execute LocalDRPC sınıfının yöntemi.

Çağrı bilgilerini biçimlendirme

FormatCall sınıfının amacı, "Arayan numarası" ve "Alıcı numarası" ndan oluşan çağrı bilgilerini biçimlendirmektir. Tam program kodu aşağıdaki gibidir -

Kodlama: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

CSVSplit sınıfının amacı, girdi dizesini "virgül (,)" temelinde bölmek ve dizedeki her kelimeyi yaymaktır. Bu işlev, dağıtılmış sorgulamanın girdi bağımsız değişkenini çözümlemek için kullanılır. Kodun tamamı aşağıdaki gibidir -

Kodlama: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log Analyzer

Bu ana uygulamadır. Başlangıçta, uygulama TridentTopology'yi başlatacak ve arayan bilgilerini kullanarakFeederBatchSpout. Trident topoloji akışı,newStreamTridentTopology sınıfının yöntemi. Benzer şekilde, Trident topoloji DRPC akışı kullanılarak oluşturulabilir.newDRCPStreamTridentTopology sınıfının yöntemi. LocalDRPC sınıfı kullanılarak basit bir DRCP sunucusu oluşturulabilir.LocalDRPCbazı anahtar kelimeleri aramak için bir yöntem yürütür. Kodun tamamı aşağıda verilmiştir.

Kodlama: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Uygulamayı Oluşturma ve Çalıştırma

Tam uygulamanın üç Java kodu vardır. Bunlar aşağıdaki gibidir -

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

Uygulama aşağıdaki komut kullanılarak oluşturulabilir -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

Uygulama aşağıdaki komut kullanılarak çalıştırılabilir -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

Çıktı

Uygulama başlatıldıktan sonra, uygulama küme başlatma süreci, işlemlerin işlenmesi, DRPC Sunucusu ve istemci bilgileri ve son olarak küme kapatma işlemi hakkında tüm ayrıntıları verir. Bu çıktı, aşağıda gösterildiği gibi konsolda görüntülenecektir.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends

İşte bu bölümde, Apache Storm'un gerçek zamanlı bir uygulamasını tartışacağız. Fırtına'nın nasıl kullanıldığını Twitter'da göreceğiz.

Twitter

Twitter, kullanıcı tweetleri göndermek ve almak için bir platform sağlayan çevrimiçi bir sosyal ağ hizmetidir. Kayıtlı kullanıcılar tweet okuyabilir ve gönderebilir ancak kayıtsız kullanıcılar sadece tweet okuyabilir. Hashtag, ilgili anahtar kelimeden önce # ekleyerek tweetleri anahtar kelimeye göre kategorize etmek için kullanılır. Şimdi konu başına en çok kullanılan hashtag'i bulmanın gerçek zamanlı bir senaryosunu ele alalım.

Musluk Oluşturma

Spout'un amacı, insanlar tarafından gönderilen tweet'leri mümkün olan en kısa sürede almaktır. Twitter, kişiler tarafından gönderilen tweet'leri gerçek zamanlı olarak almak için web hizmeti tabanlı bir araç olan “Twitter Akış API” sını sağlar. Twitter Streaming API'ye herhangi bir programlama dilinde erişilebilir.

twitter4j Twitter Streaming API'ye kolayca erişmek için Java tabanlı bir modül sağlayan açık kaynaklı, resmi olmayan bir Java kitaplığıdır. twitter4jtweetlere erişmek için dinleyici tabanlı bir çerçeve sağlar. Twitter Streaming API'ye erişmek için Twitter geliştirici hesabı için oturum açmamız ve aşağıdaki OAuth kimlik doğrulama ayrıntılarını almamız gerekir.

  • Customerkey
  • CustomerSecret
  • AccessToken
  • AccessTookenSecret

Storm bir twitter ağzı sağlar, TwitterSampleSpout,başlangıç ​​kitinde. Tweetleri almak için kullanacağız. Çıkış, OAuth kimlik doğrulama ayrıntılarına ve en azından bir anahtar kelimeye ihtiyaç duyar. Çıkış, anahtar kelimelere göre gerçek zamanlı tweetler yayınlayacaktır. Tam program kodu aşağıda verilmiştir.

Kodlama: TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;
		
   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;
		
   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
         this.consumerKey = consumerKey;
         this.consumerSecret = consumerSecret;
         this.accessToken = accessToken;
         this.accessTokenSecret = accessTokenSecret;
         this.keyWords = keyWords;
   }
		
   public TwitterSampleSpout() {
      // TODO Auto-generated constructor stub
   }
		
   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
         queue = new LinkedBlockingQueue<Status>(1000);
         _collector = collector;
         StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
               queue.offer(status);
            }
					
            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}
					
            @Override
            public void onTrackLimitationNotice(int i) {}
					
            @Override
            public void onScrubGeo(long l, long l1) {}
					
            @Override
            public void onException(Exception ex) {}
					
            @Override
            public void onStallWarning(StallWarning arg0) {
               // TODO Auto-generated method stub
            }
         };
				
         ConfigurationBuilder cb = new ConfigurationBuilder();
				
         cb.setDebugEnabled(true)
            .setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);
					
         _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         _twitterStream.addListener(listener);
				
         if (keyWords.length == 0) {
            _twitterStream.sample();
         }else {
            FilterQuery query = new FilterQuery().track(keyWords);
            _twitterStream.filter(query);
         }
   }
			
   @Override
   public void nextTuple() {
      Status ret = queue.poll();
				
      if (ret == null) {
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }
			
   @Override
   public void close() {
      _twitterStream.shutdown();
   }
			
   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }
			
   @Override
   public void ack(Object id) {}
			
   @Override
   public void fail(Object id) {}
			
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}

Hashtag Okuyucu Cıvatası

Emzikten çıkan tweet şu adrese iletilecek: HashtagReaderBolt, tweet'i işleyecek ve mevcut tüm hashtag'leri gönderecek. HashtagReaderBolt kullanırgetHashTagEntitiestwitter4j tarafından sağlanan yöntem. getHashTagEntities tweet'i okur ve hashtag listesini döndürür. Tam program kodu aşağıdaki gibidir -

Kodlama: HashtagReaderBolt.java

import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");
      for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtage.getText());
         this.collector.emit(new Values(hashtage.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Hashtag Sayaç Cıvatası

Yayınlanan hashtag, adresine iletilecek HashtagCounterBolt. Bu cıvata, tüm hashtag'leri işleyecek ve her bir hashtag'i ve bunların sayısını Java Map nesnesini kullanarak belleğe kaydedecektir. Tam program kodu aşağıda verilmiştir.

Kodlama: HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      if(!counterMap.containsKey(key)){
         counterMap.put(key, 1);
      }else{
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Bir Topoloji Gönderme

Topoloji göndermek ana uygulamadır. Twitter topolojisi şunlardan oluşur:TwitterSampleSpout, HashtagReaderBolt, ve HashtagCounterBolt. Aşağıdaki program kodu, bir topolojinin nasıl gönderileceğini gösterir.

Kodlama: TwitterHashtagStorm.java

import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception{
      String consumerKey = args[0];
      String consumerSecret = args[1];
		
      String accessToken = args[2];
      String accessTokenSecret = args[3];
		
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
		
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Uygulamayı Oluşturma ve Çalıştırma

Tam uygulamanın dört Java kodu vardır. Bunlar aşağıdaki gibidir -

  • TwitterSampleSpout.java
  • HashtagReaderBolt.java
  • HashtagCounterBolt.java
  • TwitterHashtagStorm.java

Aşağıdaki komutu kullanarak uygulamayı derleyebilirsiniz -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java

Aşağıdaki komutları kullanarak uygulamayı çalıştırın -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.
TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>

Çıktı

Uygulama, mevcut mevcut hashtag'i ve sayısını yazdıracaktır. Çıktı aşağıdakine benzer olmalıdır -

Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1

Yahoo! Finans, İnternet'in önde gelen iş haberleri ve finansal veriler web sitesidir. Yahoo! finans haberleri, piyasa istatistikleri, uluslararası piyasa verileri ve finans kaynakları hakkında herkesin erişebileceği diğer bilgiler hakkında bilgi verir.

Kayıtlı bir Yahoo! kullanıcı, daha sonra Yahoo! Finans, belirli tekliflerinden yararlanmak için. Yahoo! Finans API, Yahoo!

Bu API, gerçek zamanlı olarak 15 dakika geciken verileri görüntüler ve mevcut stokla ilgili bilgilere erişmek için veritabanını her 1 dakikada bir günceller. Şimdi bir şirketin gerçek zamanlı senaryosunu ele alalım ve hisse senedi değeri 100'ün altına düştüğünde nasıl uyarı verileceğini görelim.

Musluk Oluşturma

Emziklerin amacı, firma detaylarını almak ve fiyatları civatalara yaymaktır. Bir dağıtıcı oluşturmak için aşağıdaki program kodunu kullanabilirsiniz.

Kodlama: YahooFinanceSpout.java

import java.util.*;
import java.io.*;
import java.math.BigDecimal;

//import yahoofinace packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

public class YahooFinanceSpout implements IRichSpout {
   private SpoutOutputCollector collector;
   private boolean completed = false;
   private TopologyContext context;
	
   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      try {
         Stock stock = YahooFinance.get("INTC");
         BigDecimal price = stock.getQuote().getPrice();

         this.collector.emit(new Values("INTC", price.doubleValue()));
         stock = YahooFinance.get("GOOGL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("GOOGL", price.doubleValue()));
         stock = YahooFinance.get("AAPL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("AAPL", price.doubleValue()));
      } catch(Exception e) {}
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("company", "price"));
   }

   @Override
   public void close() {}
	
   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Bolt Oluşturma

Burada cıvatanın amacı, fiyatlar 100'ün altına düştüğünde verilen şirketin fiyatlarını işlemektir. Kesme fiyatı sınırı uyarısını şu şekilde ayarlamak için Java Map nesnesini kullanır: truehisse senedi fiyatları 100'ün altına düştüğünde; aksi takdirde yanlış. Tam program kodu aşağıdaki gibidir -

Kodlama: PriceCutOffBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

public class PriceCutOffBolt implements IRichBolt {
   Map<String, Integer> cutOffMap;
   Map<String, Boolean> resultMap;
	
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.cutOffMap = new HashMap <String, Integer>();
      this.cutOffMap.put("INTC", 100);
      this.cutOffMap.put("AAPL", 100);
      this.cutOffMap.put("GOOGL", 100);

      this.resultMap = new HashMap<String, Boolean>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String company = tuple.getString(0);
      Double price = tuple.getDouble(1);

      if(this.cutOffMap.containsKey(company)){
         Integer cutOffPrice = this.cutOffMap.get(company);

         if(price < cutOffPrice) {
            this.resultMap.put(company, true);
         } else {
            this.resultMap.put(company, false);
         }
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("cut_off_price"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Bir Topoloji Gönderme

YahooFinanceSpout.java ve PriceCutOffBolt.java'nın birbirine bağlandığı ve bir topoloji oluşturduğu ana uygulamadır. Aşağıdaki program kodu, bir topolojiyi nasıl gönderebileceğinizi gösterir.

Kodlama: YahooFinanceStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class YahooFinanceStorm {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());

      builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
         .fieldsGrouping("yahoo-finance-spout", new Fields("company"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Uygulamayı Oluşturma ve Çalıştırma

Tam uygulamanın üç Java kodu vardır. Bunlar aşağıdaki gibidir -

  • YahooFinanceSpout.java
  • PriceCutOffBolt.java
  • YahooFinanceStorm.java

Uygulama aşağıdaki komut kullanılarak oluşturulabilir -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java

Uygulama aşağıdaki komut kullanılarak çalıştırılabilir -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:.
YahooFinanceStorm

Çıktı

Çıktı aşağıdakine benzer olacaktır -

GOOGL : false
AAPL : false
INTC : true

Apache Storm çerçevesi, günümüzün en iyi endüstriyel uygulamalarının çoğunu destekler. Bu bölümde Storm'un en dikkate değer uygulamalarından bazılarına çok kısa bir genel bakış sunacağız.

Klout

Klout, kullanıcılarını çevrimiçi sosyal etkiye göre sıralamak için sosyal medya analitiğini kullanan bir uygulamadır. Klout Score, 1 ile 100 arasında sayısal bir değerdir. Klout, veri akışı sağlayan karmaşık topolojiler oluşturmak için Apache Storm'un dahili Trident soyutlamasını kullanır.

Hava Kanalı

Hava Durumu Kanalı, hava durumu verilerini almak için Storm topolojilerini kullanır. Twitter ve mobil uygulamalarda hava durumu bilgisine sahip reklamcılığı etkinleştirmek için Twitter ile bağlantı kurdu.OpenSignal kablosuz kapsama alanı haritalama konusunda uzmanlaşmış bir şirkettir. StormTag ve WeatherSignalOpenSignal tarafından oluşturulan hava durumuna dayalı projelerdir. StormTag, bir anahtarlığa takılan bir Bluetooth hava istasyonudur. Cihaz tarafından toplanan hava durumu verileri, WeatherSignal uygulamasına ve OpenSignal sunucularına gönderilir.

Telekom Endüstrisi

Telekomünikasyon sağlayıcıları saniyede milyonlarca telefon görüşmesini işler. Düşen aramalarda ve düşük ses kalitesinde adli tıp yaparlar. Çağrı ayrıntısı kayıtları saniyede milyonlarca akar ve Apache Storm bunları gerçek zamanlı olarak işler ve rahatsız edici kalıpları tanımlar. Fırtına analizi, çağrı kalitesini sürekli olarak iyileştirmek için kullanılabilir.