Apache Flink - คู่มือฉบับย่อ

ความก้าวหน้าของข้อมูลในช่วง 10 ปีที่ผ่านมามีมากมายมหาศาล สิ่งนี้ทำให้เกิดคำว่า 'Big Data' ไม่มีขนาดข้อมูลคงที่ซึ่งคุณสามารถเรียกได้ว่าเป็นข้อมูลขนาดใหญ่ ข้อมูลใด ๆ ที่ระบบดั้งเดิม (RDBMS) ของคุณไม่สามารถจัดการได้คือ Big Data ข้อมูลขนาดใหญ่นี้สามารถอยู่ในรูปแบบที่มีโครงสร้างกึ่งโครงสร้างหรือไม่มีโครงสร้าง ในขั้นต้นข้อมูลมีสามมิติ ได้แก่ Volume, Velocity, Variety ตอนนี้มิติได้ไปไกลกว่าแค่สาม Vs ตอนนี้เราได้เพิ่ม Vs อื่น ๆ - ความจริงความถูกต้องช่องโหว่มูลค่าความแปรปรวนและอื่น ๆ

Big Data นำไปสู่การเกิดขึ้นของเครื่องมือและเฟรมเวิร์กมากมายที่ช่วยในการจัดเก็บและประมวลผลข้อมูล มีกรอบข้อมูลขนาดใหญ่ที่เป็นที่นิยมอยู่สองสามตัวเช่น Hadoop, Spark, Hive, Pig, Storm และ Zookeeper นอกจากนี้ยังเปิดโอกาสให้สร้างผลิตภัณฑ์ Next Gen ในหลายโดเมนเช่น Healthcare, Finance, Retail, E-Commerce และอื่น ๆ

ไม่ว่าจะเป็น MNC หรือสตาร์ทอัพทุกคนต่างก็ใช้ประโยชน์จาก Big Data เพื่อจัดเก็บและประมวลผลและตัดสินใจอย่างชาญฉลาด

ในแง่ของข้อมูลขนาดใหญ่มีสองประเภทของการประมวลผล -

  • การประมวลผลแบทช์
  • การประมวลผลแบบเรียลไทม์

การประมวลผลตามข้อมูลที่รวบรวมในช่วงเวลาหนึ่งเรียกว่าการประมวลผลแบบกลุ่ม ตัวอย่างเช่นผู้จัดการธนาคารต้องการประมวลผลข้อมูลหนึ่งเดือนที่ผ่านมา (เก็บรวบรวมเมื่อเวลาผ่านไป) เพื่อทราบจำนวนเช็คที่ถูกยกเลิกใน 1 เดือนที่ผ่านมา

การประมวลผลตามข้อมูลทันทีเพื่อให้ได้ผลลัพธ์ทันทีเรียกว่าการประมวลผลแบบเรียลไทม์ ตัวอย่างเช่นผู้จัดการธนาคารได้รับการแจ้งเตือนการฉ้อโกงทันทีหลังจากเกิดธุรกรรมการฉ้อโกง (ผลลัพธ์ทันที)

ตารางด้านล่างแสดงความแตกต่างระหว่างการประมวลผลแบบเป็นกลุ่มและแบบเรียลไทม์ -

การประมวลผลแบทช์ การประมวลผลแบบเรียลไทม์

ไฟล์คงที่

สตรีมเหตุการณ์

ประมวลผลเป็นระยะ ๆ เป็นนาทีชั่วโมงวันและอื่น ๆ

ดำเนินการทันที

นาโนวินาที

ข้อมูลที่ผ่านมาในการจัดเก็บดิสก์

ในหน่วยความจำ

ตัวอย่าง - การสร้างบิล

ตัวอย่าง - การแจ้งเตือนธุรกรรม ATM

ทุกวันนี้การประมวลผลแบบเรียลไทม์ถูกนำมาใช้มากในทุกองค์กร ใช้กรณีต่างๆเช่นการตรวจจับการฉ้อโกงการแจ้งเตือนแบบเรียลไทม์ในการดูแลสุขภาพและการแจ้งเตือนการโจมตีเครือข่ายต้องการการประมวลผลข้อมูลทันทีแบบเรียลไทม์ ความล่าช้าแม้เพียงไม่กี่มิลลิวินาทีอาจส่งผลกระทบอย่างมาก

เครื่องมือที่เหมาะสำหรับกรณีการใช้งานแบบเรียลไทม์เช่นนี้คือเครื่องมือที่สามารถป้อนข้อมูลเป็นสตรีมและไม่ใช่แบทช์ Apache Flink เป็นเครื่องมือประมวลผลแบบเรียลไทม์

Apache Flink เป็นกรอบการประมวลผลแบบเรียลไทม์ซึ่งสามารถประมวลผลข้อมูลสตรีมมิ่ง เป็นกรอบการประมวลผลสตรีมโอเพ่นซอร์สสำหรับแอปพลิเคชันแบบเรียลไทม์ที่มีประสิทธิภาพสูงปรับขนาดได้และแม่นยำ มีรูปแบบการสตรีมที่แท้จริงและไม่ใช้ข้อมูลอินพุตเป็นแบตช์หรือไมโครแบทช์

