Apache Flink - Hướng dẫn nhanh

Sự tiến bộ của dữ liệu trong 10 năm qua là rất lớn; điều này đã tạo ra một thuật ngữ 'Dữ liệu lớn'. Không có kích thước cố định của dữ liệu, mà bạn có thể gọi là dữ liệu lớn; bất kỳ dữ liệu nào mà hệ thống truyền thống (RDBMS) của bạn không thể xử lý là Dữ liệu lớn. Dữ liệu lớn này có thể ở định dạng có cấu trúc, bán cấu trúc hoặc không có cấu trúc. Ban đầu, có ba thứ nguyên đối với dữ liệu - Khối lượng, Vận tốc, Sự đa dạng. Các kích thước hiện đã vượt ra ngoài chỉ ba chữ V. Bây giờ chúng tôi đã thêm các V khác - Tính xác thực, Tính hợp lệ, Tính dễ bị tổn thương, Giá trị, Tính thay đổi, v.v.

Dữ liệu lớn dẫn đến sự xuất hiện của nhiều công cụ và khuôn khổ giúp lưu trữ và xử lý dữ liệu. Có một số khuôn khổ dữ liệu lớn phổ biến như Hadoop, Spark, Hive, Pig, Storm và Zookeeper. Nó cũng mang lại cơ hội để tạo ra các sản phẩm Thế hệ tiếp theo trong nhiều lĩnh vực như Chăm sóc sức khỏe, Tài chính, Bán lẻ, Thương mại điện tử, v.v.

Cho dù đó là một MNC hay một công ty khởi nghiệp, mọi người đều đang tận dụng Dữ liệu lớn để lưu trữ, xử lý và đưa ra các quyết định thông minh hơn.

Về mặt Dữ liệu lớn, có hai kiểu xử lý -

  • Xử lý hàng loạt
  • Xử lý thời gian thực

Xử lý dựa trên dữ liệu được thu thập theo thời gian được gọi là Xử lý hàng loạt. Ví dụ: một giám đốc ngân hàng muốn xử lý dữ liệu một tháng qua (được thu thập theo thời gian) để biết số séc đã bị hủy trong 1 tháng qua.

Xử lý dựa trên dữ liệu tức thì cho kết quả tức thì được gọi là Xử lý thời gian thực. Ví dụ, một giám đốc ngân hàng nhận được cảnh báo gian lận ngay sau khi một giao dịch gian lận (kết quả tức thì) xảy ra.

Bảng dưới đây liệt kê sự khác biệt giữa Xử lý hàng loạt và Thời gian thực -

Xử lý hàng loạt Xử lý thời gian thực

Tệp tĩnh

Luồng sự kiện

Được xử lý định kỳ theo phút, giờ, ngày, v.v.

Xử lý ngay lập tức

nano giây

Dữ liệu trước đây trên ổ lưu trữ

Trong bộ nhớ lưu trữ

Ví dụ - Tạo hóa đơn

Ví dụ - Thông báo giao dịch ATM

Ngày nay, xử lý thời gian thực đang được sử dụng rất nhiều trong mọi tổ chức. Các trường hợp sử dụng như phát hiện gian lận, cảnh báo thời gian thực trong chăm sóc sức khỏe và cảnh báo tấn công mạng yêu cầu xử lý dữ liệu tức thì theo thời gian thực; sự chậm trễ thậm chí vài mili giây có thể có tác động rất lớn.

Một công cụ lý tưởng cho các trường hợp sử dụng thời gian thực như vậy sẽ là một công cụ có thể nhập dữ liệu dưới dạng luồng chứ không phải hàng loạt. Apache Flink là công cụ xử lý thời gian thực.

Apache Flink là một khung xử lý thời gian thực có thể xử lý dữ liệu truyền trực tuyến. Nó là một khung xử lý dòng mã nguồn mở cho các ứng dụng thời gian thực hiệu suất cao, có thể mở rộng và chính xác. Nó có mô hình phát trực tuyến thực sự và không lấy dữ liệu đầu vào dưới dạng lô hoặc vi lô.

