Apache Storm - ตัวอย่างการทำงาน

เราได้อ่านรายละเอียดทางเทคนิคหลักของ Apache Storm แล้วและตอนนี้ก็ถึงเวลาเขียนโค้ดสถานการณ์ง่ายๆ

สถานการณ์ - ตัววิเคราะห์บันทึกการโทรมือถือ

การโทรมือถือและระยะเวลาจะได้รับเป็นอินพุตไปยัง Apache Storm และ Storm จะประมวลผลและจัดกลุ่มการโทรระหว่างผู้โทรและผู้รับรายเดียวกันและจำนวนการโทรทั้งหมด

การสร้างรางน้ำ

Spout เป็นส่วนประกอบที่ใช้ในการสร้างข้อมูล โดยทั่วไปพวยกาจะใช้อินเทอร์เฟซ IRichSpout อินเทอร์เฟซ“ IRichSpout” มีวิธีการที่สำคัญดังต่อไปนี้ -

  • open- จัดเตรียมพวยกาพร้อมสภาพแวดล้อมในการดำเนินการ ตัวดำเนินการจะเรียกใช้วิธีนี้เพื่อเริ่มต้นพวยกา

  • nextTuple - ปล่อยข้อมูลที่สร้างขึ้นผ่านตัวรวบรวม

  • close - วิธีนี้เรียกว่าเมื่อพวยกากำลังจะปิดเครื่อง

  • declareOutputFields - ประกาศสคีมาผลลัพธ์ของทูเปิล

  • ack - รับทราบว่ามีการประมวลผลทูเปิลเฉพาะ

  • fail - ระบุว่าทูเปิลเฉพาะไม่ได้ถูกประมวลผลและไม่ต้องประมวลผลซ้ำ

เปิด

ลายเซ็นของ open วิธีการมีดังนี้ -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - จัดเตรียมการกำหนดค่าพายุสำหรับพวยกานี้

  • context - ให้ข้อมูลที่สมบูรณ์เกี่ยวกับตำแหน่งพวยกาภายในโทโพโลยีรหัสงานข้อมูลอินพุตและเอาต์พุต

  • collector - ช่วยให้เราสามารถปล่อยทูเปิลที่จะถูกประมวลผลโดยสลักเกลียว

nextTuple

ลายเซ็นของ nextTuple วิธีการมีดังนี้ -

nextTuple()

nextTuple () ถูกเรียกเป็นระยะ ๆ จากลูปเดียวกันกับเมธอด ack () และ fail () จะต้องปล่อยการควบคุมเธรดเมื่อไม่มีงานที่ต้องทำเพื่อให้มีโอกาสเรียกใช้วิธีการอื่น ดังนั้นบรรทัดแรกของ nextTuple จะตรวจสอบว่าการประมวลผลเสร็จสิ้นหรือไม่ ในกรณีนี้ควรพักอย่างน้อยหนึ่งมิลลิวินาทีเพื่อลดภาระในโปรเซสเซอร์ก่อนที่จะกลับมา

ปิด

ลายเซ็นของ close วิธีการมีดังนี้ -

close()

ประกาศOutputFields

ลายเซ็นของ declareOutputFields วิธีการมีดังนี้ -

declareOutputFields(OutputFieldsDeclarer declarer)

declarer - ใช้เพื่อประกาศรหัสสตรีมเอาต์พุตช่องเอาต์พุต ฯลฯ

วิธีนี้ใช้เพื่อระบุสคีมาเอาต์พุตของทูเปิล

ack

ลายเซ็นของ ack วิธีการมีดังนี้ -

ack(Object msgId)

วิธีนี้รับทราบว่ามีการประมวลผลทูเปิลเฉพาะ

ล้มเหลว

ลายเซ็นของ nextTuple วิธีการมีดังนี้ -

ack(Object msgId)

วิธีนี้แจ้งว่าทูเปิลเฉพาะยังไม่ได้รับการประมวลผลอย่างสมบูรณ์ Storm จะประมวลผลทูเพิลเฉพาะ

FakeCallLogReaderSpout

ในสถานการณ์ของเราเราจำเป็นต้องรวบรวมรายละเอียดบันทึกการโทร ข้อมูลของบันทึกการโทรประกอบด้วย

  • หมายเลขผู้โทร
  • หมายเลขผู้รับ
  • duration

เนื่องจากเราไม่มีข้อมูลบันทึกการโทรแบบเรียลไทม์เราจะสร้างบันทึกการโทรปลอม ข้อมูลปลอมจะถูกสร้างขึ้นโดยใช้คลาสสุ่ม รหัสโปรแกรมที่สมบูรณ์จะได้รับด้านล่าง

การเข้ารหัส - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

การสร้างกลอน