Apache Flink ก่อตั้งโดย บริษัท Data Artisans และได้รับการพัฒนาภายใต้ Apache License โดย Apache Flink Community ชุมชนนี้มีผู้ร่วมให้ข้อมูลมากกว่า 479 คนและจนถึงขณะนี้มากกว่า 15,000 คน

ระบบนิเวศบน Apache Flink

แผนภาพด้านล่างแสดงชั้นต่างๆของ Apache Flink Ecosystem -

การจัดเก็บ

Apache Flink มีตัวเลือกมากมายสำหรับการอ่าน / เขียนข้อมูล ด้านล่างนี้คือรายการพื้นที่เก็บข้อมูลพื้นฐาน -

  • HDFS (ระบบไฟล์แบบกระจาย Hadoop)
  • ระบบไฟล์ในเครื่อง
  • S3
  • RDBMS (MySQL, Oracle, MS SQL ฯลฯ )
  • MongoDB
  • HBase
  • อาปาเช่คาฟคา
  • Apache Flume

ปรับใช้

คุณสามารถปรับใช้ Apache Fink ในโหมดโลคัลโหมดคลัสเตอร์หรือบนคลาวด์ โหมดคลัสเตอร์สามารถเป็นแบบสแตนด์อโลน, YARN, MESOS

บนคลาวด์สามารถปรับใช้ Flink บน AWS หรือ GCP ได้

เคอร์เนล

นี่คือเลเยอร์รันไทม์ซึ่งจัดเตรียมการประมวลผลแบบกระจายความทนทานต่อข้อผิดพลาดความน่าเชื่อถือความสามารถในการประมวลผลซ้ำแบบเนทีฟและอื่น ๆ

API และไลบรารี

นี่คือเลเยอร์บนสุดและชั้นที่สำคัญที่สุดของ Apache Flink มี Dataset API ซึ่งดูแลการประมวลผลชุดงานและ Datastream API ซึ่งดูแลการประมวลผลสตรีม มีไลบรารีอื่น ๆ เช่น Flink ML (สำหรับการเรียนรู้ของเครื่อง), Gelly (สำหรับการประมวลผลกราฟ), ตารางสำหรับ SQL เลเยอร์นี้มอบความสามารถที่หลากหลายให้กับ Apache Flink

Apache Flink ทำงานบนสถาปัตยกรรม Kappa สถาปัตยกรรม Kappa มีโปรเซสเซอร์เดียว - สตรีมซึ่งถือว่าอินพุตทั้งหมดเป็นสตรีมและเอ็นจิ้นการสตรีมประมวลผลข้อมูลแบบเรียลไทม์ ข้อมูลแบทช์ในสถาปัตยกรรมคัปปาเป็นกรณีพิเศษของการสตรีม

แผนภาพต่อไปนี้แสดงไฟล์ Apache Flink Architecture.

แนวคิดหลักในสถาปัตยกรรม Kappa คือการจัดการข้อมูลทั้งแบบแบทช์และแบบเรียลไทม์ผ่านระบบประมวลผลแบบสตรีมเดียว

เฟรมเวิร์กข้อมูลขนาดใหญ่ส่วนใหญ่ทำงานบนสถาปัตยกรรม Lambda ซึ่งมีโปรเซสเซอร์แยกต่างหากสำหรับข้อมูลแบตช์และสตรีม ในสถาปัตยกรรมแลมบ์ดาคุณมีโค้ดเบสแยกกันสำหรับการดูแบตช์และสตรีม สำหรับการสืบค้นและรับผลลัพธ์จำเป็นต้องผสานโค้ดเบส การไม่ดูแลฐานข้อมูล / มุมมองที่แยกจากกันและการรวมเข้าด้วยกันเป็นความเจ็บปวด แต่สถาปัตยกรรม Kappa ช่วยแก้ปัญหานี้ได้เนื่องจากมีเพียงมุมมองเดียว - แบบเรียลไทม์ดังนั้นจึงไม่จำเป็นต้องรวมโค้ดเบสเข้าด้วยกัน

นั่นไม่ได้หมายความว่าสถาปัตยกรรม Kappa จะเข้ามาแทนที่สถาปัตยกรรม Lambda โดยสมบูรณ์ขึ้นอยู่กับกรณีการใช้งานและแอปพลิเคชันที่ตัดสินใจว่าสถาปัตยกรรมใดจะดีกว่า

แผนภาพต่อไปนี้แสดงสถาปัตยกรรมการดำเนินงานของ Apache Flink

โปรแกรม

เป็นโค้ดส่วนหนึ่งที่คุณเรียกใช้บน Flink Cluster

ลูกค้า