Apache Flink được thành lập bởi công ty Data Artisans và hiện được phát triển theo Giấy phép Apache của Apache Flink Community. Cộng đồng này có hơn 479 cộng tác viên và hơn 15500 cam kết cho đến nay.

Hệ sinh thái trên Apache Flink

Sơ đồ dưới đây cho thấy các lớp khác nhau của Hệ sinh thái liên kết Apache -

Lưu trữ

Apache Flink có nhiều tùy chọn từ đó nó có thể Đọc / Ghi dữ liệu. Dưới đây là danh sách lưu trữ cơ bản -

  • HDFS (Hệ thống tệp phân tán Hadoop)
  • Hệ thống tệp cục bộ
  • S3
  • RDBMS (MySQL, Oracle, MS SQL, v.v.)
  • MongoDB
  • HBase
  • Apache Kafka
  • Apache Flume

Triển khai

Bạn có thể triển khai Apache Fink ở chế độ cục bộ, chế độ cụm hoặc trên đám mây. Chế độ cụm có thể là độc lập, YARN, MESOS.

Trên đám mây, Flink có thể được triển khai trên AWS hoặc GCP.

Kernel

Đây là lớp thời gian chạy, cung cấp khả năng xử lý phân tán, khả năng chịu lỗi, độ tin cậy, khả năng xử lý lặp lại nguyên bản và hơn thế nữa.

API & Thư viện

Đây là lớp trên cùng và lớp quan trọng nhất của Apache Flink. Nó có API Dataset, xử lý hàng loạt và API Datastream, xử lý luồng. Có các thư viện khác như Flink ML (cho máy học), Gelly (để xử lý đồ thị), Tables cho SQL. Lớp này cung cấp các khả năng đa dạng cho Apache Flink.

Apache Flink hoạt động trên kiến ​​trúc Kappa. Kiến trúc Kappa có một bộ xử lý duy nhất - luồng, coi tất cả đầu vào là luồng và công cụ phát trực tuyến xử lý dữ liệu trong thời gian thực. Dữ liệu hàng loạt trong kiến ​​trúc kappa là một trường hợp đặc biệt của truyền trực tuyến.

Sơ đồ sau đây cho thấy Apache Flink Architecture.

Ý tưởng chính trong kiến ​​trúc Kappa là xử lý cả dữ liệu hàng loạt và dữ liệu thời gian thực thông qua một công cụ xử lý luồng duy nhất.

Hầu hết các khung dữ liệu lớn hoạt động trên kiến ​​trúc Lambda, có các bộ xử lý riêng biệt cho dữ liệu hàng loạt và truyền trực tuyến. Trong kiến ​​trúc Lambda, bạn có các cơ sở mã riêng biệt cho chế độ xem hàng loạt và luồng. Để truy vấn và nhận được kết quả, các cơ sở mã cần được hợp nhất. Không duy trì các cơ sở mã / chế độ xem riêng biệt và hợp nhất chúng là một điều khó khăn, nhưng kiến ​​trúc Kappa giải quyết vấn đề này vì nó chỉ có một chế độ xem - thời gian thực, do đó không cần hợp nhất các cơ sở mã.

Điều đó không có nghĩa là kiến ​​trúc Kappa thay thế kiến ​​trúc Lambda, nó hoàn toàn phụ thuộc vào trường hợp sử dụng và ứng dụng mà quyết định kiến ​​trúc nào sẽ phù hợp hơn.

Sơ đồ sau đây cho thấy kiến ​​trúc thực thi công việc Apache Flink.

Chương trình

Nó là một đoạn mã mà bạn chạy trên Flink Cluster.

Khách hàng

Nó chịu trách nhiệm lấy mã (chương trình) và xây dựng biểu đồ luồng dữ liệu công việc, sau đó chuyển nó đến JobManager. Nó cũng truy xuất kết quả Công việc.

JobManager

Sau khi nhận được Đồ thị luồng dữ liệu công việc từ Khách hàng, nó có trách nhiệm tạo đồ thị thực thi. Nó giao công việc cho TaskManagers trong cụm và giám sát việc thực hiện công việc.

