Apache Spark를 효율적으로 사용하여 데이터를 Elasticsearch로 푸시

Aug 20 2020

xml 파일에 2,700 만 개의 레코드가 있으며 elasticsearch 인덱스로 푸시하고 싶습니다. 아래는 spark scala로 작성된 코드 조각입니다. 저는 spark job jar를 만들고 AWS EMR에서 실행할 것입니다.

이 연습을 완료하기 위해 스파크를 어떻게 효율적으로 사용할 수 있습니까? 안내해주세요.

스파크 데이터 프레임에로드중인 12.5GB의 gzipped xml이 있습니다. 저는 Spark를 처음 사용합니다. (이 gzip 파일을 분할해야합니까? 아니면 Spark 실행자가 처리 할 것입니까?)

class ReadFromXML {

  def createXMLDF(): DataFrame = {
    val spark: SparkSession = SparkUtils.getSparkInstance("Spark Extractor")
    import spark.implicits._
    val m_df: DataFrame = SparkUtils.getDataFrame(spark, "temp.xml.gz").coalesce(5)

    var new_df: DataFrame = null
      
      new_df = m_df.select($"CountryCode"(0).as("countryCode"), $"PostalCode"(0).as("postalCode"),
        $"state"(0).as("state"), $"county"(0).as("county"),
        $"city"(0).as("city"), $"district"(0).as("district"),
        $"Identity.PlaceId".as("placeid"), $"Identity._isDeleted".as("deleted"),
        $"FullStreetName"(0).as("street"), functions.explode($"Text").as("name"), $"name".getField("BaseText").getField("_VALUE")(0).as("nameVal")) .where($"LocationList.Location._primary" === "true")
        .where("(array_contains(_languageCode, 'en'))")
        .where(functions.array_contains($"name".getField("BaseText").getField("_languageCode"), "en"))
    

    new_df.drop("name")
  }
}

object PushToES extends App {
  val spark = SparkSession
    .builder()
    .appName("PushToES")
    .master("local[*]")
    .config("spark.es.nodes", "awsurl")
    .config("spark.es.port", "port")
    .config("spark.es.nodes.wan.only", "true")
    .config("spark.es.net.ssl", "true")
    .getOrCreate()

  val extractor = new ReadFromXML()

  val df = extractor.createXMLDF()
  df.saveToEs("myindex/_doc")
}

업데이트 1 : 68M 파일을 각각 분할 했으며이 단일 파일을 읽으려면 3.7 분이 걸립니다 gzip 압축 코덱 대신 snappy를 사용하려고하지 않았으므로 gz 파일을 snappy 파일로 변환하고 아래에 구성에 추가했습니다.

.config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")

하지만 빈 데이터 프레임을 반환합니다.

df.printschema는 "root"만 반환합니다.

업데이트 2 : lzo 형식으로 실행했습니다. 데이터 프레임에서 압축을 풀고로드하는 데 시간이 매우 적게 걸립니다.

140MB 크기의 각 lzo 압축 파일을 반복하고 데이터 프레임을 만드는 것이 좋은 생각입니까? 또는

데이터 프레임에 10 개의 파일 세트를로드해야합니까? 또는

단일 데이터 프레임에 각각 140MB 씩 200 개의 lzo 압축 파일을 모두로드해야합니까? 그렇다면 이것이 마스터에로드 될 것이라고 생각하므로 마스터에 얼마나 많은 메모리를 할당해야합니까?

s3 버킷에서 파일을 읽는 동안 "s3a"uri가 성능을 향상시킬 수 있습니까? 또는 "s3"uri는 EMR에 적합합니까?

업데이트 3 : 10 개의 lzo 파일의 작은 세트를 테스트하기 위해 .. 아래 구성을 사용했습니다. EMR 클러스터는 전체 56 분이 걸렸고,이 단계 (Spark 애플리케이션)는 10 개의 파일을 처리하는 데 48 분이 걸렸습니다.

마스터 1 개 -m5.xlarge 4 vCore, 16GiB 메모리, EBS 전용 스토리지 EBS 스토리지 : 32GiB

2 코어 -m5.xlarge 4 vCore, 16GiB 메모리, EBS 전용 스토리지 EBS 스토리지 : 32GiB

아래 Spark 조정 매개 변수를 통해 https://idk.dev/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.vmem-check-enabled": "false",
      "yarn.nodemanager.pmem-check-enabled": "false"
    }
  },
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "false"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.network.timeout": "800s",
      "spark.executor.heartbeatInterval": "60s",
      "spark.dynamicAllocation.enabled": "false",
      "spark.driver.memory": "10800M",
      "spark.executor.memory": "10800M",
      "spark.executor.cores": "2",
      "spark.executor.memoryOverhead": "1200M",
      "spark.driver.memoryOverhead": "1200M",
      "spark.memory.fraction": "0.80",
      "spark.memory.storageFraction": "0.30",
      "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
      "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
      "spark.yarn.scheduler.reporterThread.maxFailures": "5",
      "spark.storage.level": "MEMORY_AND_DISK_SER",
      "spark.rdd.compress": "true",
      "spark.shuffle.compress": "true",
      "spark.shuffle.spill.compress": "true",
      "spark.default.parallelism": "4"
    }
  },
  {
    "Classification": "mapred-site",
    "Properties": {
      "mapreduce.map.output.compress": "true"
    }
  }
]

답변

2 AshishMishra Aug 29 2020 at 13:02

여기 내 쪽에서 몇 가지 팁이 있습니다.

마루 형식이나 다른 형식으로 데이터를 읽습니다. 필요에 따라 다시 분할하십시오. 데이터 변환은 시간이 걸릴 수 있으므로 Spark에서 읽은 다음 처리하십시오. 로드를 시작하기 전에지도를 만들고 데이터 형식을 지정하십시오. 이것은 복잡한 맵의 경우 쉽게 디버깅하는 데 도움이됩니다.

  val spark = SparkSession
    .builder()
    .appName("PushToES")
    .enableHiveSupport()
    .getOrCreate()


val batchSizeInMB=4; // change it as you need
val batchRetryCount= 3
val batchWriteRetryWait = 10
val batchEntries= 10
val enableSSL = true
val wanOnly = true
val enableIdempotentInserts = true
val esNodes = [yourNode1, yourNode2, yourNode3]
var esConfig = Map[String, String]()
esConfig = esConfig + ("es.node"-> esNodes.mkString)(","))
esConfig = esConfig + ("es.port"->port.toString())
esConfig = esConfig + ("es.batch.size.bytes"->(batchSizeInMB*1024*1024).toString())
esConfig = esConfig + ("es.batch.size.entries"->batchEntries.toString())
esConfig = esConfig + ("es.batch.write.retry.count"->batchRetryCount.toString())
esConfig = esConfig + ("es.batch.write.retry.wait"->batchWriteRetryWait.toString())
esConfig = esConfig + ("es.batch.write.refresh"->"false")
if(enableSSL){
esConfig = esConfig + ("es.net.ssl"->"true")
esConfig = esConfig + ("es.net.ssl.keystore.location"->"identity.jks")
esConfig = esConfig + ("es.net.ssl.cert.allow.self.signed"->"true")
}
if (wanOnly){
esConfig = esConfig + ("es.nodes.wan.only"->"true")
}

// This helps if some task fails , so data won't be dublicate
if(enableIdempotentInserts){
  esConfig = esConfig + ("es.mapping.id" ->"your_primary_key_column")
}

val df = "suppose you created it using parquet format or any format"

실제로 데이터는 드라이버 수준이 아닌 실행기 수준에서 삽입되므로 동시에 많은 연결이 열리지 않도록 각 실행기에 2-4 개의 코어 만 제공하십시오. 용이성에 따라 문서 크기 또는 항목을 변경할 수 있습니다. 그들에 대해 읽어보세요.