Bolt เป็นส่วนประกอบที่ใช้ tuples เป็นอินพุตประมวลผลทูเพิลและสร้างสิ่งทอปเปิลใหม่เป็นเอาต์พุต สลักเกลียวจะดำเนินการIRichBoltอินเตอร์เฟซ. ในโปรแกรมนี้คลาสโบลต์สองคลาสCallLogCreatorBolt และ CallLogCounterBolt ใช้ในการดำเนินการ

อินเทอร์เฟซ IRichBolt มีวิธีการดังต่อไปนี้ -

  • prepare- จัดเตรียมโบลต์พร้อมสภาพแวดล้อมในการดำเนินการ ตัวดำเนินการจะเรียกใช้วิธีนี้เพื่อเริ่มต้นพวยกา

  • execute - ประมวลผลอินพุตทูเพิลเดียว

  • cleanup - เรียกเมื่อโบลต์กำลังจะปิด

  • declareOutputFields - ประกาศสคีมาผลลัพธ์ของทูเปิล

เตรียม

ลายเซ็นของ prepare วิธีการมีดังนี้ -

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf - จัดเตรียมการกำหนดค่า Storm สำหรับสายฟ้านี้

  • context - ให้ข้อมูลที่สมบูรณ์เกี่ยวกับตำแหน่งโบลต์ภายในโทโพโลยีรหัสงานข้อมูลอินพุตและเอาต์พุต ฯลฯ

  • collector - ช่วยให้เราสามารถปล่อยทูเปิลที่ประมวลผลได้

ดำเนินการ

ลายเซ็นของ execute วิธีการมีดังนี้ -

execute(Tuple tuple)

ที่นี่ tuple คือทูเพิลอินพุตที่จะประมวลผล

executeวิธีการประมวลผลทูเพิลเดียวในแต่ละครั้ง ข้อมูลทูเพิลสามารถเข้าถึงได้ด้วยเมธอด getValue ของคลาส Tuple ไม่จำเป็นต้องประมวลผลทูเพิลอินพุตทันที สามารถประมวลผลทูเปิลหลายตัวและส่งออกเป็นทูเพิลเอาต์พุตเดียว ทูเปิลที่ประมวลผลแล้วสามารถปล่อยออกมาได้โดยใช้คลาส OutputCollector

ทำความสะอาด

ลายเซ็นของ cleanup วิธีการมีดังนี้ -

cleanup()

ประกาศOutputFields

ลายเซ็นของ declareOutputFields วิธีการมีดังนี้ -

declareOutputFields(OutputFieldsDeclarer declarer)

นี่คือพารามิเตอร์ declarer ใช้เพื่อประกาศรหัสสตรีมเอาต์พุตฟิลด์เอาต์พุต ฯลฯ

วิธีนี้ใช้เพื่อระบุสคีมาเอาต์พุตของทูเปิล

บันทึกการโทรผู้สร้าง Bolt

ผู้สร้างบันทึกการโทรได้รับทูเปิลบันทึกการโทร ทูเปิลบันทึกการโทรมีหมายเลขผู้โทรหมายเลขผู้รับและระยะเวลาการโทร สายฟ้านี้สร้างค่าใหม่โดยการรวมหมายเลขผู้โทรและหมายเลขผู้รับ รูปแบบของค่าใหม่คือ "หมายเลขผู้โทร - หมายเลขผู้รับ" และตั้งชื่อเป็นช่องใหม่ว่า "โทร" รหัสที่สมบูรณ์จะได้รับด้านล่าง

การเข้ารหัส - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

บันทึกการโทร Counter Bolt

สลักตัวนับบันทึกการโทรรับสายและระยะเวลาเป็นทูเพิล สลักเกลียวนี้เริ่มต้นวัตถุพจนานุกรม (แผนที่) ในวิธีการเตรียม ในexecuteวิธีการตรวจสอบทูเปิลและสร้างรายการใหม่ในออบเจ็กต์พจนานุกรมสำหรับทุกค่า "การเรียก" ใหม่ในทูเพิลและกำหนดค่า 1 ในออบเจ็กต์พจนานุกรม สำหรับรายการที่มีอยู่แล้วในพจนานุกรมก็แค่เพิ่มค่า กล่าวง่ายๆคือสลักเกลียวนี้จะบันทึกการโทรและจำนวนของมันในวัตถุพจนานุกรม แทนที่จะบันทึกการโทรและจำนวนครั้งในพจนานุกรมเรายังสามารถบันทึกลงในแหล่งข้อมูลได้อีกด้วย รหัสโปรแกรมที่สมบูรณ์มีดังนี้ -

การเข้ารหัส - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

การสร้างโทโพโลยี