Quản lý công việc

Nó chịu trách nhiệm thực hiện tất cả các nhiệm vụ đã được giao bởi JobManager. Tất cả các TaskManagers chạy các tác vụ trong các vị trí riêng biệt của chúng theo chế độ song song được chỉ định. Có trách nhiệm gửi trạng thái của các nhiệm vụ cho JobManager.

Các tính năng của Apache Flink

Các tính năng của Apache Flink như sau:

  • Nó có một bộ xử lý phát trực tuyến, có thể chạy cả chương trình hàng loạt và dòng.

  • Nó có thể xử lý dữ liệu với tốc độ cực nhanh.

  • API có sẵn trong Java, Scala và Python.

  • Cung cấp các API cho tất cả các hoạt động phổ biến, rất dễ dàng cho các lập trình viên sử dụng.

  • Xử lý dữ liệu với độ trễ thấp (nano giây) và thông lượng cao.

  • Khả năng chịu lỗi của nó. Nếu một nút, ứng dụng hoặc phần cứng bị lỗi, nó không ảnh hưởng đến cụm.

  • Có thể dễ dàng tích hợp với Apache Hadoop, Apache MapReduce, Apache Spark, HBase và các công cụ dữ liệu lớn khác.

  • Quản lý trong bộ nhớ có thể được tùy chỉnh để tính toán tốt hơn.

  • Nó có khả năng mở rộng cao và có thể mở rộng quy mô lên đến hàng nghìn nút trong một cụm.

  • Windowing rất linh hoạt trong Apache Flink.

  • Cung cấp các thư viện Xử lý đồ thị, Học máy, Xử lý sự kiện phức tạp.

Sau đây là các yêu cầu hệ thống để tải xuống và hoạt động trên Apache Flink -

Hệ điều hành được đề xuất

  • Microsoft Windows 10
  • Ubuntu 16.04 LTS
  • Apple macOS 10.13 / High Sierra

Yêu cầu bộ nhớ

  • Bộ nhớ - Tối thiểu 4 GB, Khuyến nghị 8 GB
  • Dung lượng lưu trữ - 30 GB

Note - Java 8 phải có sẵn với các biến môi trường đã được thiết lập.

Trước khi bắt đầu thiết lập / cài đặt Apache Flink, hãy kiểm tra xem chúng tôi đã cài đặt Java 8 trong hệ thống của mình chưa.

Java - phiên bản

Bây giờ chúng ta sẽ tiến hành bằng cách tải xuống Apache Flink.

wget http://mirrors.estointernet.in/apache/flink/flink-1.7.1/flink-1.7.1-bin-scala_2.11.tgz

Bây giờ, giải nén tệp tar.

tar -xzf flink-1.7.1-bin-scala_2.11.tgz

Đi tới thư mục chính của Flink.

cd flink-1.7.1/

Khởi động Cụm liên kết.

./bin/start-cluster.sh

Mở trình duyệt Mozilla và truy cập URL bên dưới, nó sẽ mở Bảng điều khiển web liên kết.

http://localhost:8081

Đây là giao diện người dùng của Apache Flink Dashboard trông như thế nào.

Bây giờ cụm Flink đang hoạt động.

Flink có một bộ API phong phú sử dụng để các nhà phát triển có thể thực hiện chuyển đổi trên cả dữ liệu hàng loạt và dữ liệu thời gian thực. Một loạt các biến đổi bao gồm ánh xạ, lọc, sắp xếp, nối, nhóm và tổng hợp. Các biến đổi này của Apache Flink được thực hiện trên dữ liệu phân tán. Hãy để chúng tôi thảo luận về các API khác nhau mà Apache Flink cung cấp.

API tập dữ liệu

API tập dữ liệu trong Apache Flink được sử dụng để thực hiện các hoạt động hàng loạt trên dữ liệu trong một khoảng thời gian. API này có thể được sử dụng trong Java, Scala và Python. Nó có thể áp dụng các loại biến đổi khác nhau trên tập dữ liệu như lọc, ánh xạ, tổng hợp, nối và nhóm.

