Dataproc 클러스터에서 Bigquery 테이블로 데이터를로드하는 동안 오류가 발생했습니다.
Nov 19 2020
Dataproc에서 실행되는 Spark 작업이 있습니다. 결과를 BigQuery에로드하고 싶습니다. 데이터를 bigquery에 저장하려면 spark-bigquery 커넥터를 추가해야한다는 것을 알고 있습니다.
name := "spl_prj"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.0"
conflictManager := ConflictManager.latestRevision
libraryDependencies ++= Seq(
"org.apache.spark" %%"spark-core" % sparkVersion % Provided,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided ,
"com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.17.3"
)
jar를 빌드하고 작업을 제출하면 다음 오류가 발생합니다.
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: bigquery. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:639) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164) at com.renault.datalake.spl_prj.Main$.main(Main.scala:58)
at com.renault.datalake.spl_prj.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:890)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:217) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
원인 : java.lang.ClassNotFoundException : bigquery.DefaultSource
이 예제 에서 작업 lik을 제출할 때 jar를 추가 할 수있는 권한이 없습니다. sbt가 jar를 빌드 할 때 컴파일 프로세스에 커넥터를 추가하지 않는다고 생각합니다. 바로 실행하려는 코드 스칼라 스파크입니다.
val spark = SparkSession.builder.config(conf).getOrCreate()
val bucket = "doc_spk"
spark.conf.set("temporaryGcsBucket", bucket)
val sc =spark.sparkContext
val rddRowString = sc.binaryRecords("gs://bucket/GAR", 120).map(x=>(x.slice(0,17),x.slice(17,20),x.slice(20,120)))
val df=spark.createDataFrame(rddRowString).toDF("v","data","val_data")
df.write.format("bigquery")
.option("table","db.table")
.save()
답변
1 Srinivas Nov 18 2020 at 23:32
jar buil.sbt
파일을 빌드 하려면 아래 파일을 사용하십시오 fat
.
build.sbt
name := "spl_prj"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.0"
conflictManager := ConflictManager.latestRevision
libraryDependencies ++= Seq(
"org.apache.spark" %%"spark-core" % sparkVersion % Provided,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided ,
"com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.17.3"
)
assemblyMergeStrategy in assembly := {
case PathList("META-INF","services",xs @ _*) => MergeStrategy.filterDistinctLines
case PathList("META-INF",xs @ _*) => MergeStrategy.discard
case _ => MergeStrategy.first
}
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
assemblyJarName in assembly := s"${name.value}-${version.value}.jar"
project/plugins.sbt
파일을 만들고 아래 내용을 추가 하십시오 .
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0")
아래 명령을 실행하여```fat ''jar를 만듭니다.
sbt clean compile assembly
참고 : 프로젝트 요구 사항에 따라 버전을 조정할 수 있습니다.