มีหน้าที่รับโค้ด (โปรแกรม) และสร้างกราฟกระแสข้อมูลงานจากนั้นส่งต่อไปยัง JobManager นอกจากนี้ยังดึงผลลัพธ์ของงาน

JobManager

หลังจากได้รับ Job Dataflow Graph จาก Client แล้วจะมีหน้าที่สร้างกราฟการดำเนินการ จะมอบหมายงานให้กับ TaskManagers ในคลัสเตอร์และดูแลการดำเนินการของงาน

ผู้จัดการงาน

มีหน้าที่รับผิดชอบในการดำเนินงานทั้งหมดที่ได้รับมอบหมายจาก JobManager TaskManagers ทั้งหมดรันงานในสล็อตแยกต่างหากในความขนานที่ระบุ มีหน้าที่ส่งสถานะของงานไปยัง JobManager

คุณสมบัติของ Apache Flink

คุณสมบัติของ Apache Flink มีดังต่อไปนี้ -

  • มีโปรเซสเซอร์สตรีมมิ่งซึ่งสามารถรันได้ทั้งโปรแกรมแบตช์และสตรีม

  • สามารถประมวลผลข้อมูลด้วยความเร็วที่รวดเร็ว

  • API พร้อมใช้งานใน Java, Scala และ Python

  • จัดเตรียม API สำหรับการดำเนินการทั่วไปทั้งหมดซึ่งง่ายมากสำหรับโปรแกรมเมอร์ที่จะใช้

  • ประมวลผลข้อมูลด้วยเวลาแฝงต่ำ (นาโนวินาที) และปริมาณงานสูง

  • ยอมรับความผิดได้ หากโหนดแอปพลิเคชันหรือฮาร์ดแวร์ล้มเหลวจะไม่มีผลต่อคลัสเตอร์

  • สามารถทำงานร่วมกับ Apache Hadoop, Apache MapReduce, Apache Spark, HBase และเครื่องมือข้อมูลขนาดใหญ่อื่น ๆ ได้อย่างง่ายดาย

  • การจัดการในหน่วยความจำสามารถปรับแต่งเพื่อการคำนวณที่ดีขึ้น

  • สามารถปรับขนาดได้สูงและสามารถปรับขนาดได้ไม่เกินหลายพันโหนดในคลัสเตอร์

  • Windowing มีความยืดหยุ่นมากใน Apache Flink

  • ให้การประมวลผลกราฟการเรียนรู้ของเครื่องไลบรารีการประมวลผลเหตุการณ์ที่ซับซ้อน

ต่อไปนี้เป็นข้อกำหนดของระบบในการดาวน์โหลดและทำงานบน Apache Flink -

ระบบปฏิบัติการที่แนะนำ

  • Microsoft Windows 10
  • Ubuntu 16.04 LTS
  • Apple macOS 10.13 / High Sierra

ความต้องการหน่วยความจำ

  • หน่วยความจำ - ขั้นต่ำ 4 GB แนะนำ 8 GB
  • พื้นที่จัดเก็บ - 30 GB

Note - Java 8 ต้องพร้อมใช้งานพร้อมกับตัวแปรสภาพแวดล้อมที่กำหนดไว้แล้ว

ก่อนเริ่มต้นด้วยการตั้งค่า / การติดตั้ง Apache Flink ให้เราตรวจสอบว่าเราติดตั้ง Java 8 ในระบบของเราหรือไม่

Java - เวอร์ชัน

ตอนนี้เราจะดำเนินการต่อโดยดาวน์โหลด Apache Flink

wget http://mirrors.estointernet.in/apache/flink/flink-1.7.1/flink-1.7.1-bin-scala_2.11.tgz

ตอนนี้คลายการบีบอัดไฟล์ tar

tar -xzf flink-1.7.1-bin-scala_2.11.tgz

ไปที่โฮมไดเร็กทอรีของ Flink

cd flink-1.7.1/

เริ่ม Flink Cluster

./bin/start-cluster.sh

เปิดเบราว์เซอร์ Mozilla และไปที่ URL ด้านล่างมันจะเปิด Flink Web Dashboard

http://localhost:8081

นี่คือลักษณะของส่วนติดต่อผู้ใช้ของ Apache Flink Dashboard

ขณะนี้คลัสเตอร์ Flink พร้อมทำงานแล้ว

Flink มีชุด API ที่สมบูรณ์ซึ่งนักพัฒนาสามารถทำการเปลี่ยนแปลงได้ทั้งข้อมูลแบทช์และแบบเรียลไทม์ การเปลี่ยนแปลงที่หลากหลายรวมถึงการทำแผนที่การกรองการเรียงลำดับการเข้าร่วมการจัดกลุ่มและการรวมกลุ่ม การแปลงเหล่านี้โดย Apache Flink ดำเนินการกับข้อมูลแบบกระจาย ให้เราพูดคุยเกี่ยวกับข้อเสนอ APIs Apache Flink ที่แตกต่างกัน