Tập dữ liệu được tạo từ các nguồn như tệp cục bộ hoặc bằng cách đọc tệp từ một tệp tin cụ thể và dữ liệu kết quả có thể được ghi trên các tệp chìm khác nhau như tệp phân tán hoặc thiết bị đầu cuối dòng lệnh. API này được hỗ trợ bởi cả hai ngôn ngữ lập trình Java và Scala.

Đây là một chương trình Wordcount của Dataset API -

public class WordCountProg {
   public static void main(String[] args) throws Exception {
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      DataSet<String> text = env.fromElements(
      "Hello",
      "My Dataset API Flink Program");

      DataSet<Tuple2<String, Integer>> wordCounts = text
      .flatMap(new LineSplitter())
      .groupBy(0)
      .sum(1);

      wordCounts.print();
   }

   public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
         for (String word : line.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}

API DataStream

API này được sử dụng để xử lý dữ liệu trong luồng liên tục. Bạn có thể thực hiện các thao tác khác nhau như lọc, ánh xạ, cửa sổ, tổng hợp trên dữ liệu luồng. Có nhiều nguồn khác nhau trên luồng dữ liệu này như hàng đợi tin nhắn, tệp, luồng ổ cắm và dữ liệu kết quả có thể được viết trên các phần chìm khác nhau như thiết bị đầu cuối dòng lệnh. Cả hai ngôn ngữ lập trình Java và Scala đều hỗ trợ API này.

Đây là một chương trình Wordcount trực tuyến của DataStream API, nơi bạn có dòng đếm từ liên tục và dữ liệu được nhóm trong cửa sổ thứ hai.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCountProg {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStream<Tuple2<String, Integer>> dataStream = env
      .socketTextStream("localhost", 9999)
      .flatMap(new Splitter())
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1);
      dataStream.print();
      env.execute("Streaming WordCount Example");
   }
   public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
         for (String word: sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}

API bảng là một API quan hệ với SQL giống như ngôn ngữ biểu thức. API này có thể thực hiện cả xử lý hàng loạt và luồng. Nó có thể được nhúng với Java và Scala Dataset và Datastream APIs. Bạn có thể tạo bảng từ Tập dữ liệu và Luồng dữ liệu hiện có hoặc từ các nguồn dữ liệu bên ngoài. Thông qua API quan hệ này, bạn có thể thực hiện các thao tác như nối, tổng hợp, chọn và lọc. Cho dù đầu vào là hàng loạt hay dòng, ngữ nghĩa của truy vấn vẫn giống nhau.

Đây là một chương trình Table API mẫu -

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// create a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register a Table
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...) // or
tableEnv.registerExternalCatalog("extCat", ...)

// register an output Table
tableEnv.registerTableSink("outputTable", ...);
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable")

// execute
env.execute()

Trong chương này, chúng ta sẽ học cách tạo một ứng dụng Flink.

Mở Eclipse IDE, nhấp vào Dự án mới và Chọn Dự án Java.

Đặt Tên dự án và nhấp vào Kết thúc.

Bây giờ, nhấp vào Kết thúc như được hiển thị trong ảnh chụp màn hình sau.

Bây giờ, nhấp chuột phải vào src và chuyển đến New >> Class.

Đặt tên lớp và nhấp vào Kết thúc.

Sao chép và dán đoạn mã dưới đây vào Trình chỉnh sửa.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
public class WordCount {

   // *************************************************************************
   // PROGRAM
   // *************************************************************************
   public static void main(String[] args) throws Exception {
      final ParameterTool params = ParameterTool.fromArgs(args);
      // set up the execution environment
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      // make parameters available in the web interface
      env.getConfig().setGlobalJobParameters(params);
      // get input data
      DataSet<String> text = env.readTextFile(params.get("input"));
      DataSet<Tuple2<String, Integer>> counts =
      // split up the lines in pairs (2-tuples) containing: (word,1)
      text.flatMap(new Tokenizer())
      // group by the tuple field "0" and sum up tuple field "1"
      .groupBy(0)
      .sum(1);
      // emit result
      if (params.has("output")) {
         counts.writeAsCsv(params.get("output"), "\n", " ");
         // execute program
         env.execute("WordCount Example");
      } else {
         System.out.println("Printing result to stdout. Use --output to specify output path.");
         counts.print();
      }
   }
   
