Apache Storm - ตรีศูล
ตรีศูลเป็นส่วนขยายของสตอร์ม เช่นเดียวกับ Storm Trident ก็ได้รับการพัฒนาโดย Twitter เหตุผลหลักที่อยู่เบื้องหลังการพัฒนาตรีศูลคือการให้นามธรรมระดับสูงที่ด้านบนของ Storm พร้อมกับการประมวลผลสตรีมแบบมีสถานะและการสืบค้นแบบกระจายเวลาแฝงต่ำ
Trident ใช้พวยกาและโบลต์ แต่ส่วนประกอบระดับต่ำเหล่านี้สร้างขึ้นโดยอัตโนมัติโดย Trident ก่อนดำเนินการ ตรีศูลมีฟังก์ชันตัวกรองการรวมการจัดกลุ่มและการรวม
ตรีศูลประมวลผลสตรีมเป็นชุดของแบทช์ซึ่งเรียกว่าธุรกรรม โดยทั่วไปขนาดของแบทช์เล็ก ๆ เหล่านั้นจะอยู่ในลำดับของสิ่งทอหลายพันหรือหลายล้านตัวขึ้นอยู่กับสตรีมอินพุต วิธีนี้ตรีศูลแตกต่างจาก Storm ซึ่งดำเนินการประมวลผลแบบทูเพิล - ทูเพิล
แนวคิดการประมวลผลแบบกลุ่มมีความคล้ายคลึงกับธุรกรรมฐานข้อมูลมาก ทุกธุรกรรมจะถูกกำหนดรหัสธุรกรรม ธุรกรรมจะถือว่าสำเร็จเมื่อการประมวลผลทั้งหมดเสร็จสมบูรณ์ อย่างไรก็ตามความล้มเหลวในการประมวลผลสิ่งที่เกิดขึ้นอย่างใดอย่างหนึ่งของธุรกรรมจะทำให้ธุรกรรมทั้งหมดถูกส่งใหม่ สำหรับแต่ละแบทช์ตรีศูลจะเรียก beginCommit ที่จุดเริ่มต้นของธุรกรรมและกระทำในตอนท้ายของธุรกรรม
โทโพโลยีตรีศูล
Trident API แสดงตัวเลือกที่ง่ายในการสร้างโทโพโลยีตรีศูลโดยใช้คลาส“ TridentTopology” โดยทั่วไปโทโพโลยีตรีศูลจะรับอินพุตสตรีมจากพวยกาและทำลำดับการทำงานตามลำดับ (ตัวกรองการรวมการจัดกลุ่ม ฯลฯ ) บนสตรีม Storm Tuple ถูกแทนที่ด้วย Trident Tuple และ Bolts จะถูกแทนที่ด้วยปฏิบัติการ สามารถสร้างโทโพโลยีตรีศูลอย่างง่ายได้ดังนี้ -
TridentTopology topology = new TridentTopology();
ตรีศูลทูเปิล
ตรีศูลทูเปิลเป็นรายการค่าที่ตั้งชื่อ อินเทอร์เฟซ TridentTuple เป็นแบบจำลองข้อมูลของโทโพโลยีตรีศูล อินเทอร์เฟซ TridentTuple เป็นหน่วยพื้นฐานของข้อมูลที่สามารถประมวลผลโดยโทโพโลยีตรีศูล
ตรีศูลพวยกา
Trident spout คล้ายกับ Storm spout พร้อมตัวเลือกเพิ่มเติมในการใช้คุณสมบัติของ Trident จริงๆแล้วเรายังสามารถใช้ IRichSpout ซึ่งเราเคยใช้ในโทโพโลยีของ Storm ได้ แต่จะไม่สามารถทำธุรกรรมได้และเราจะไม่สามารถใช้ประโยชน์จาก Trident ได้
พวยกาพื้นฐานที่มีฟังก์ชันทั้งหมดในการใช้คุณสมบัติของ Trident คือ "ITridentSpout" สนับสนุนทั้งความหมายทรานแซคชันและทึบแสง พวยกาอื่น ๆ ได้แก่ IBatchSpout, IPartitionedTridentSpout และ IOpaquePartitionedTridentSpout
นอกจากพวยกาทั่วไปแล้วตรีศูลยังมีตัวอย่างการใช้งานพวยกาตรีศูลอีกมากมาย หนึ่งในนั้นคือ FeederBatchSpout spout ซึ่งเราสามารถใช้เพื่อส่งรายชื่อของ tuples ตรีศูลได้อย่างง่ายดายโดยไม่ต้องกังวลเกี่ยวกับการประมวลผลแบบแบทช์การขนานกัน ฯลฯ
การสร้าง FeederBatchSpout และการป้อนข้อมูลสามารถทำได้ดังภาพด้านล่าง -
TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));
ตรีศูลปฏิบัติการ
ตรีศูลอาศัย“ Trident Operation” ในการประมวลผลอินพุตของสิ่งทอสามมิติ Trident API มีการดำเนินการในตัวจำนวนมากเพื่อจัดการกับการประมวลผลสตรีมแบบง่ายถึงซับซ้อน การดำเนินการเหล่านี้มีตั้งแต่การตรวจสอบความถูกต้องอย่างง่ายไปจนถึงการจัดกลุ่มที่ซับซ้อนและการรวมตัวกันของสิ่งทอสามมิติ ให้เราดำเนินการที่สำคัญที่สุดและใช้บ่อยที่สุด
กรอง
ตัวกรองเป็นวัตถุที่ใช้ในการตรวจสอบความถูกต้องของอินพุต ตัวกรองตรีศูลรับส่วนย่อยของฟิลด์ทูเปิลตรีศูลเป็นอินพุตและส่งคืนค่าจริงหรือเท็จขึ้นอยู่กับว่าเงื่อนไขบางอย่างเป็นที่พอใจหรือไม่ หากส่งคืนค่า true ทูเปิลจะถูกเก็บไว้ในเอาต์พุตสตรีม มิฉะนั้นทูเปิลจะถูกลบออกจากสตรีม โดยพื้นฐานแล้วตัวกรองจะสืบทอดมาจากไฟล์BaseFilter คลาสและใช้ isKeepวิธี. นี่คือตัวอย่างการใช้งานตัวกรอง -
public class MyFilter extends BaseFilter {
public boolean isKeep(TridentTuple tuple) {
return tuple.getInteger(1) % 2 == 0;
}
}
input
[1, 2]
[1, 3]
[1, 4]
output
[1, 2]
[1, 4]
ฟังก์ชันตัวกรองสามารถเรียกใช้ในโทโพโลยีโดยใช้วิธีการ "แต่ละ" คลาส "ฟิลด์" สามารถใช้เพื่อระบุอินพุต (เซตย่อยของทูเพิลตรีศูล) โค้ดตัวอย่างมีดังนี้ -
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())
ฟังก์ชัน
Functionเป็นวัตถุที่ใช้ในการดำเนินการอย่างง่ายกับทูเปิลตรีศูลตัวเดียว ใช้ฟิลด์ทูเปิลตรีศูลย่อยและปล่อยฟิลด์ทูเปิลตรีศูลใหม่เป็นศูนย์หรือมากกว่า
Function โดยทั่วไปจะสืบทอดมาจากไฟล์ BaseFunction ชั้นเรียนและใช้ executeวิธี. ตัวอย่างการใช้งานได้รับด้านล่าง -
public class MyFunction extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
int a = tuple.getInteger(0);
int b = tuple.getInteger(1);
collector.emit(new Values(a + b));
}
}
input
[1, 2]
[1, 3]
[1, 4]
output
[1, 2, 3]
[1, 3, 4]
[1, 4, 5]
เช่นเดียวกับการกรองการทำงานของฟังก์ชันสามารถเรียกใช้ในโทโพโลยีโดยใช้ eachวิธี. โค้ดตัวอย่างมีดังนี้ -
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));
การรวม
Aggregation เป็นอ็อบเจ็กต์ที่ใช้ในการดำเนินการรวมบนชุดอินพุตหรือพาร์ติชันหรือสตรีม ตรีศูลมีการรวมตัวสามประเภท มีดังนี้ -
aggregate- รวบรวมทูเปิลตรีศูลแต่ละชุดแยกกัน ในระหว่างกระบวนการรวม tuples จะถูกแบ่งพาร์ติชันในขั้นต้นโดยใช้การจัดกลุ่มส่วนกลางเพื่อรวมพาร์ติชันทั้งหมดของชุดงานเดียวกันให้เป็นพาร์ติชันเดียว
partitionAggregate- รวมแต่ละพาร์ติชันแทนที่จะเป็นกลุ่มตรีศูลทูเปิลทั้งหมด ผลลัพธ์ของการรวมพาร์ติชันจะแทนที่ทูเพิลอินพุตอย่างสมบูรณ์ ผลลัพธ์ของการรวมพาร์ติชันประกอบด้วยทูเพิลฟิลด์เดียว
persistentaggregate - รวมค่าทูเพิลตรีศูลทั้งหมดในทุกแบทช์และเก็บผลลัพธ์ไว้ในหน่วยความจำหรือฐานข้อมูล
TridentTopology topology = new TridentTopology();
// aggregate operation
topology.newStream("spout", spout)
.each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
.aggregate(new Count(), new Fields(“count”))
// partitionAggregate operation
topology.newStream("spout", spout)
.each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
.partitionAggregate(new Count(), new Fields(“count"))
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
.each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
การดำเนินการรวมสามารถสร้างได้โดยใช้ CombinerAggregator, ReducerAggregator หรืออินเทอร์เฟซ Aggregator ทั่วไป ตัวรวบรวม "จำนวน" ที่ใช้ในตัวอย่างข้างต้นเป็นหนึ่งในตัวรวบรวมแบบบิลด์อินซึ่งใช้งานโดยใช้ "CombinerAggregator" การใช้งานมีดังนี้ -
public class Count implements CombinerAggregator<Long> {
@Override
public Long init(TridentTuple tuple) {
return 1L;
}
@Override
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
@Override
public Long zero() {
return 0L;
}
}
การจัดกลุ่ม
การดำเนินการจัดกลุ่มเป็นการดำเนินการที่สร้างขึ้นและสามารถเรียกใช้โดยไฟล์ groupByวิธี. เมธอด groupBy แบ่งพาร์ติชันสตรีมใหม่โดยทำ partitionBy บนฟิลด์ที่ระบุจากนั้นภายในแต่ละพาร์ติชันจะจัดกลุ่ม tuples เข้าด้วยกันซึ่งฟิลด์กลุ่มมีค่าเท่ากัน โดยปกติเราจะใช้“ groupBy” ร่วมกับ“ persistentAggregate” เพื่อรับการรวมกลุ่ม โค้ดตัวอย่างมีดังนี้ -
TridentTopology topology = new TridentTopology();
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
.each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
.groupBy(new Fields(“d”)
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
การรวมและการเข้าร่วม
การรวมและการเข้าร่วมสามารถทำได้โดยใช้วิธี "รวม" และ "เข้าร่วม" ตามลำดับ การผสานรวมสตรีมตั้งแต่หนึ่งรายการขึ้นไป การเข้าร่วมคล้ายกับการรวมยกเว้นข้อเท็จจริงที่ว่าการเข้าร่วมใช้ฟิลด์ทูเปิลตรีศูลจากทั้งสองด้านเพื่อตรวจสอบและเข้าร่วมสองสตรีม ยิ่งไปกว่านั้นการเข้าร่วมจะทำงานภายใต้ระดับแบทช์เท่านั้น โค้ดตัวอย่างมีดังนี้ -
TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
new Fields("key", "a", "b", "c"));
การบำรุงรักษาของรัฐ
ตรีศูลเป็นกลไกสำหรับการบำรุงรักษาสถานะ ข้อมูลสถานะสามารถจัดเก็บไว้ในโทโพโลยีได้เองมิฉะนั้นคุณสามารถจัดเก็บไว้ในฐานข้อมูลแยกต่างหากได้เช่นกัน เหตุผลคือเพื่อรักษาสถานะว่าหากทูเปิลใดล้มเหลวในระหว่างการประมวลผลทูเปิลที่ล้มเหลวจะถูกลองใหม่ สิ่งนี้สร้างปัญหาขณะอัปเดตสถานะเนื่องจากคุณไม่แน่ใจว่าสถานะของทูเปิลนี้ได้รับการอัปเดตก่อนหน้านี้หรือไม่ หากทูเปิลล้มเหลวก่อนอัปเดตสถานะการลองทูเปิลอีกครั้งจะทำให้สถานะเสถียร อย่างไรก็ตามหากทูเปิลล้มเหลวหลังจากอัปเดตสถานะแล้วการลองทูเปิลเดิมซ้ำอีกครั้งจะเพิ่มจำนวนในฐานข้อมูลและทำให้สถานะไม่เสถียร เราต้องทำตามขั้นตอนต่อไปนี้เพื่อให้แน่ใจว่าข้อความถูกประมวลผลเพียงครั้งเดียว -
ประมวลผล tuples ในแบทช์เล็ก ๆ
กำหนด ID เฉพาะให้กับแต่ละชุด หากลองชุดใหม่อีกครั้งชุดนั้นจะได้รับรหัสเฉพาะเดียวกัน
การอัปเดตสถานะจะเรียงลำดับระหว่างแบทช์ ตัวอย่างเช่นการอัปเดตสถานะของชุดที่สองจะไม่สามารถทำได้จนกว่าการอัปเดตสถานะสำหรับชุดงานแรกจะเสร็จสิ้น
RPC แบบกระจาย
RPC แบบกระจายใช้เพื่อสืบค้นและดึงผลลัพธ์จากโทโพโลยีตรีศูล Storm มีเซิร์ฟเวอร์ RPC แบบกระจายในตัว เซิร์ฟเวอร์ RPC แบบกระจายจะรับคำขอ RPC จากไคลเอนต์และส่งต่อไปยังโทโพโลยี โทโพโลยีประมวลผลคำขอและส่งผลลัพธ์ไปยังเซิร์ฟเวอร์ RPC แบบกระจายซึ่งถูกเปลี่ยนเส้นทางโดยเซิร์ฟเวอร์ RPC แบบกระจายไปยังไคลเอนต์ การสืบค้น RPC แบบกระจายของ Trident จะดำเนินการเหมือนกับการสืบค้น RPC ทั่วไปยกเว้นข้อเท็จจริงที่ว่าการสืบค้นเหล่านี้ทำงานควบคู่กันไป
ควรใช้ตรีศูลเมื่อใด
เช่นเดียวกับการใช้งานหลายกรณีหากข้อกำหนดคือการประมวลผลแบบสอบถามเพียงครั้งเดียวเราสามารถบรรลุได้โดยการเขียนโทโพโลยีในตรีศูล ในทางกลับกันการประมวลผลครั้งเดียวในกรณีของสตอร์มจะเป็นเรื่องยาก ดังนั้นตรีศูลจะเป็นประโยชน์สำหรับกรณีการใช้งานที่คุณต้องการเพียงครั้งเดียวในการประมวลผล ตรีศูลไม่ได้มีไว้สำหรับกรณีการใช้งานทั้งหมดโดยเฉพาะอย่างยิ่งกรณีการใช้งานที่มีประสิทธิภาพสูงเนื่องจากเพิ่มความซับซ้อนให้กับ Storm และจัดการสถานะ
ตัวอย่างการทำงานของตรีศูล
เรากำลังจะแปลงแอปพลิเคชันตัววิเคราะห์บันทึกการโทรของเราที่ทำในส่วนก่อนหน้านี้เป็นกรอบงานตรีศูล แอปพลิเคชั่น Trident จะค่อนข้างง่ายเมื่อเทียบกับพายุธรรมดาด้วย API ระดับสูง โดยทั่วไป Storm จะต้องดำเนินการอย่างใดอย่างหนึ่งของ Function, Filter, Aggregate, GroupBy, Join and Merge operation in Trident ในที่สุดเราจะเริ่มเซิร์ฟเวอร์ DRPC โดยใช้ไฟล์LocalDRPC คลาสและค้นหาคำหลักโดยใช้ execute วิธีการของคลาส LocalDRPC
การจัดรูปแบบข้อมูลการโทร
วัตถุประสงค์ของคลาส FormatCall คือการจัดรูปแบบข้อมูลการโทรซึ่งประกอบด้วย "หมายเลขผู้โทร" และ "หมายเลขผู้รับ" รหัสโปรแกรมที่สมบูรณ์มีดังนี้ -
การเข้ารหัส: FormatCall.java
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class FormatCall extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String fromMobileNumber = tuple.getString(0);
String toMobileNumber = tuple.getString(1);
collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
}
}
CSVSplit
วัตถุประสงค์ของคลาส CSVSplit คือการแยกสตริงอินพุตตาม "ลูกน้ำ (,)" และเปล่งทุกคำในสตริง ฟังก์ชันนี้ใช้เพื่อแยกวิเคราะห์อาร์กิวเมนต์อินพุตของการสืบค้นแบบกระจาย รหัสที่สมบูรณ์มีดังนี้ -
การเข้ารหัส: CSVSplit.java
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class CSVSplit extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
for(String word: tuple.getString(0).split(",")) {
if(word.length() > 0) {
collector.emit(new Values(word));
}
}
}
}
Log Analyzer
นี่คือแอปพลิเคชันหลัก ในขั้นต้นแอปพลิเคชันจะเริ่มต้น TridentTopology และป้อนข้อมูลผู้โทรโดยใช้FeederBatchSpout. สามารถสร้างสตรีมโทโพโลยีตรีศูลได้โดยใช้newStreamวิธีการของคลาส TridentTopology ในทำนองเดียวกันสตรีม DRPC โทโพโลยีตรีศูลสามารถสร้างได้โดยใช้newDRCPStreamวิธีการของคลาส TridentTopology สามารถสร้างเซิร์ฟเวอร์ DRCP อย่างง่ายโดยใช้คลาส LocalDRPCLocalDRPCมีวิธีการดำเนินการเพื่อค้นหาคำหลักบางคำ รหัสที่สมบูรณ์จะได้รับด้านล่าง
การเข้ารหัส: LogAnalyserTrident.java
import java.util.*;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;
import com.google.common.collect.ImmutableList;
public class LogAnalyserTrident {
public static void main(String[] args) throws Exception {
System.out.println("Log Analyser Trident");
TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
"toMobileNumber", "duration"));
TridentState callCounts = topology
.newStream("fixed-batch-spout", testSpout)
.each(new Fields("fromMobileNumber", "toMobileNumber"),
new FormatCall(), new Fields("call"))
.groupBy(new Fields("call"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(),
new Fields("count"));
LocalDRPC drpc = new LocalDRPC();
topology.newDRPCStream("call_count", drpc)
.stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));
topology.newDRPCStream("multiple_call_count", drpc)
.each(new Fields("args"), new CSVSplit(), new Fields("call"))
.groupBy(new Fields("call"))
.stateQuery(callCounts, new Fields("call"), new MapGet(),
new Fields("count"))
.each(new Fields("call", "count"), new Debug())
.each(new Fields("count"), new FilterNull())
.aggregate(new Fields("count"), new Sum(), new Fields("sum"));
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident", conf, topology.build());
Random randomGenerator = new Random();
int idx = 0;
while(idx < 10) {
testSpout.feed(ImmutableList.of(new Values("1234123401",
"1234123402", randomGenerator.nextInt(60))));
testSpout.feed(ImmutableList.of(new Values("1234123401",
"1234123403", randomGenerator.nextInt(60))));
testSpout.feed(ImmutableList.of(new Values("1234123401",
"1234123404", randomGenerator.nextInt(60))));
testSpout.feed(ImmutableList.of(new Values("1234123402",
"1234123403", randomGenerator.nextInt(60))));
idx = idx + 1;
}
System.out.println("DRPC : Query starts");
System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
System.out.println(drpc.execute("multiple_call_count", "1234123401 -
1234123402,1234123401 - 1234123403"));
System.out.println("DRPC : Query ends");
cluster.shutdown();
drpc.shutdown();
// DRPCClient client = new DRPCClient("drpc.server.location", 3772);
}
}
การสร้างและเรียกใช้แอปพลิเคชัน
แอปพลิเคชันที่สมบูรณ์มีรหัส Java สามรหัส มีดังนี้ -
- FormatCall.java
- CSVSplit.java
- LogAnalyerTrident.java
สามารถสร้างแอปพลิเคชันได้โดยใช้คำสั่งต่อไปนี้ -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
แอปพลิเคชันสามารถทำงานได้โดยใช้คำสั่งต่อไปนี้ -
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident
เอาต์พุต
เมื่อแอปพลิเคชันเริ่มทำงานแอปพลิเคชันจะแสดงรายละเอียดทั้งหมดเกี่ยวกับกระบวนการเริ่มต้นคลัสเตอร์การประมวลผลการดำเนินงานเซิร์ฟเวอร์ DRPC และข้อมูลไคลเอ็นต์และสุดท้ายคือกระบวนการปิดระบบคลัสเตอร์ เอาต์พุตนี้จะแสดงบนคอนโซลดังที่แสดงด้านล่าง
DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends