Apache Flink-API 개념

Flink에는 개발자가 일괄 데이터와 실시간 데이터 모두에서 변환을 수행 할 수있는 풍부한 API 세트가 있습니다. 다양한 변환에는 매핑, 필터링, 정렬, 결합, 그룹화 및 집계가 포함됩니다. Apache Flink에 의한 이러한 변환은 분산 데이터에서 수행됩니다. Apache Flink가 제공하는 다양한 API에 대해 논의하겠습니다.

데이터 세트 API

Apache Flink의 데이터 세트 API는 일정 기간 동안 데이터에 대한 일괄 작업을 수행하는 데 사용됩니다. 이 API는 Java, Scala 및 Python에서 사용할 수 있습니다. 필터링, 매핑, 집계, 조인 및 그룹화와 같은 데이터 세트에 다양한 종류의 변환을 적용 할 수 있습니다.

데이터 세트는 로컬 파일과 같은 소스에서 생성되거나 특정 소스에서 파일을 읽어 생성되며 결과 데이터는 분산 파일 또는 명령 줄 터미널과 같은 다른 싱크에 기록 될 수 있습니다. 이 API는 Java 및 Scala 프로그래밍 언어 모두에서 지원됩니다.

다음은 Dataset API의 Wordcount 프로그램입니다.

public class WordCountProg {
   public static void main(String[] args) throws Exception {
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      DataSet<String> text = env.fromElements(
      "Hello",
      "My Dataset API Flink Program");

      DataSet<Tuple2<String, Integer>> wordCounts = text
      .flatMap(new LineSplitter())
      .groupBy(0)
      .sum(1);

      wordCounts.print();
   }

   public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
         for (String word : line.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}

DataStream API

이 API는 연속 스트림에서 데이터를 처리하는 데 사용됩니다. 필터링, 매핑, 창 설정, 스트림 데이터 집계와 같은 다양한 작업을 수행 할 수 있습니다. 이 데이터 스트림에는 메시지 큐, 파일, 소켓 스트림과 같은 다양한 소스가 있으며 결과 데이터는 명령 줄 터미널과 같은 다른 싱크에 기록 될 수 있습니다. Java 및 Scala 프로그래밍 언어 모두이 API를 지원합니다.

다음은 DataStream API의 스트리밍 Wordcount 프로그램으로, 연속적인 단어 수 스트림이 있고 데이터가 두 번째 창에 그룹화되어 있습니다.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCountProg {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStream<Tuple2<String, Integer>> dataStream = env
      .socketTextStream("localhost", 9999)
      .flatMap(new Splitter())
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1);
      dataStream.print();
      env.execute("Streaming WordCount Example");
   }
   public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
         for (String word: sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}