API ชุดข้อมูล

Dataset API ใน Apache Flink ใช้เพื่อดำเนินการกับข้อมูลเป็นกลุ่มในช่วงเวลาหนึ่ง API นี้สามารถใช้ได้ใน Java, Scala และ Python สามารถใช้การเปลี่ยนแปลงประเภทต่างๆในชุดข้อมูลเช่นการกรองการทำแผนที่การรวมการรวมและการจัดกลุ่ม

ชุดข้อมูลถูกสร้างขึ้นจากแหล่งที่มาเช่นไฟล์ในเครื่องหรือโดยการอ่านไฟล์จากแหล่งที่มาเฉพาะและข้อมูลผลลัพธ์สามารถเขียนบนซิงก์ต่างๆเช่นไฟล์แบบกระจายหรือเทอร์มินัลบรรทัดคำสั่ง API นี้รองรับทั้งภาษาโปรแกรม Java และ Scala

นี่คือโปรแกรม Wordcount ของ Dataset API -

public class WordCountProg {
   public static void main(String[] args) throws Exception {
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      DataSet<String> text = env.fromElements(
      "Hello",
      "My Dataset API Flink Program");

      DataSet<Tuple2<String, Integer>> wordCounts = text
      .flatMap(new LineSplitter())
      .groupBy(0)
      .sum(1);

      wordCounts.print();
   }

   public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
         for (String word : line.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}

DataStream API

API นี้ใช้สำหรับจัดการข้อมูลในสตรีมแบบต่อเนื่อง คุณสามารถดำเนินการต่างๆเช่นการกรองการทำแผนที่การกำหนดหน้าต่างการรวมข้อมูลสตรีม มีแหล่งข้อมูลต่างๆในสตรีมข้อมูลนี้เช่นคิวข้อความไฟล์ซ็อกเก็ตสตรีมและข้อมูลผลลัพธ์สามารถเขียนบนซิงก์ต่างๆเช่นเทอร์มินัลบรรทัดคำสั่ง ทั้งภาษาโปรแกรม Java และ Scala รองรับ API นี้

นี่คือโปรแกรม Wordcount แบบสตรีมของ DataStream API ซึ่งคุณมีสตรีมจำนวนคำอย่างต่อเนื่องและข้อมูลจะถูกจัดกลุ่มในหน้าต่างที่สอง

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCountProg {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStream<Tuple2<String, Integer>> dataStream = env
      .socketTextStream("localhost", 9999)
      .flatMap(new Splitter())
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1);
      dataStream.print();
      env.execute("Streaming WordCount Example");
   }
   public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
         for (String word: sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}

Table API เป็น API เชิงสัมพันธ์ที่มี SQL เหมือนกับภาษานิพจน์ API นี้ทำได้ทั้งการประมวลผลแบบแบตช์และสตรีม สามารถฝังกับ Java และ Scala Dataset และ Datastream API ได้ คุณสามารถสร้างตารางจากชุดข้อมูลและ Datastreams ที่มีอยู่หรือจากแหล่งข้อมูลภายนอก ด้วย API เชิงสัมพันธ์นี้คุณสามารถดำเนินการต่างๆเช่นเข้าร่วมรวมเลือกและกรอง ไม่ว่าอินพุตจะเป็นแบตช์หรือสตรีมความหมายของแบบสอบถามจะยังคงเหมือนเดิม

นี่คือตัวอย่างโปรแกรม Table API -

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// create a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register a Table
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...) // or
tableEnv.registerExternalCatalog("extCat", ...)

// register an output Table
tableEnv.registerTableSink("outputTable", ...);
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable")

// execute
env.execute()

ในบทนี้เราจะเรียนรู้วิธีสร้างแอปพลิเคชัน Flink

เปิด Eclipse IDE คลิกที่ New Project และเลือก Java Project

ตั้งชื่อโครงการและคลิกที่ Finish

ตอนนี้คลิกที่ Finish ตามที่แสดงในภาพหน้าจอต่อไปนี้

ตอนนี้คลิกขวาที่ src และไปที่ New >> Class

ตั้งชื่อชั้นเรียนแล้วคลิกที่ Finish

คัดลอกและวางโค้ดด้านล่างในตัวแก้ไข

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
public class WordCount {

   // *************************************************************************
   // PROGRAM
   // *************************************************************************
   public static void main(String[] args) throws Exception {
      final ParameterTool params = ParameterTool.fromArgs(args);
      // set up the execution environment
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      // make parameters available in the web interface
      env.getConfig().setGlobalJobParameters(params);
      // get input data
      DataSet<String> text = env.readTextFile(params.get("input"));
      DataSet<Tuple2<String, Integer>> counts =
      // split up the lines in pairs (2-tuples) containing: (word,1)
      text.flatMap(new Tokenizer())
      // group by the tuple field "0" and sum up tuple field "1"
      .groupBy(0)
      .sum(1);
      // emit result
      if (params.has("output")) {
         counts.writeAsCsv(params.get("output"), "\n", " ");
         // execute program
         env.execute("WordCount Example");
      } else {
         System.out.println("Printing result to stdout. Use --output to specify output path.");
         counts.print();
      }
   }
   
