Java'da Cloud Function'tan Cloud Dataflow ardışık düzen işi nasıl tetiklenir?
Cloud Dataflow ardışık düzenini Cloud Functions'tan tetiklemem gerekiyor. Ancak Bulut işlevi Java ile yazılmalıdır. Dolayısıyla, Bulut İşlevi için Tetikleyici, Google Cloud Storage'ın Kesinleştirme / Oluşturma Etkinliğidir; yani, bir dosya GCS paketine yüklendiğinde, Bulut İşlevinin Bulut veri akışını tetiklemesi gerekir.
Veri akışı ardışık düzeni (toplu iş) oluşturduğumda ve ardışık düzeni yürüttüğümde, bir Dataflow ardışık düzeni şablonu oluşturur ve bir Dataflow işi oluşturur.
Ancak Java'da bir bulut işlevi oluşturduğumda ve bir dosya yüklendiğinde, durum yalnızca "tamam" diyor, ancak veri akışı ardışık düzenini tetiklemiyor.
Bulut işlevi
package com.example;
import com.example.Example.GCSEvent;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.CreateJobFromTemplateRequest;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.HashMap;
import java.util.logging.Logger;
public class Example implements BackgroundFunction<GCSEvent> {
private static final Logger logger = Logger.getLogger(Example.class.getName());
@Override
public void accept(GCSEvent event, Context context) throws IOException, GeneralSecurityException {
logger.info("Event: " + context.eventId());
logger.info("Event Type: " + context.eventType());
HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
HttpRequestInitializer requestInitializer = new HttpCredentialsAdapter(credentials);
Dataflow dataflowService = new Dataflow.Builder(httpTransport, jsonFactory, requestInitializer)
.setApplicationName("Google Dataflow function Demo")
.build();
String projectId = "my-project-id";
RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment();
runtimeEnvironment.setBypassTempDirValidation(false);
runtimeEnvironment.setTempLocation("gs://my-dataflow-job-bucket/tmp");
CreateJobFromTemplateRequest createJobFromTemplateRequest = new CreateJobFromTemplateRequest();
createJobFromTemplateRequest.setEnvironment(runtimeEnvironment);
createJobFromTemplateRequest.setLocation("us-central1");
createJobFromTemplateRequest.setGcsPath("gs://my-dataflow-job-bucket-staging/templates/cloud-dataflow-template");
createJobFromTemplateRequest.setJobName("Dataflow-Cloud-Job");
createJobFromTemplateRequest.setParameters(new HashMap<String,String>());
createJobFromTemplateRequest.getParameters().put("inputFile","gs://cloud-dataflow-bucket-input/*.txt");
dataflowService.projects().templates().create(projectId,createJobFromTemplateRequest);
throw new UnsupportedOperationException("Not supported yet.");
}
public static class GCSEvent {
String bucket;
String name;
String metageneration;
}
}
pom.xml (bulut işlevi)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cloudfunctions</groupId>
<artifactId>http-function</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.google.auth/google-auth-library-credentials -->
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
<version>0.21.1</version>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-dataflow</artifactId>
<version>v1b3-rev207-1.20.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud.functions</groupId>
<artifactId>functions-framework-api</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<version>0.21.1</version>
</dependency>
</dependencies>
<!-- Required for Java 11 functions in the inline editor -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<excludes>
<exclude>.google/</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
bulut işlevi günlükleri
Bulut işlevi aracılığıyla bulut depolamadan veri akışını tetikledikleri aşağıdaki blogları (referans için ekleyerek) inceledim. Ancak kod, Node.js veya python'da yazılmıştır. Ancak bulut işlevim java ile yazılmalıdır.
Node.js'de bulut işlevleri aracılığıyla Veri Akışı ardışık düzenini tetikleme
https://dzone.com/articles/triggering-dataflow-pipelines-with-cloud-functions
Python kullanarak bulut işlevleri aracılığıyla veri akışı ardışık düzenini tetikleme
https://medium.com/google-cloud/how-to-kick-off-a-dataflow-pipeline-via-cloud-functions-696927975d4e
Bu konudaki herhangi bir yardım çok takdir edilmektedir.
Yanıtlar
RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment();
runtimeEnvironment.setBypassTempDirValidation(false);
runtimeEnvironment.setTempLocation("gs://karthiksfirstbucket/temp1");
LaunchTemplateParameters launchTemplateParameters = new LaunchTemplateParameters();
launchTemplateParameters.setEnvironment(runtimeEnvironment);
launchTemplateParameters.setJobName("newJob" + (new Date()).getTime());
Map<String, String> params = new HashMap<String, String>();
params.put("inputFile", "gs://karthiksfirstbucket/sample.txt");
params.put("output", "gs://karthiksfirstbucket/count1");
launchTemplateParameters.setParameters(params);
writer.write("4");
Dataflow.Projects.Templates.Launch launch = dataflowService.projects().templates().launch(projectId, launchTemplateParameters);
launch.setGcsPath("gs://dataflow-templates-us-central1/latest/Word_Count");
launch.execute();
Yukarıdaki kod bir şablon başlatır ve veri akışı ardışık düzenini yürütür
- uygulama varsayılan kimlik bilgilerini kullanma (Kullanıcı kredisi veya hizmet kredisi olarak değiştirilebilir)
- bölge varsayılan bölgedir (Değiştirilebilir).
- her HTTP tetikleyicisi için bir iş oluşturur (Tetikleyici değiştirilebilir).
Kodun tamamı aşağıda bulunabilir:
https://github.com/karthikeyan1127/Java_CloudFunction_DataFlow/blob/master/Hello.java