   // *************************************************************************
   // USER FUNCTIONS
   // *************************************************************************
   public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
         // normalize and split the line
         String[] tokens = value.toLowerCase().split("\\W+");
         // emit the pairs
         for (String token : tokens) {
            if (token.length() > 0) {
               out.collect(new Tuple2<>(token, 1));
            }
         }
      }
   }
}

Bạn sẽ gặp nhiều lỗi trong trình soạn thảo, vì các thư viện Flink cần được thêm vào dự án này.

Nhấp chuột phải vào dự án >> Đường dẫn xây dựng >> Định cấu hình Đường dẫn xây dựng.

Chọn tab Thư viện và nhấp vào Thêm JAR bên ngoài.

Vào thư mục lib của Flink, chọn tất cả 4 thư viện và nhấp vào OK.

Chuyển đến tab Đặt hàng và Xuất, chọn tất cả các thư viện và nhấp vào OK.

Bạn sẽ thấy rằng các lỗi không còn nữa.

Bây giờ, hãy để chúng tôi xuất ứng dụng này. Nhấp chuột phải vào dự án và nhấp vào Xuất.

Chọn tệp JAR và nhấp vào Tiếp theo

Đưa ra một đường dẫn đích và nhấp vào Tiếp theo

Nhấp vào Tiếp theo>

Bấm vào Duyệt, chọn lớp chính (WordCount) và bấm Kết thúc.

Note - Nhấn OK, trong trường hợp bạn nhận được bất kỳ cảnh báo nào.

Chạy lệnh dưới đây. Nó sẽ tiếp tục chạy ứng dụng Flink mà bạn vừa tạo.

./bin/flink run /home/ubuntu/wordcount.jar --input README.txt --output /home/ubuntu/output

Trong chương này, chúng ta sẽ học cách chạy một chương trình Flink.

Hãy để chúng tôi chạy ví dụ đếm từ Flink trên một cụm Flink.

Vào thư mục chính của Flink và chạy lệnh dưới đây trong terminal.

bin/flink run examples/batch/WordCount.jar -input README.txt -output /home/ubuntu/flink-1.7.1/output.txt

Vào bảng điều khiển Flink, bạn sẽ có thể xem một công việc đã hoàn thành với các chi tiết của nó.

Nếu bạn nhấp vào Công việc đã hoàn thành, bạn sẽ có tổng quan chi tiết về các công việc.

Để kiểm tra kết quả đầu ra của chương trình wordcount, hãy chạy lệnh dưới đây trong terminal.

cat output.txt

Trong chương này, chúng ta sẽ tìm hiểu về các thư viện khác nhau của Apache Flink.

Xử lý sự kiện phức tạp (CEP)

FlinkCEP là một API trong Apache Flink, phân tích các mẫu sự kiện trên dữ liệu phát trực tuyến liên tục. Những sự kiện này gần thời gian thực, có thông lượng cao và độ trễ thấp. API này được sử dụng chủ yếu trên dữ liệu Cảm biến, theo thời gian thực và rất phức tạp để xử lý.

CEP phân tích mô hình của luồng đầu vào và đưa ra kết quả rất sớm. Nó có khả năng cung cấp thông báo và cảnh báo thời gian thực trong trường hợp mẫu sự kiện phức tạp. FlinkCEP có thể kết nối với các loại nguồn đầu vào khác nhau và phân tích các mẫu trong đó.

Đây là cách kiến ​​trúc mẫu với CEP trông như thế này -

Dữ liệu cảm biến sẽ đến từ các nguồn khác nhau, Kafka sẽ hoạt động như một khung nhắn tin phân tán, sẽ phân phối các luồng tới Apache Flink và FlinkCEP sẽ phân tích các mẫu sự kiện phức tạp.

