Apache Flink - การสร้างแอปพลิเคชัน Flink
ในบทนี้เราจะเรียนรู้วิธีสร้างแอปพลิเคชัน Flink
เปิด Eclipse IDE คลิกที่ New Project และเลือก Java Project
ตั้งชื่อโครงการและคลิกที่ Finish
ตอนนี้คลิกที่ Finish ตามที่แสดงในภาพหน้าจอต่อไปนี้
ตอนนี้คลิกขวาที่ src และไปที่ New >> Class
ตั้งชื่อชั้นเรียนแล้วคลิกที่ Finish
คัดลอกและวางโค้ดด้านล่างในตัวแก้ไข
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
public class WordCount {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
// get input data
DataSet<String> text = env.readTextFile(params.get("input"));
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
// emit result
if (params.has("output")) {
counts.writeAsCsv(params.get("output"), "\n", " ");
// execute program
env.execute("WordCount Example");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
คุณจะได้รับข้อผิดพลาดมากมายในตัวแก้ไขเนื่องจากต้องเพิ่มไลบรารี Flink ในโปรเจ็กต์นี้
คลิกขวาที่โปรเจ็กต์ >> Build Path >> Configure Build Path
เลือกแท็บ Libraries และคลิกที่ Add External JARs
ไปที่ไดเรกทอรี lib ของ Flink เลือกไลบรารีทั้งหมด 4 ไลบรารีแล้วคลิกตกลง
ไปที่แท็บ Order and Export เลือกไลบรารีทั้งหมดแล้วคลิกตกลง
คุณจะเห็นว่าไม่มีข้อผิดพลาดอีกต่อไป
ตอนนี้ให้เราส่งออกแอปพลิเคชันนี้ คลิกขวาที่โครงการและคลิกที่ส่งออก
เลือกไฟล์ JAR แล้วคลิกถัดไป
ระบุเส้นทางปลายทางแล้วคลิกถัดไป
คลิกที่ถัดไป>
คลิกที่ Browse เลือกคลาสหลัก (WordCount) แล้วคลิก Finish
Note - คลิกตกลงในกรณีที่คุณได้รับคำเตือน
เรียกใช้คำสั่งด้านล่าง มันจะเรียกใช้แอปพลิเคชัน Flink ที่คุณเพิ่งสร้างขึ้น
./bin/flink run /home/ubuntu/wordcount.jar --input README.txt --output /home/ubuntu/output