โทโพโลยีแบบพายุนั้นเป็นโครงสร้างแบบ Thrift คลาส TopologyBuilder มีวิธีการที่ง่ายและสะดวกในการสร้างโทโพโลยีที่ซับซ้อน คลาส TopologyBuilder มีวิธีการตั้งค่าพวยกา(setSpout) และตั้งสลักเกลียว (setBolt). ในที่สุด TopologyBuilder ได้ createTopology เพื่อสร้างโทโพโลยี ใช้ข้อมูลโค้ดต่อไปนี้เพื่อสร้างโทโพโลยี -

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping และ fieldsGrouping วิธีการช่วยในการตั้งค่าการจัดกลุ่มสตรีมสำหรับพวยกาและสลักเกลียว

คลัสเตอร์ภายใน

เพื่อวัตถุประสงค์ในการพัฒนาเราสามารถสร้างคลัสเตอร์ภายในโดยใช้ออบเจ็กต์ "LocalCluster" จากนั้นส่งโทโพโลยีโดยใช้เมธอด "submitTopology" ของคลาส "LocalCluster" หนึ่งในอาร์กิวเมนต์สำหรับ "submitTopology" คืออินสแตนซ์ของคลาส "Config" คลาส "Config" ใช้เพื่อตั้งค่าตัวเลือกการกำหนดค่าก่อนที่จะส่งโทโพโลยี ตัวเลือกการกำหนดค่านี้จะรวมเข้ากับการกำหนดค่าคลัสเตอร์ในขณะรันและส่งไปยังงานทั้งหมด (พวยกาและโบลต์) ด้วยวิธีการเตรียม เมื่อส่งโทโพโลยีไปยังคลัสเตอร์แล้วเราจะรอ 10 วินาทีเพื่อให้คลัสเตอร์คำนวณโทโพโลยีที่ส่งจากนั้นปิดคลัสเตอร์โดยใช้วิธีการ "ปิดระบบ" ของ "LocalCluster" รหัสโปรแกรมที่สมบูรณ์มีดังนี้ -

การเข้ารหัส - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

การสร้างและเรียกใช้แอปพลิเคชัน

แอปพลิเคชันที่สมบูรณ์มีรหัส Java สี่ตัว พวกเขาคือ -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

สามารถสร้างแอปพลิเคชันได้โดยใช้คำสั่งต่อไปนี้ -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

แอปพลิเคชันสามารถรันได้โดยใช้คำสั่งต่อไปนี้ -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

เอาต์พุต

เมื่อแอปพลิเคชันเริ่มทำงานแอปพลิเคชันจะแสดงรายละเอียดทั้งหมดเกี่ยวกับกระบวนการเริ่มต้นคลัสเตอร์การประมวลผลพวยกาและโบลต์และสุดท้ายคือกระบวนการปิดระบบคลัสเตอร์ ใน "CallLogCounterBolt" เราได้พิมพ์การโทรและรายละเอียดการนับ ข้อมูลนี้จะแสดงบนคอนโซลดังนี้ -

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

ภาษาที่ไม่ใช่ JVM

โทโพโลยีแบบพายุถูกนำมาใช้โดยอินเทอร์เฟซ Thrift ซึ่งทำให้ง่ายต่อการส่งโทโพโลยีในภาษาใด ๆ Storm รองรับ Ruby, Python และภาษาอื่น ๆ อีกมากมาย ลองมาดูการผูกไพ ธ อน

การผูก Python

Python เป็นภาษาการเขียนโปรแกรมระดับสูงที่ตีความโต้ตอบเชิงวัตถุและระดับสูง Storm สนับสนุน Python เพื่อใช้โทโพโลยี Python สนับสนุนการเปล่งการทอดสมอการจับและการบันทึก

ดังที่คุณทราบสลักเกลียวสามารถกำหนดเป็นภาษาใดก็ได้ สลักเกลียวที่เขียนในภาษาอื่นจะดำเนินการเป็นกระบวนการย่อยและ Storm จะสื่อสารกับกระบวนการย่อยเหล่านั้นด้วยข้อความ JSON ผ่าน stdin / stdout ก่อนอื่นให้ใช้ตัวอย่าง Bolt WordCount ที่รองรับการผูก python

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

ที่นี่ชั้นเรียน WordCount ใช้ IRichBoltอินเทอร์เฟซและทำงานโดยใช้ python อาร์กิวเมนต์ super method ระบุ "splitword.py" ตอนนี้สร้างการใช้งาน python ชื่อ "splitword.py"

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

นี่คือตัวอย่างการใช้งาน Python ที่นับจำนวนคำในประโยคที่กำหนด ในทำนองเดียวกันคุณสามารถเชื่อมโยงกับภาษาสนับสนุนอื่น ๆ ได้เช่นกัน