   // *************************************************************************
   // USER FUNCTIONS
   // *************************************************************************
   public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
         // normalize and split the line
         String[] tokens = value.toLowerCase().split("\\W+");
         // emit the pairs
         for (String token : tokens) {
            if (token.length() > 0) {
               out.collect(new Tuple2<>(token, 1));
            }
         }
      }
   }
}

คุณจะได้รับข้อผิดพลาดมากมายในตัวแก้ไขเนื่องจากต้องเพิ่มไลบรารี Flink ในโปรเจ็กต์นี้

คลิกขวาที่โปรเจ็กต์ >> Build Path >> Configure Build Path

เลือกแท็บ Libraries และคลิกที่ Add External JARs

ไปที่ไดเรกทอรี lib ของ Flink เลือกไลบรารีทั้งหมด 4 ไลบรารีแล้วคลิกตกลง

ไปที่แท็บ Order and Export เลือกไลบรารีทั้งหมดแล้วคลิกตกลง

คุณจะเห็นว่าไม่มีข้อผิดพลาดอีกต่อไป

ตอนนี้ให้เราส่งออกแอปพลิเคชันนี้ คลิกขวาที่โครงการและคลิกที่ส่งออก

เลือกไฟล์ JAR แล้วคลิกถัดไป

ระบุเส้นทางปลายทางแล้วคลิกถัดไป

คลิกที่ถัดไป>

คลิกที่ Browse เลือกคลาสหลัก (WordCount) แล้วคลิก Finish

Note - คลิกตกลงในกรณีที่คุณได้รับคำเตือน

เรียกใช้คำสั่งด้านล่าง มันจะเรียกใช้แอปพลิเคชัน Flink ที่คุณเพิ่งสร้างขึ้น

./bin/flink run /home/ubuntu/wordcount.jar --input README.txt --output /home/ubuntu/output

ในบทนี้เราจะเรียนรู้วิธีเรียกใช้โปรแกรม Flink

ให้เราเรียกใช้ตัวอย่าง Flink wordcount บนคลัสเตอร์ Flink

ไปที่โฮมไดเร็กทอรีของ Flink และเรียกใช้คำสั่งด้านล่างในเทอร์มินัล

bin/flink run examples/batch/WordCount.jar -input README.txt -output /home/ubuntu/flink-1.7.1/output.txt

ไปที่แดชบอร์ด Flink คุณจะสามารถดูงานที่เสร็จสมบูรณ์พร้อมรายละเอียด

หากคุณคลิกที่งานที่เสร็จสมบูรณ์คุณจะเห็นภาพรวมโดยละเอียดของงาน

ในการตรวจสอบผลลัพธ์ของโปรแกรม wordcount ให้รันคำสั่งด้านล่างในเทอร์มินัล

cat output.txt

ในบทนี้เราจะเรียนรู้เกี่ยวกับไลบรารีต่างๆของ Apache Flink

การประมวลผลเหตุการณ์ที่ซับซ้อน (CEP)

FlinkCEP เป็น API ใน Apache Flink ซึ่งวิเคราะห์รูปแบบเหตุการณ์บนข้อมูลสตรีมมิ่งแบบต่อเนื่อง เหตุการณ์เหล่านี้ใกล้เคียงกับเวลาจริงซึ่งมีทรูพุตสูงและเวลาแฝงต่ำ API นี้ใช้กับข้อมูลเซนเซอร์เป็นส่วนใหญ่ซึ่งมาในแบบเรียลไทม์และมีความซับซ้อนในการประมวลผล

CEP วิเคราะห์รูปแบบของอินพุตสตรีมและให้ผลลัพธ์เร็ว ๆ นี้ มีความสามารถในการแจ้งเตือนแบบเรียลไทม์และการแจ้งเตือนในกรณีที่รูปแบบเหตุการณ์มีความซับซ้อน FlinkCEP สามารถเชื่อมต่อกับแหล่งอินพุตประเภทต่างๆและวิเคราะห์รูปแบบในนั้น

นี่คือลักษณะของสถาปัตยกรรมตัวอย่างที่มี CEP -

ข้อมูลเซ็นเซอร์จะมาจากแหล่งต่างๆ Kafka จะทำหน้าที่เป็นกรอบการส่งข้อความแบบกระจายซึ่งจะกระจายสตรีมไปยัง Apache Flink และ FlinkCEP จะวิเคราะห์รูปแบบเหตุการณ์ที่ซับซ้อน

คุณสามารถเขียนโปรแกรมใน Apache Flink สำหรับการประมวลผลเหตุการณ์ที่ซับซ้อนโดยใช้ Pattern API ช่วยให้คุณสามารถตัดสินใจรูปแบบเหตุการณ์ที่จะตรวจจับจากข้อมูลสตรีมแบบต่อเนื่อง ด้านล่างนี้คือรูปแบบ CEP ที่ใช้บ่อยที่สุด -