청크로 데이터를 쓰면 나중에 큰 데이터 세트를로드하는 데 도움이되고 데이터를로드하기 전에 인덱스 맵을 만들어보십시오. ES에 해당 기능이 있으므로 중첩 된 데이터를 거의 선호하지 않습니다. 즉, 데이터에 기본 키를 유지하려고합니다.

val dfToInsert = df.withColumn("salt", ceil(rand())*10).cast("Int").persist()
for (i<-0 to 10){
val start = System.currentTimeMillis
val finalDF = dfToInsert.filter($"salt"===i) val counts = finalDF.count() println(s"count of record in chunk $i -> $counts") finalDF.drop("salt").saveToES("indexName",esConfig) val totalTime = System.currentTimeMillis - start println(s"ended Loading data for chunk $i. Total time taken in Seconds : ${totalTime/1000}")
}

최종 DF에 별칭을 부여하고 각 실행에서 업데이트하십시오. 로드시 프로덕션 서버를 방해하고 싶지 않기 때문에

기억

이것은 일반적 일 수 없습니다. 하지만 당신에게 킥 스타트를 제공하기 위해

데이터 크기 또는 예산에 따라 10-40 명의 실행자를 유지합니다. 각 실행기 크기를 8-16GB, 오버 헤드 5GB로 유지합니다. (문서 크기가 크거나 작을 수 있으므로 다를 수 있습니다.) 필요한 경우 maxResultSize 8gb를 유지하십시오. 드라이버는 5 개의 코어와 30g 램을 가질 수 있습니다.

중요한 것들.

  • 인덱스에 따라 변경할 수 있으므로 구성을 변수로 유지해야합니다.

  • 삽입은 드라이버가 아닌 실행기에서 발생하므로 작성하는 동안 연결을 줄이십시오. 각 코어는 하나의 연결을 엽니 다.

  • 문서 삽입은 배치 항목 크기 또는 문서 크기로 가능합니다. 여러 번 실행하면서 학습에 따라 변경하십시오.

  • 솔루션을 강력하게 만드십시오. 모든 크기의 데이터를 처리 할 수 ​​있어야합니다. 읽기와 쓰기를 모두 조정할 수 있지만로드를 시작하기 전에 문서 맵에 따라 데이터 형식을 지정하십시오. 데이터 문서가 약간 복잡하고 중첩 된 경우 쉽게 디버깅하는 데 도움이됩니다.

  • spark-submit의 메모리는 작업을 실행하는 동안 학습에 따라 조정할 수도 있습니다. 메모리와 배치 크기를 변경하여 삽입 시간을 살펴보십시오.

  • 가장 중요한 것은 디자인입니다. 최종 쿼리와 요구 사항을 염두에두고 맵을 만드는 것보다 ES를 사용하는 경우.

3 PubuduSitinamaluwa Aug 28 2020 at 20:31

완전한 대답은 아니지만 여전히 주석이 약간 깁니다. 제가 제안하고 싶은 몇 가지 팁이 있습니다.

명확하지 않지만 당신의 걱정은 실행 시간이라고 생각합니다. 주석에서 제안한대로 클러스터에 더 많은 노드 / 실행기를 추가하여 성능을 향상시킬 수 있습니다. Spark에서 파티션을 나누지 않고 gzip 파일을로드 한 경우 적절한 크기로 분할해야합니다. (너무 작지 않음-처리 속도가 느려집니다. 너무 크지 않음-실행기가 OOM을 실행합니다).

parquetSpark로 작업 할 때 좋은 파일 형식입니다. XML을 마루로 변환 할 수 있다면. 매우 압축되고 가볍습니다.

귀하의 의견을 읽는 coalesce것은 전체 셔플을 수행하지 않습니다. 통합 알고리즘은 데이터를 일부 파티션에서 기존 파티션으로 이동하여 노드 수를 변경합니다. 이 알고리즘은 분명히 파티션 수를 늘릴 수 없습니다. repartition대신 사용하십시오 . 작업은 비용이 많이 들지만 파티션 수를 늘릴 수 있습니다. 더 많은 사실을 확인하십시오.https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4