자바의 Cloud Function에서 Cloud Dataflow 파이프 라인 작업을 트리거하는 방법은 무엇인가요?

Aug 21 2020

Cloud Functions에서 Cloud Dataflow 파이프 라인을 트리거해야합니다. 그러나 Cloud 함수는 Java로 작성되어야합니다. 따라서 Cloud Function의 트리거는 Google Cloud Storage의 종료 / 만들기 이벤트입니다. 즉, 파일이 GCS 버킷에 업로드되면 Cloud 함수가 Cloud 데이터 흐름을 트리거해야합니다.

데이터 흐름 파이프 라인 (일괄)을 생성하고 파이프 라인을 실행하면 Dataflow 파이프 라인 템플릿이 생성되고 Dataflow 작업이 생성됩니다.

하지만 Java로 클라우드 함수를 생성하고 파일이 업로드되면 상태가 "ok"로 표시되지만 데이터 흐름 파이프 라인이 트리거되지는 않습니다.

클라우드 기능

package com.example;

import com.example.Example.GCSEvent;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.CreateJobFromTemplateRequest;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.HashMap;
import java.util.logging.Logger;

public class Example implements BackgroundFunction<GCSEvent> {
    private static final Logger logger = Logger.getLogger(Example.class.getName());

    @Override
    public void accept(GCSEvent event, Context context) throws IOException, GeneralSecurityException {
        logger.info("Event: " + context.eventId());
        logger.info("Event Type: " + context.eventType());


        HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();

        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
        HttpRequestInitializer requestInitializer = new HttpCredentialsAdapter(credentials);


        Dataflow dataflowService = new Dataflow.Builder(httpTransport, jsonFactory, requestInitializer)
                .setApplicationName("Google Dataflow function Demo")
                .build();

        String projectId = "my-project-id";


        RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment();
        runtimeEnvironment.setBypassTempDirValidation(false);
        runtimeEnvironment.setTempLocation("gs://my-dataflow-job-bucket/tmp");
        CreateJobFromTemplateRequest createJobFromTemplateRequest = new CreateJobFromTemplateRequest();
        createJobFromTemplateRequest.setEnvironment(runtimeEnvironment);
        createJobFromTemplateRequest.setLocation("us-central1");
        createJobFromTemplateRequest.setGcsPath("gs://my-dataflow-job-bucket-staging/templates/cloud-dataflow-template");
        createJobFromTemplateRequest.setJobName("Dataflow-Cloud-Job");
        createJobFromTemplateRequest.setParameters(new HashMap<String,String>());
        createJobFromTemplateRequest.getParameters().put("inputFile","gs://cloud-dataflow-bucket-input/*.txt");
        dataflowService.projects().templates().create(projectId,createJobFromTemplateRequest);

        throw new UnsupportedOperationException("Not supported yet.");
    }

    public static class GCSEvent {
        String bucket;
        String name;
        String metageneration;
    }


}

pom.xml (클라우드 함수)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cloudfunctions</groupId>
  <artifactId>http-function</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.target>11</maven.compiler.target>
    <maven.compiler.source>11</maven.compiler.source>
  </properties>

  <dependencies>
  <!-- https://mvnrepository.com/artifact/com.google.auth/google-auth-library-credentials -->
<dependency>
    <groupId>com.google.auth</groupId>
    <artifactId>google-auth-library-credentials</artifactId>
    <version>0.21.1</version>
</dependency>

  <dependency>
    <groupId>com.google.apis</groupId>
    <artifactId>google-api-services-dataflow</artifactId>
    <version>v1b3-rev207-1.20.0</version>
</dependency>
    <dependency>
      <groupId>com.google.cloud.functions</groupId>
      <artifactId>functions-framework-api</artifactId>
      <version>1.0.1</version>
    </dependency>
         <dependency>
    <groupId>com.google.auth</groupId>
    <artifactId>google-auth-library-oauth2-http</artifactId>
    <version>0.21.1</version>
</dependency>
  </dependencies>

  <!-- Required for Java 11 functions in the inline editor -->
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <excludes>
            <exclude>.google/</exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

클라우드 기능 로그

클라우드 기능을 통해 클라우드 스토리지에서 데이터 흐름을 트리거 한 아래 블로그 (참조 용 추가)를 살펴 보았습니다. 그러나 코드는 Node.js 또는 python으로 작성되었습니다. 하지만 내 클라우드 기능은 자바로 작성되어야합니다.

Node.js의 클라우드 함수를 통해 Dataflow 파이프 라인 트리거

https://dzone.com/articles/triggering-dataflow-pipelines-with-cloud-functions

Python을 사용하여 클라우드 함수를 통해 데이터 흐름 파이프 라인 트리거

https://medium.com/google-cloud/how-to-kick-off-a-dataflow-pipeline-via-cloud-functions-696927975d4e

이에 대한 도움을 주시면 대단히 감사하겠습니다.

답변

4 Karthikeyant1127 Sep 04 2020 at 11:50
RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment();
runtimeEnvironment.setBypassTempDirValidation(false);
runtimeEnvironment.setTempLocation("gs://karthiksfirstbucket/temp1");

LaunchTemplateParameters launchTemplateParameters = new LaunchTemplateParameters();
launchTemplateParameters.setEnvironment(runtimeEnvironment);
launchTemplateParameters.setJobName("newJob" + (new Date()).getTime());

Map<String, String> params = new HashMap<String, String>();
params.put("inputFile", "gs://karthiksfirstbucket/sample.txt");
params.put("output", "gs://karthiksfirstbucket/count1");
launchTemplateParameters.setParameters(params);
writer.write("4");
       
Dataflow.Projects.Templates.Launch launch = dataflowService.projects().templates().launch(projectId, launchTemplateParameters);            
launch.setGcsPath("gs://dataflow-templates-us-central1/latest/Word_Count");
launch.execute();

위 코드는 템플릿을 시작하고 데이터 흐름 파이프 라인을 실행합니다.

  1. 애플리케이션 기본 자격 증명 사용 (사용자 자격 증명 또는 서비스 자격 증명으로 변경할 수 있음)
  2. 지역은 기본 지역 (변경 가능)입니다.
  3. 모든 HTTP 트리거에 대한 작업을 생성합니다 (트리거 변경 가능).

전체 코드는 아래에서 찾을 수 있습니다.

https://github.com/karthikeyan1127/Java_CloudFunction_DataFlow/blob/master/Hello.java