เริ่ม

ใช้เพื่อกำหนดสถานะเริ่มต้น โปรแกรมต่อไปนี้แสดงให้เห็นว่ามีการกำหนดไว้อย่างไรในโปรแกรม Flink -

Pattern<Event, ?> next = start.next("next");

ที่ไหน

ใช้เพื่อกำหนดเงื่อนไขตัวกรองในสถานะปัจจุบัน

patternState.where(new FilterFunction <Event>() {  
   @Override 
      public boolean filter(Event value) throws Exception { 
   } 
});

ต่อไป

ใช้เพื่อผนวกสถานะรูปแบบใหม่และเหตุการณ์การจับคู่ที่จำเป็นเพื่อส่งผ่านรูปแบบก่อนหน้า

Pattern<Event, ?> next = start.next("next");

ติดตามโดย

ใช้เพื่อต่อท้ายสถานะรูปแบบใหม่ แต่ที่นี่เหตุการณ์อื่นอาจเกิดขึ้น b / w สองเหตุการณ์ที่ตรงกัน

Pattern<Event, ?> followedBy = start.followedBy("next");

เกลลี่

Graph API ของ Apache Flink คือ Gelly Gelly ใช้ในการวิเคราะห์กราฟบนแอพพลิเคชั่น Flink โดยใช้ชุดวิธีการและยูทิลิตี้ คุณสามารถวิเคราะห์กราฟขนาดใหญ่โดยใช้ Apache Flink API แบบกระจายด้วย Gelly มีไลบรารีกราฟอื่น ๆ เช่น Apache Giraph เพื่อจุดประสงค์เดียวกัน แต่เนื่องจาก Gelly ใช้กับ Apache Flink จึงใช้ API เดียว สิ่งนี้มีประโยชน์มากจากมุมมองของการพัฒนาและการดำเนินงาน

ให้เราเรียกใช้ตัวอย่างโดยใช้ Apache Flink API - Gelly

ประการแรกคุณต้องคัดลอกไฟล์ Gelly jar 2 ไฟล์จากไดเร็กทอรี opt ของ Apache Flink ไปยังไดเร็กทอรี lib จากนั้นเรียกใช้ขวดตัวอย่าง flink-gelly

cp opt/flink-gelly* lib/ 
./bin/flink run examples/gelly/flink-gelly-examples_*.jar

ตอนนี้ให้เราเรียกใช้ตัวอย่างเพจแรงก์

เพจแรงก์คำนวณคะแนนต่อจุดยอดซึ่งเป็นผลรวมของคะแนนเพจแรงก์ที่ส่งผ่านขอบ คะแนนของจุดยอดแต่ละจุดจะถูกแบ่งเท่า ๆ กันระหว่างขอบนอก จุดยอดที่มีคะแนนสูงจะเชื่อมโยงกับจุดยอดอื่น ๆ ที่มีคะแนนสูง

ผลลัพธ์ประกอบด้วยจุดยอด ID และคะแนนเพจแรงก์

usage: flink run examples/flink-gelly-examples_<version>.jar --algorithm PageRank [algorithm options] --input <input> [input options] --output <output> [output options] 

./bin/flink run examples/gelly/flink-gelly-examples_*.jar --algorithm PageRank --input CycleGraph --vertex_count 2 --output Print

ไลบรารี Machine Learning ของ Apache Flink เรียกว่า FlinkML เนื่องจากการใช้แมชชีนเลิร์นนิงเพิ่มขึ้นอย่างทวีคูณในช่วง 5 ปีที่ผ่านมาชุมชน Flink จึงตัดสินใจเพิ่ม APO การเรียนรู้ของเครื่องนี้ในระบบนิเวศ รายชื่อผู้ร่วมให้ข้อมูลและอัลกอริทึมเพิ่มขึ้นใน FlinkML API นี้ยังไม่ได้เป็นส่วนหนึ่งของการแจกแจงแบบไบนารี

นี่คือตัวอย่างของการถดถอยเชิงเส้นโดยใช้ FlinkML -

// LabeledVector is a feature vector with a label (class or real value)
val trainingData: DataSet[LabeledVector] = ...
val testingData: DataSet[Vector] = ...

// Alternatively, a Splitter is used to break up a DataSet into training and testing data.
val dataSet: DataSet[LabeledVector] = ...
val trainTestData: DataSet[TrainTestDataSet] = Splitter.trainTestSplit(dataSet)
val trainingData: DataSet[LabeledVector] = trainTestData.training
val testingData: DataSet[Vector] = trainTestData.testing.map(lv => lv.vector)
val mlr = MultipleLinearRegression()

.setStepsize(1.0)
.setIterations(100)
.setConvergenceThreshold(0.001)
mlr.fit(trainingData)