Bạn có thể viết chương trình trong Apache Flink để xử lý sự kiện phức tạp bằng cách sử dụng API mẫu. Nó cho phép bạn quyết định các mẫu sự kiện để phát hiện từ dữ liệu luồng liên tục. Dưới đây là một số mẫu CEP được sử dụng phổ biến nhất -

Bắt đầu

Nó được sử dụng để xác định trạng thái bắt đầu. Chương trình sau đây cho biết cách nó được định nghĩa trong chương trình Flink:

Pattern<Event, ?> next = start.next("next");

Ở đâu

Nó được sử dụng để xác định một điều kiện lọc trong trạng thái hiện tại.

patternState.where(new FilterFunction <Event>() {  
   @Override 
      public boolean filter(Event value) throws Exception { 
   } 
});

Kế tiếp

Nó được sử dụng để thêm một trạng thái mẫu mới và sự kiện đối sánh cần thiết để vượt qua mẫu trước đó.

Pattern<Event, ?> next = start.next("next");

Theo dõi bởi

Nó được sử dụng để thêm một trạng thái mẫu mới nhưng ở đây các sự kiện khác có thể xảy ra b / w hai sự kiện khớp.

Pattern<Event, ?> followedBy = start.followedBy("next");

Gelly

API đồ thị của Apache Flink là Gelly. Gelly được sử dụng để thực hiện phân tích đồ thị trên các ứng dụng Flink bằng cách sử dụng một tập hợp các phương pháp và tiện ích. Bạn có thể phân tích các biểu đồ khổng lồ bằng Apache Flink API theo cách phân tán với Gelly. Có những thư viện đồ thị khác cũng giống như Apache Giraph cho cùng mục đích, nhưng vì Gelly được sử dụng trên Apache Flink nên nó sử dụng một API duy nhất. Điều này rất hữu ích từ quan điểm phát triển và vận hành.

Hãy để chúng tôi chạy một ví dụ sử dụng Apache Flink API - Gelly.

Đầu tiên, bạn cần sao chép 2 tệp jar Gelly từ thư mục opt của Apache Flink vào thư mục lib của nó. Sau đó, chạy flink-gelly-wallet jar.

cp opt/flink-gelly* lib/ 
./bin/flink run examples/gelly/flink-gelly-examples_*.jar

Bây giờ chúng ta hãy chạy ví dụ về Xếp hạng trang.

PageRank tính điểm trên mỗi đỉnh, là tổng điểm của PageRank được truyền qua các cạnh. Điểm của mỗi đỉnh được chia đều cho các cạnh ngoài. Các đỉnh có điểm cao được liên kết với các đỉnh có điểm cao khác.

Kết quả chứa ID đỉnh và điểm Xếp hạng trang.

usage: flink run examples/flink-gelly-examples_<version>.jar --algorithm PageRank [algorithm options] --input <input> [input options] --output <output> [output options] 

./bin/flink run examples/gelly/flink-gelly-examples_*.jar --algorithm PageRank --input CycleGraph --vertex_count 2 --output Print

Thư viện Machine Learning của Apache Flink được gọi là FlinkML. Vì việc sử dụng máy học ngày càng tăng theo cấp số nhân trong 5 năm qua, cộng đồng Flink đã quyết định thêm APO máy học này vào hệ sinh thái của mình. Danh sách những người đóng góp và thuật toán ngày càng tăng trong FlinkML. API này chưa phải là một phần của phân phối nhị phân.

Đây là một ví dụ về hồi quy tuyến tính sử dụng FlinkML -

// LabeledVector is a feature vector with a label (class or real value)
val trainingData: DataSet[LabeledVector] = ...
val testingData: DataSet[Vector] = ...

// Alternatively, a Splitter is used to break up a DataSet into training and testing data.
val dataSet: DataSet[LabeledVector] = ...
val trainTestData: DataSet[TrainTestDataSet] = Splitter.trainTestSplit(dataSet)
val trainingData: DataSet[LabeledVector] = trainTestData.training
val testingData: DataSet[Vector] = trainTestData.testing.map(lv => lv.vector)
val mlr = MultipleLinearRegression()

