Apache Flink - Erstellen einer Flink-Anwendung
In diesem Kapitel erfahren Sie, wie Sie eine Flink-Anwendung erstellen.
Öffnen Sie die Eclipse-IDE, klicken Sie auf Neues Projekt und wählen Sie Java-Projekt aus.
Geben Sie den Projektnamen ein und klicken Sie auf Fertig stellen.
Klicken Sie nun auf Fertig stellen, wie im folgenden Screenshot gezeigt.
Klicken Sie nun mit der rechten Maustaste auf src und gehe zu Neue >> Klasse.
Geben Sie einen Klassennamen ein und klicken Sie auf Fertig stellen.
Kopieren Sie den folgenden Code und fügen Sie ihn in den Editor ein.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
public class WordCount {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
// get input data
DataSet<String> text = env.readTextFile(params.get("input"));
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
// emit result
if (params.has("output")) {
counts.writeAsCsv(params.get("output"), "\n", " ");
// execute program
env.execute("WordCount Example");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
Im Editor werden viele Fehler angezeigt, da diesem Projekt Flink-Bibliotheken hinzugefügt werden müssen.
Klicken Sie mit der rechten Maustaste auf das Projekt >> Erstellungspfad >> Erstellungspfad konfigurieren.
Wählen Sie die Registerkarte Bibliotheken und klicken Sie auf Externe JARs hinzufügen.
Gehen Sie in das lib-Verzeichnis von Flink, wählen Sie alle 4 Bibliotheken aus und klicken Sie auf OK.
Gehen Sie zur Registerkarte Bestellen und Exportieren, wählen Sie alle Bibliotheken aus und klicken Sie auf OK.
Sie werden sehen, dass die Fehler nicht mehr da sind.
Lassen Sie uns nun diese Anwendung exportieren. Klicken Sie mit der rechten Maustaste auf das Projekt und klicken Sie auf Exportieren.
Wählen Sie JAR-Datei und klicken Sie auf Weiter
Geben Sie einen Zielpfad ein und klicken Sie auf Weiter
Klicken Sie auf Weiter>
Klicken Sie auf Durchsuchen, wählen Sie die Hauptklasse (WordCount) aus und klicken Sie auf Fertig stellen.
Note - Klicken Sie auf OK, falls Sie eine Warnung erhalten.
Führen Sie den folgenden Befehl aus. Die soeben erstellte Flink-Anwendung wird weiter ausgeführt.
./bin/flink run /home/ubuntu/wordcount.jar --input README.txt --output /home/ubuntu/output