// The fitted model can now be used to make predictions
val predictions: DataSet[LabeledVector] = mlr.predict(testingData)

ข้างใน flink-1.7.1/examples/batch/เส้นทางคุณจะพบไฟล์ KMeans.jar ให้เราเรียกใช้ตัวอย่าง FlinkML ตัวอย่างนี้

โปรแกรมตัวอย่างนี้ทำงานโดยใช้จุดเริ่มต้นและชุดข้อมูลเซนทรอยด์

./bin/flink run examples/batch/KMeans.jar --output Print

ในบทนี้เราจะเข้าใจกรณีทดสอบบางส่วนใน Apache Flink

Apache Flink - Bouygues Telecom

Bouygues Telecom เป็นองค์กรโทรคมนาคมที่ใหญ่ที่สุดแห่งหนึ่งในฝรั่งเศส มีผู้ใช้บริการโทรศัพท์มือถือ 11+ ล้านรายและลูกค้าคงที่มากกว่า 2.5 ล้านราย Bouygues ได้ยินเกี่ยวกับ Apache Flink เป็นครั้งแรกในการประชุมกลุ่ม Hadoop ซึ่งจัดขึ้นที่ปารีส ตั้งแต่นั้นมาพวกเขาใช้ Flink สำหรับการใช้งานหลายกรณี พวกเขาประมวลผลข้อความหลายพันล้านข้อความในหนึ่งวันแบบเรียลไทม์ผ่าน Apache Flink

นี่คือสิ่งที่ Bouygues พูดเกี่ยวกับ Apache Flink: "เราจบลงด้วย Flink เพราะระบบรองรับการสตรีมที่แท้จริง - ทั้งในระดับ API และในระดับรันไทม์ทำให้เรามีความสามารถในการตั้งโปรแกรมและเวลาแฝงต่ำที่เรากำลังมองหานอกจากนี้ เราสามารถทำให้ระบบของเราทำงานด้วย Flink ได้ในเวลาอันสั้นเมื่อเทียบกับโซลูชันอื่น ๆ ซึ่งส่งผลให้มีทรัพยากรสำหรับนักพัฒนาที่มีอยู่มากขึ้นสำหรับการขยายตรรกะทางธุรกิจในระบบ "

ที่ Bouygues ประสบการณ์ของลูกค้าเป็นสิ่งสำคัญสูงสุด พวกเขาวิเคราะห์ข้อมูลแบบเรียลไทม์เพื่อให้ข้อมูลเชิงลึกแก่วิศวกรด้านล่าง -

  • ประสบการณ์ของลูกค้าแบบเรียลไทม์ผ่านเครือข่ายของพวกเขา

  • สิ่งที่เกิดขึ้นทั่วโลกบนเครือข่าย

  • การประเมินและการดำเนินงานของเครือข่าย

พวกเขาสร้างระบบที่เรียกว่า LUX (Logged User Experience) ซึ่งประมวลผลข้อมูลบันทึกขนาดใหญ่จากอุปกรณ์เครือข่ายพร้อมการอ้างอิงข้อมูลภายในเพื่อให้ตัวบ่งชี้คุณภาพของประสบการณ์ซึ่งจะบันทึกประสบการณ์ของลูกค้าและสร้างฟังก์ชันที่น่าตกใจเพื่อตรวจจับความล้มเหลวในการใช้ข้อมูลภายใน 60 วินาที.

เพื่อให้บรรลุเป้าหมายนี้พวกเขาต้องการเฟรมเวิร์กที่สามารถรับข้อมูลจำนวนมากแบบเรียลไทม์ติดตั้งง่ายและมีชุด API มากมายสำหรับการประมวลผลข้อมูลที่สตรีม Apache Flink เหมาะอย่างยิ่งสำหรับ Bouygues Telecom

Apache Flink - อาลีบาบา

อาลีบาบาเป็น บริษัท ค้าปลีกอีคอมเมิร์ซที่ใหญ่ที่สุดในโลกโดยมีรายได้ 394 พันล้านดอลลาร์ในปี 2558 การค้นหาของอาลีบาบาเป็นจุดเริ่มต้นของลูกค้าทั้งหมดซึ่งจะแสดงการค้นหาทั้งหมดและแนะนำตามนั้น

Alibaba ใช้ Apache Flink ในเครื่องมือค้นหาเพื่อแสดงผลแบบเรียลไทม์ด้วยความแม่นยำและความเกี่ยวข้องสูงสุดสำหรับผู้ใช้แต่ละราย

อาลีบาบากำลังมองหากรอบการทำงานซึ่งก็คือ -

  • คล่องตัวมากในการบำรุงรักษาโค้ดเบสเดียวสำหรับกระบวนการโครงสร้างพื้นฐานการค้นหาทั้งหมด

  • ให้เวลาแฝงต่ำสำหรับการเปลี่ยนแปลงความพร้อมใช้งานของผลิตภัณฑ์บนเว็บไซต์

  • สอดคล้องและคุ้มค่า

