Apache Flink - Erstellen einer Flink-Anwendung

In diesem Kapitel erfahren Sie, wie Sie eine Flink-Anwendung erstellen.

Öffnen Sie die Eclipse-IDE, klicken Sie auf Neues Projekt und wählen Sie Java-Projekt aus.

Geben Sie den Projektnamen ein und klicken Sie auf Fertig stellen.

Klicken Sie nun auf Fertig stellen, wie im folgenden Screenshot gezeigt.

Klicken Sie nun mit der rechten Maustaste auf src und gehe zu Neue >> Klasse.

Geben Sie einen Klassennamen ein und klicken Sie auf Fertig stellen.

Kopieren Sie den folgenden Code und fügen Sie ihn in den Editor ein.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
public class WordCount {

   // *************************************************************************
   // PROGRAM
   // *************************************************************************
   public static void main(String[] args) throws Exception {
      final ParameterTool params = ParameterTool.fromArgs(args);
      // set up the execution environment
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      // make parameters available in the web interface
      env.getConfig().setGlobalJobParameters(params);
      // get input data
      DataSet<String> text = env.readTextFile(params.get("input"));
      DataSet<Tuple2<String, Integer>> counts =
      // split up the lines in pairs (2-tuples) containing: (word,1)
      text.flatMap(new Tokenizer())
      // group by the tuple field "0" and sum up tuple field "1"
      .groupBy(0)
      .sum(1);
      // emit result
      if (params.has("output")) {
         counts.writeAsCsv(params.get("output"), "\n", " ");
         // execute program
         env.execute("WordCount Example");
      } else {
         System.out.println("Printing result to stdout. Use --output to specify output path.");
         counts.print();
      }
   }
   
   // *************************************************************************
   // USER FUNCTIONS
   // *************************************************************************
   public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
         // normalize and split the line
         String[] tokens = value.toLowerCase().split("\\W+");
         // emit the pairs
         for (String token : tokens) {
            if (token.length() > 0) {
               out.collect(new Tuple2<>(token, 1));
            }
         }
      }
   }
}

Im Editor werden viele Fehler angezeigt, da diesem Projekt Flink-Bibliotheken hinzugefügt werden müssen.

Klicken Sie mit der rechten Maustaste auf das Projekt >> Erstellungspfad >> Erstellungspfad konfigurieren.

Wählen Sie die Registerkarte Bibliotheken und klicken Sie auf Externe JARs hinzufügen.

Gehen Sie in das lib-Verzeichnis von Flink, wählen Sie alle 4 Bibliotheken aus und klicken Sie auf OK.

Gehen Sie zur Registerkarte Bestellen und Exportieren, wählen Sie alle Bibliotheken aus und klicken Sie auf OK.

Sie werden sehen, dass die Fehler nicht mehr da sind.

Lassen Sie uns nun diese Anwendung exportieren. Klicken Sie mit der rechten Maustaste auf das Projekt und klicken Sie auf Exportieren.

Wählen Sie JAR-Datei und klicken Sie auf Weiter

Geben Sie einen Zielpfad ein und klicken Sie auf Weiter

Klicken Sie auf Weiter>

Klicken Sie auf Durchsuchen, wählen Sie die Hauptklasse (WordCount) aus und klicken Sie auf Fertig stellen.

Note - Klicken Sie auf OK, falls Sie eine Warnung erhalten.

Führen Sie den folgenden Befehl aus. Die soeben erstellte Flink-Anwendung wird weiter ausgeführt.

./bin/flink run /home/ubuntu/wordcount.jar --input README.txt --output /home/ubuntu/output