Apache Storm - Çalışma Örneği
Apache Storm'un temel teknik ayrıntılarını inceledik ve şimdi bazı basit senaryoları kodlamanın zamanı geldi.
Senaryo - Mobil Arama Kaydı Analizcisi
Mobil arama ve süresi, Apache Storm'a girdi olarak verilecek ve Fırtına, aramayı aynı arayan ve alıcı ile toplam arama sayısı arasında işleyecek ve gruplayacaktır.
Musluk Oluşturma
Spout, veri üretimi için kullanılan bir bileşendir. Temel olarak, bir ağzı bir IRichSpout arabirimi uygulayacaktır. "IRichSpout" arayüzü aşağıdaki önemli yöntemlere sahiptir -
open- Musluğun çalıştırılması için bir ortam sağlar. Uygulayıcılar, musluğu başlatmak için bu yöntemi çalıştıracaktır.
nextTuple - Oluşturulan verileri toplayıcı aracılığıyla yayar.
close - Bu yöntem, bir musluk kapanacağı zaman çağrılır.
declareOutputFields - Demetin çıkış şemasını bildirir.
ack - Belirli bir demetin işlendiğini onaylar
fail - Belirli bir başlığın işlenmediğini ve yeniden işlenmeyeceğini belirtir.
Açık
İmzası open yöntem aşağıdaki gibidir -
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf - Bu çıkış ağzı için fırtına yapılandırması sağlar.
context - Topoloji içindeki çıkış yeri, görev kimliği, giriş ve çıkış bilgileri hakkında eksiksiz bilgi sağlar.
collector - Cıvatalarla işlenecek demeti yaymamızı sağlar.
nextTuple
İmzası nextTuple yöntem aşağıdaki gibidir -
nextTuple()
nextTuple (), ack () ve fail () yöntemleriyle aynı döngüden periyodik olarak çağrılır. Diğer yöntemlerin çağrılma şansı olması için yapılacak iş olmadığında iş parçacığının denetimini bırakması gerekir. Bu nedenle nextTuple'ın ilk satırı işlemenin bitip bitmediğini kontrol eder. Öyleyse, geri dönmeden önce işlemci üzerindeki yükü azaltmak için en az bir milisaniye uyuması gerekir.
kapat
İmzası close yöntem aşağıdaki gibidir -
close()
declareOutputFields
İmzası declareOutputFields yöntem aşağıdaki gibidir -
declareOutputFields(OutputFieldsDeclarer declarer)
declarer - Çıkış akış kimliklerini, çıktı alanlarını vb. Bildirmek için kullanılır.
Bu yöntem, başlığın çıktı şemasını belirtmek için kullanılır.
ack
İmzası ack yöntem aşağıdaki gibidir -
ack(Object msgId)
Bu yöntem, belirli bir demetin işlendiğini kabul eder.
başarısız
İmzası nextTuple yöntem aşağıdaki gibidir -
ack(Object msgId)
Bu yöntem, belirli bir demetin tam olarak işlenmediğini bildirir. Fırtına, belirli bir grubu yeniden işleyecektir.
FakeCallLogReaderSpout
Senaryomuzda, arama günlüğü ayrıntılarını toplamamız gerekiyor. Çağrı kaydı bilgileri içerir.
- arayan numarası
- alıcı numarası
- duration
Gerçek zamanlı arama kayıtları bilgisine sahip olmadığımız için sahte arama kayıtları oluşturacağız. Sahte bilgiler Random sınıfı kullanılarak oluşturulacaktır. Tam program kodu aşağıda verilmiştir.
Kodlama - FakeCallLogReaderSpout.java
import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
//Create a class FakeLogReaderSpout which implement IRichSpout interface
to access functionalities
public class FakeCallLogReaderSpout implements IRichSpout {
//Create instance for SpoutOutputCollector which passes tuples to bolt.
private SpoutOutputCollector collector;
private boolean completed = false;
//Create instance for TopologyContext which contains topology data.
private TopologyContext context;
//Create instance for Random class.
private Random randomGenerator = new Random();
private Integer idx = 0;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple() {
if(this.idx <= 1000) {
List<String> mobileNumbers = new ArrayList<String>();
mobileNumbers.add("1234123401");
mobileNumbers.add("1234123402");
mobileNumbers.add("1234123403");
mobileNumbers.add("1234123404");
Integer localIdx = 0;
while(localIdx++ < 100 && this.idx++ < 1000) {
String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
while(fromMobileNumber == toMobileNumber) {
toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
}
Integer duration = randomGenerator.nextInt(60);
this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("from", "to", "duration"));
}
//Override all the interface methods
@Override
public void close() {}
public boolean isDistributed() {
return false;
}
@Override
public void activate() {}
@Override
public void deactivate() {}
@Override
public void ack(Object msgId) {}
@Override
public void fail(Object msgId) {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Bolt Oluşturma
Bolt, tuple'ları girdi olarak alan, tuple'ı işleyen ve çıktı olarak yeni tuples üreten bir bileşendir. Cıvatalar uygulayacakIRichBoltarayüz. Bu programda iki cıvata sınıfıCallLogCreatorBolt ve CallLogCounterBolt işlemleri gerçekleştirmek için kullanılır.
IRichBolt arayüzü aşağıdaki yöntemlere sahiptir -
prepare- Cıvataya uygulanacak bir ortam sağlar. Uygulayıcılar, musluğu başlatmak için bu yöntemi çalıştıracaktır.
execute - Tek bir giriş demeti işleyin.
cleanup - Bir cıvata kapanacağı zaman çağrılır.
declareOutputFields - Demetin çıkış şemasını bildirir.
Hazırlamak
İmzası prepare yöntem aşağıdaki gibidir -
prepare(Map conf, TopologyContext context, OutputCollector collector)
conf - Bu cıvata için Storm yapılandırması sağlar.
context - Topoloji içindeki cıvata yeri, görev kimliği, giriş ve çıkış bilgileri vb. Hakkında eksiksiz bilgi sağlar.
collector - İşlenmiş demeti yaymamızı sağlar.
yürütmek
İmzası execute yöntem aşağıdaki gibidir -
execute(Tuple tuple)
Buraya tuple işlenecek girdi demetidir.
executeyöntem bir seferde tek bir demeti işler. Tuple verilerine Tuple sınıfının getValue yöntemi ile erişilebilir. Giriş demetini hemen işlemek gerekli değildir. Birden çok demet işlenebilir ve tek bir çıktı demeti olarak çıkarılabilir. İşlenen tuple, OutputCollector sınıfı kullanılarak yayınlanabilir.
Temizlemek
İmzası cleanup yöntem aşağıdaki gibidir -
cleanup()
declareOutputFields
İmzası declareOutputFields yöntem aşağıdaki gibidir -
declareOutputFields(OutputFieldsDeclarer declarer)
İşte parametre declarer çıktı akış kimliklerini, çıktı alanlarını vb. bildirmek için kullanılır.
Bu yöntem, tuple'ın çıktı şemasını belirtmek için kullanılır.
Çağrı kaydı Oluşturucu Bolt
Arama günlüğü oluşturucu cıvatası, arama günlüğü grubunu alır. Arama kaydı grubu, arayan numarası, alıcı numarası ve arama süresine sahiptir. Bu cıvata, arayan numarasını ve alıcı numarasını birleştirerek yeni bir değer yaratır. Yeni değerin biçimi "Arayan numarası - Alıcı numarası" şeklindedir ve yeni alan "ara" olarak adlandırılır. Kodun tamamı aşağıda verilmiştir.
Kodlama - CallLogCreatorBolt.java
//import util packages
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
//Create instance for OutputCollector which collects and emits tuples to produce output
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String from = tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
collector.emit(new Values(from + " - " + to, duration));
}
@Override
public void cleanup() {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call", "duration"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Çağrı kaydı Sayaç Cıvatası
Arama günlüğü sayaç cıvatası, aramayı ve süresini bir tuple olarak alır. Bu cıvata, hazırlama yönteminde bir sözlük (Harita) nesnesini başlatır. İçindeexecuteyöntem, demeti kontrol eder ve demetteki her yeni "çağrı" değeri için sözlük nesnesinde yeni bir giriş oluşturur ve sözlük nesnesinde bir 1 değeri ayarlar. Sözlükte zaten mevcut olan giriş için, yalnızca değerini artırır. Basit bir ifadeyle, bu cıvata, aramayı ve onun sayısını sözlük nesnesine kaydeder. Çağrıyı ve sayısını sözlüğe kaydetmek yerine, bir veri kaynağına da kaydedebiliriz. Tam program kodu aşağıdaki gibidir -
Kodlama - CallLogCounterBolt.java
import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class CallLogCounterBolt implements IRichBolt {
Map<String, Integer> counterMap;
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String call = tuple.getString(0);
Integer duration = tuple.getInteger(1);
if(!counterMap.containsKey(call)){
counterMap.put(call, 1);
}else{
Integer c = counterMap.get(call) + 1;
counterMap.put(call, c);
}
collector.ack(tuple);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Topoloji Oluşturma
Fırtına topolojisi temelde bir Thrift yapısıdır. TopologyBuilder sınıfı, karmaşık topolojiler oluşturmak için basit ve kolay yöntemler sağlar. TopologyBuilder sınıfı, musluğu ayarlamak için yöntemler içerir(setSpout) ve cıvatayı ayarlamak için (setBolt). Son olarak, TopologyBuilder topoloji oluşturmak için createTopology'ye sahiptir. Bir topoloji oluşturmak için aşağıdaki kod parçacığını kullanın -
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGrouping ve fieldsGrouping yöntemler, ağız ve cıvatalar için akış gruplamasının ayarlanmasına yardımcı olur.
Yerel Küme
Geliştirme amacıyla, "LocalCluster" nesnesini kullanarak yerel bir küme oluşturabilir ve ardından "LocalCluster" sınıfının "submitTopology" yöntemini kullanarak topolojiyi sunabiliriz. "SubmitTopology" argümanlarından biri "Config" sınıfının bir örneğidir. "Config" sınıfı, topolojiyi göndermeden önce yapılandırma seçeneklerini ayarlamak için kullanılır. Bu yapılandırma seçeneği, çalışma zamanında küme yapılandırmasıyla birleştirilecek ve hazırlama yöntemiyle tüm göreve (ağızlık ve cıvata) gönderilecektir. Topoloji kümeye gönderildikten sonra, kümenin gönderilen topolojiyi hesaplaması için 10 saniye bekleyeceğiz ve ardından "LocalCluster" ın "kapatma" yöntemini kullanarak kümeyi kapatacağız. Tam program kodu aşağıdaki gibidir -
Kodlama - LogAnalyserStorm.java
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
public static void main(String[] args) throws Exception{
//Create Config instance for cluster configuration
Config config = new Config();
config.setDebug(true);
//
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
Thread.sleep(10000);
//Stop the topology
cluster.shutdown();
}
}
Uygulamayı Oluşturma ve Çalıştırma
Tam uygulamanın dört Java kodu vardır. Onlar -
- FakeCallLogReaderSpout.java
- CallLogCreaterBolt.java
- CallLogCounterBolt.java
- LogAnalyerStorm.java
Uygulama aşağıdaki komut kullanılarak oluşturulabilir -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
Uygulama aşağıdaki komut kullanılarak çalıştırılabilir -
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
Çıktı
Uygulama başlatıldıktan sonra, küme başlatma süreci, ağızlık ve cıvata işleme ve son olarak küme kapatma işlemiyle ilgili tüm ayrıntıları verir. "CallLogCounterBolt" da, çağrıyı ve sayım ayrıntılarını yazdırdık. Bu bilgiler konsolda aşağıdaki şekilde görüntülenecektir -
1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93
JVM dışı diller
Fırtına topolojileri, herhangi bir dilde topolojilerin gönderilmesini kolaylaştıran Thrift arayüzleri tarafından uygulanır. Storm, Ruby, Python ve diğer birçok dili destekler. Python bağlamasına bir göz atalım.
Python Bağlama
Python, genel amaçlı yorumlanmış, etkileşimli, nesne yönelimli ve üst düzey bir programlama dilidir. Storm, topolojisini uygulamak için Python'u destekler. Python, yayma, sabitleme, onaylama ve günlüğe kaydetme işlemlerini destekler.
Bildiğiniz gibi cıvatalar herhangi bir dilde tanımlanabilir. Başka bir dilde yazılmış cıvatalar alt süreçler olarak yürütülür ve Storm bu alt süreçlerle stdin / stdout üzerinden JSON mesajları ile iletişim kurar. Öncelikle python bağlamayı destekleyen örnek bir WordCount cıvatası alın.
public static class WordCount implements IRichBolt {
public WordSplit() {
super("python", "splitword.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
İşte sınıf WordCount uygular IRichBoltarayüz ve python uygulamasıyla çalışan süper yöntem bağımsız değişkeni "splitword.py" ile çalışıyor. Şimdi "splitword.py" adlı bir python uygulaması oluşturun.
import storm
class WordCountBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
WordCountBolt().run()
Bu, belirli bir cümledeki kelimeleri sayan Python için örnek uygulama. Benzer şekilde, diğer destekleyici dillerle de bağlanabilirsiniz.