Apache Flink มีคุณสมบัติตรงตามข้อกำหนดข้างต้นทั้งหมด พวกเขาต้องการเฟรมเวิร์กซึ่งมีกลไกการประมวลผลเดียวและสามารถประมวลผลทั้งแบตช์และสตรีมข้อมูลด้วยเอ็นจิ้นเดียวกันและนั่นคือสิ่งที่ Apache Flink ทำ

นอกจากนี้ยังใช้ Blink ซึ่งเป็นเวอร์ชันแยกสำหรับ Flink เพื่อตอบสนองความต้องการเฉพาะบางประการสำหรับการค้นหาของพวกเขา พวกเขายังใช้ Table API ของ Apache Flink ซึ่งมีการปรับปรุงเล็กน้อยสำหรับการค้นหา

นี่คือสิ่งที่อาลีบาบาพูดเกี่ยวกับ apache Flink: " เมื่อมองย้อนกลับไปไม่ต้องสงสัยเลยว่าเป็นปีที่ยิ่งใหญ่สำหรับ Blink และ Flink ที่ Alibaba ไม่มีใครคิดว่าเราจะก้าวหน้ามากขนาดนี้ใน 1 ปีและเรารู้สึกขอบคุณทุกคน ผู้คนที่ช่วยเหลือเราในชุมชน Flink ได้รับการพิสูจน์แล้วว่าทำงานในระดับที่ใหญ่มากเรามีความมุ่งมั่นมากขึ้นที่จะทำงานร่วมกับชุมชนต่อไปเพื่อขับเคลื่อน Flink ไปข้างหน้า! "

นี่คือตารางที่ครอบคลุมซึ่งแสดงการเปรียบเทียบระหว่างกรอบข้อมูลขนาดใหญ่ที่ได้รับความนิยมสูงสุดสามกรอบ ได้แก่ Apache Flink, Apache Spark และ Apache Hadoop

Apache Hadoop Apache Spark Apache Flink

Year of Origin

พ.ศ. 2548 2552 2552

Place of Origin

MapReduce (Google) Hadoop (Yahoo) มหาวิทยาลัยแคลิฟอร์เนียเบิร์กลีย์ มหาวิทยาลัยเทคนิคแห่งเบอร์ลิน

Data Processing Engine

แบทช์ แบทช์ กระแส

Processing Speed

ช้ากว่า Spark และ Flink เร็วกว่า Hadoop 100 เท่า เร็วกว่าประกายไฟ

Programming Languages

Java, C, C ++, Ruby, Groovy, Perl, Python Java, Scala, python และ R Java และ Scala

Programming Model

MapReduce ชุดข้อมูลแบบกระจายที่ยืดหยุ่น (RDD) กระแสข้อมูล Cyclic

Data Transfer

แบทช์ แบทช์ ท่อและแบทช์

Memory Management

ตามดิสก์ JVM จัดการ มีการจัดการที่ใช้งานอยู่

Latency

ต่ำ ปานกลาง ต่ำ

Throughput

ปานกลาง สูง สูง

Optimization

คู่มือ คู่มือ อัตโนมัติ

API

ระดับต่ำ ระดับสูง ระดับสูง

Streaming Support

NA Spark Streaming Flink Streaming

SQL Support

รังอิมพาลา SparkSQL ตาราง API และ SQL

Graph Support

NA GraphX เกลลี่

Machine Learning Support

NA SparkML FlinkML

ตารางเปรียบเทียบที่เราเห็นในบทที่แล้วสรุปตัวชี้ได้สวยมาก Apache Flink เป็นเฟรมเวิร์กที่เหมาะสมที่สุดสำหรับการประมวลผลแบบเรียลไทม์และกรณีการใช้งาน ระบบเอนจิ้นเดียวมีเอกลักษณ์เฉพาะซึ่งสามารถประมวลผลทั้งแบทช์และสตรีมข้อมูลด้วย API ที่แตกต่างกันเช่น Dataset และ DataStream

ไม่ได้หมายความว่า Hadoop และ Spark ไม่อยู่ในเกมการเลือกเฟรมเวิร์กข้อมูลขนาดใหญ่ที่เหมาะสมที่สุดนั้นขึ้นอยู่กับและแตกต่างกันไปในแต่ละกรณีการใช้งาน อาจมีกรณีการใช้งานหลายกรณีที่อาจใช้ Hadoop และ Flink หรือ Spark และ Flink ร่วมกันได้

อย่างไรก็ตาม Flink เป็นเฟรมเวิร์กที่ดีที่สุดสำหรับการประมวลผลแบบเรียลไทม์ในปัจจุบัน การเติบโตของ Apache Flink นั้นน่าทึ่งมากและจำนวนผู้มีส่วนร่วมในชุมชนก็เพิ่มขึ้นทุกวัน

แฮปปี้วูบวาบ!