.setStepsize(1.0)
.setIterations(100)
.setConvergenceThreshold(0.001)
mlr.fit(trainingData)

// The fitted model can now be used to make predictions
val predictions: DataSet[LabeledVector] = mlr.predict(testingData)

Phía trong flink-1.7.1/examples/batch/, bạn sẽ tìm thấy tệp KMeans.jar. Hãy để chúng tôi chạy ví dụ FlinkML mẫu này.

Chương trình ví dụ này được chạy bằng cách sử dụng điểm mặc định và tập dữ liệu centroid.

./bin/flink run examples/batch/KMeans.jar --output Print

Trong chương này, chúng ta sẽ hiểu một vài trường hợp kiểm thử trong Apache Flink.

Apache Flink - Bouygues Telecom

Bouygues Telecom là một trong những tổ chức viễn thông lớn nhất ở Pháp. Nó có hơn 11 triệu thuê bao di động và hơn 2,5 triệu khách hàng cố định. Bouygues nghe nói về Apache Flink lần đầu tiên trong cuộc họp nhóm Hadoop được tổ chức tại Paris. Kể từ đó, họ đã sử dụng Flink cho nhiều trường hợp sử dụng. Họ đã xử lý hàng tỷ tin nhắn trong một ngày trong thời gian thực thông qua Apache Flink.

Đây là những gì Bouygues phải nói về Apache Flink: "Chúng tôi kết thúc với Flink vì hệ thống hỗ trợ phát trực tuyến thực sự - cả ở API và ở cấp thời gian chạy, mang lại cho chúng tôi khả năng lập trình và độ trễ thấp mà chúng tôi đang tìm kiếm. Ngoài ra, chúng tôi đã có thể thiết lập và chạy hệ thống của mình với Flink trong một phần nhỏ thời gian so với các giải pháp khác, dẫn đến nhiều tài nguyên dành cho nhà phát triển có sẵn hơn để mở rộng logic nghiệp vụ trong hệ thống. "

Tại Bouygues, trải nghiệm của khách hàng là ưu tiên hàng đầu. Họ phân tích dữ liệu trong thời gian thực để có thể cung cấp thông tin chi tiết cho các kỹ sư của họ -

  • Trải nghiệm khách hàng trong thời gian thực qua mạng của họ

  • Điều gì đang xảy ra trên toàn cầu trên mạng

  • Đánh giá và vận hành mạng

Họ đã tạo ra một hệ thống có tên LUX (Trải nghiệm người dùng được ghi nhật ký) xử lý dữ liệu nhật ký khổng lồ từ thiết bị mạng với tham chiếu dữ liệu nội bộ để cung cấp các chỉ số chất lượng trải nghiệm sẽ ghi lại trải nghiệm khách hàng của họ và xây dựng chức năng báo động để phát hiện bất kỳ lỗi nào trong việc tiêu thụ dữ liệu trong vòng 60 giây.

Để đạt được điều này, họ cần một khuôn khổ có thể lấy dữ liệu lớn trong thời gian thực, dễ thiết lập và cung cấp bộ API phong phú để xử lý dữ liệu được truyền trực tuyến. Apache Flink hoàn toàn phù hợp với Bouygues Telecom.

Apache Flink - Alibaba

Alibaba là công ty bán lẻ thương mại điện tử lớn nhất trên thế giới với doanh thu 394 tỷ đô la vào năm 2015. Tìm kiếm của Alibaba là điểm truy cập của tất cả khách hàng, nơi hiển thị tất cả các tìm kiếm và đề xuất phù hợp.

Alibaba sử dụng Apache Flink trong công cụ tìm kiếm của mình để hiển thị kết quả theo thời gian thực với độ chính xác và mức độ liên quan cao nhất cho từng người dùng.

Alibaba đang tìm kiếm một khuôn khổ, đó là -

  • Rất nhanh nhẹn trong việc duy trì một cơ sở mã cho toàn bộ quy trình cơ sở hạ tầng tìm kiếm của họ.

  • Cung cấp độ trễ thấp cho các thay đổi về tính sẵn có của các sản phẩm trên trang web.

  • Nhất quán và hiệu quả về chi phí.

Apache Flink đủ điều kiện cho tất cả các yêu cầu trên. Họ cần một khuôn khổ, có một công cụ xử lý duy nhất và có thể xử lý cả dữ liệu hàng loạt và luồng với cùng một công cụ và đó là những gì Apache Flink làm.

Họ cũng sử dụng Blink, một phiên bản được chia nhỏ cho Flink để đáp ứng một số yêu cầu riêng cho tìm kiếm của họ. Họ cũng đang sử dụng API bảng của Apache Flink với một số cải tiến cho tìm kiếm của họ.

Đây là những gì Alibaba phải nói về apache Flink: " Nhìn lại, không nghi ngờ gì là một năm thành công của Blink và Flink tại Alibaba. Không ai nghĩ rằng chúng tôi sẽ đạt được nhiều tiến bộ này trong một năm, và chúng tôi rất biết ơn tất cả những người đã giúp đỡ chúng tôi trong cộng đồng. Flink đã được chứng minh là hoạt động ở quy mô rất lớn. Chúng tôi cam kết tiếp tục công việc của mình với cộng đồng để đưa Flink tiến lên phía trước! "

Dưới đây là một bảng tổng hợp, cho thấy sự so sánh giữa ba khung dữ liệu lớn phổ biến nhất: Apache Flink, Apache Spark và Apache Hadoop.

Apache Hadoop Apache Spark Apache Flink

Year of Origin

2005 2009 2009

Place of Origin

MapReduce (Google) Hadoop (Yahoo) đại học California, Berkeley Đại học kỹ thuật Berlin

Data Processing Engine

Lô hàng Lô hàng Suối

Processing Speed

Chậm hơn Spark và Flink Nhanh hơn 100 lần so với Hadoop Nhanh hơn tia lửa

Programming Languages

Java, C, C ++, Ruby, Groovy, Perl, Python Java, Scala, python và R Java và Scala

Programming Model

MapReduce Tập dữ liệu phân phối có khả năng phục hồi (RDD) Luồng dữ liệu tuần hoàn

Data Transfer

Lô hàng Lô hàng Pipelined và Batch

Memory Management

Dựa trên đĩa JVM Managed Được quản lý hoạt động

Latency

Thấp Trung bình Thấp

Throughput

Trung bình Cao Cao

Optimization

Thủ công Thủ công Tự động

API

Cấp thấp Trình độ cao Trình độ cao

Streaming Support

NA Spark Streaming Truyền trực tuyến Flink

SQL Support

Hive, Impala SparkSQL API bảng và SQL

Graph Support

NA GraphX Gelly

Machine Learning Support

NA SparkML FlinkML

Bảng so sánh mà chúng ta đã thấy trong chương trước kết luận khá nhiều. Apache Flink là khuôn khổ phù hợp nhất cho các trường hợp sử dụng và xử lý thời gian thực. Hệ thống công cụ duy nhất của nó là duy nhất có thể xử lý cả dữ liệu hàng loạt và truyền trực tuyến với các API khác nhau như Dataset và DataStream.

Điều đó không có nghĩa là Hadoop và Spark bị loại khỏi cuộc chơi, việc lựa chọn khung dữ liệu lớn phù hợp nhất luôn phụ thuộc và thay đổi theo từng trường hợp sử dụng. Có thể có một số trường hợp sử dụng trong đó sự kết hợp của Hadoop và Flink hoặc Spark và Flink có thể phù hợp.

Tuy nhiên, Flink là khuôn khổ tốt nhất để xử lý thời gian thực hiện tại. Sự phát triển của Apache Flink thật đáng kinh ngạc và số lượng người đóng góp cho cộng đồng của nó đang tăng lên từng ngày.

Happy Flinking!