ใช้ Apache Spark อย่างมีประสิทธิภาพเพื่อส่งข้อมูลไปยัง elasticsearch

Aug 20 2020

ฉันมี 27 ล้านเรกคอร์ดในไฟล์ xml ที่ฉันต้องการผลักดันมันลงใน elasticsearch index ด้านล่างนี้คือข้อมูลโค้ดที่เขียนด้วย spark scala ฉันกำลังสร้างกระปุกงาน spark และจะทำงานบน AWS EMR

ฉันจะใช้ประกายไฟอย่างมีประสิทธิภาพเพื่อทำแบบฝึกหัดนี้ได้อย่างไร? กรุณาชี้แนะ

ฉันมี xml gzipped ที่ 12.5 gb ซึ่งฉันกำลังโหลดลงใน spark dataframe ฉันเพิ่งเริ่มใช้ Spark .. (ฉันควรแยกไฟล์ gzip นี้หรือไม่หรือผู้ดำเนินการ spark จะดูแลมัน?)

class ReadFromXML {

  def createXMLDF(): DataFrame = {
    val spark: SparkSession = SparkUtils.getSparkInstance("Spark Extractor")
    import spark.implicits._
    val m_df: DataFrame = SparkUtils.getDataFrame(spark, "temp.xml.gz").coalesce(5)

    var new_df: DataFrame = null
      
      new_df = m_df.select($"CountryCode"(0).as("countryCode"), $"PostalCode"(0).as("postalCode"),
        $"state"(0).as("state"), $"county"(0).as("county"),
        $"city"(0).as("city"), $"district"(0).as("district"),
        $"Identity.PlaceId".as("placeid"), $"Identity._isDeleted".as("deleted"),
        $"FullStreetName"(0).as("street"), functions.explode($"Text").as("name"), $"name".getField("BaseText").getField("_VALUE")(0).as("nameVal")) .where($"LocationList.Location._primary" === "true")
        .where("(array_contains(_languageCode, 'en'))")
        .where(functions.array_contains($"name".getField("BaseText").getField("_languageCode"), "en"))
    

    new_df.drop("name")
  }
}

object PushToES extends App {
  val spark = SparkSession
    .builder()
    .appName("PushToES")
    .master("local[*]")
    .config("spark.es.nodes", "awsurl")
    .config("spark.es.port", "port")
    .config("spark.es.nodes.wan.only", "true")
    .config("spark.es.net.ssl", "true")
    .getOrCreate()

  val extractor = new ReadFromXML()

  val df = extractor.createXMLDF()
  df.saveToEs("myindex/_doc")
}

อัปเดต 1:ฉันได้แยกไฟล์เป็น 68M และในการอ่านไฟล์เดียวนี้ใช้เวลา 3.7 นาทีฉันพยายามใช้ snappy แทนตัวแปลงสัญญาณการบีบอัด gzip ดังนั้นจึงแปลงไฟล์ gz เป็นไฟล์เร็วและเพิ่มด้านล่างใน config

.config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")

แต่ส่งกลับดาต้าเฟรมว่างเปล่า

df.printschema ส่งคืนเพียง "root"

อัปเดต 2:ฉันจัดการให้รันด้วยรูปแบบ lzo แล้ว .. ใช้เวลาในการคลายการบีบอัดและโหลดในดาต้าเฟรมน้อยลงมาก

เป็นความคิดที่ดีที่จะทำซ้ำไฟล์บีบอัด lzo แต่ละไฟล์ที่มีขนาด 140 MB และสร้างดาต้าเฟรมหรือไม่ หรือ

ฉันควรโหลดชุดไฟล์ 10 ไฟล์ในดาต้าเฟรมหรือไม่ หรือ

ฉันควรโหลดไฟล์บีบอัดทั้งหมด 200 lzo แต่ละไฟล์ขนาด 140MB ในดาต้าเฟรมเดียวหรือไม่? ถ้าใช่แล้วควรจัดสรรหน่วยความจำให้มาสเตอร์เท่าไหร่เพราะฉันคิดว่าจะโหลดบนมาสเตอร์

ในขณะที่อ่านไฟล์จากถัง s3 "s3a" uri สามารถปรับปรุงประสิทธิภาพได้หรือไม่? หรือ "s3" uri ใช้ได้กับ EMR?

อัปเดต 3:เพื่อทดสอบไฟล์ lzo ชุดเล็ก 10 ไฟล์ .. ฉันใช้การกำหนดค่าด้านล่าง EMR Cluster ใช้เวลาโดยรวม 56 นาทีจากขั้นตอน (แอปพลิเคชัน Spark) ใช้เวลา 48 นาทีในการประมวลผล 10 ไฟล์

1 Master - m5.xlarge 4 vCore, หน่วยความจำ 16 GiB, ที่เก็บข้อมูล EBS เท่านั้น EBS Storage: 32 GiB

2 Core - m5.xlarge 4 vCore, หน่วยความจำ 16 GiB, ที่เก็บข้อมูล EBS เท่านั้น EBS Storage: 32 GiB

ด้วยพารามิเตอร์ที่ปรับแต่ง Spark ด้านล่างเรียนรู้จาก https://idk.dev/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.vmem-check-enabled": "false",
      "yarn.nodemanager.pmem-check-enabled": "false"
    }
  },
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "false"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.network.timeout": "800s",
      "spark.executor.heartbeatInterval": "60s",
      "spark.dynamicAllocation.enabled": "false",
      "spark.driver.memory": "10800M",
      "spark.executor.memory": "10800M",
      "spark.executor.cores": "2",
      "spark.executor.memoryOverhead": "1200M",
      "spark.driver.memoryOverhead": "1200M",
      "spark.memory.fraction": "0.80",
      "spark.memory.storageFraction": "0.30",
      "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
      "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
      "spark.yarn.scheduler.reporterThread.maxFailures": "5",
      "spark.storage.level": "MEMORY_AND_DISK_SER",
      "spark.rdd.compress": "true",
      "spark.shuffle.compress": "true",
      "spark.shuffle.spill.compress": "true",
      "spark.default.parallelism": "4"
    }
  },
  {
    "Classification": "mapred-site",
    "Properties": {
      "mapreduce.map.output.compress": "true"
    }
  }
]

คำตอบ

2 AshishMishra Aug 29 2020 at 13:02

นี่คือเคล็ดลับบางส่วนจากด้านข้างของฉัน

อ่านข้อมูลในรูปแบบไม้ปาร์เก้หรือรูปแบบใด ๆ แบ่งพาร์ติชันใหม่ตามความต้องการของคุณ การแปลงข้อมูลอาจใช้เวลาดังนั้นโปรดอ่านอย่างเป็นประกายแล้วประมวลผล พยายามสร้างแผนที่และจัดรูปแบบข้อมูลก่อนเริ่มโหลด สิ่งนี้จะช่วยให้แก้ไขจุดบกพร่องได้ง่ายในกรณีของแผนที่ที่ซับซ้อน

  val spark = SparkSession
    .builder()
    .appName("PushToES")
    .enableHiveSupport()
    .getOrCreate()


val batchSizeInMB=4; // change it as you need
val batchRetryCount= 3
val batchWriteRetryWait = 10
val batchEntries= 10
val enableSSL = true
val wanOnly = true
val enableIdempotentInserts = true
val esNodes = [yourNode1, yourNode2, yourNode3]
var esConfig = Map[String, String]()
esConfig = esConfig + ("es.node"-> esNodes.mkString)(","))
esConfig = esConfig + ("es.port"->port.toString())
esConfig = esConfig + ("es.batch.size.bytes"->(batchSizeInMB*1024*1024).toString())
esConfig = esConfig + ("es.batch.size.entries"->batchEntries.toString())
esConfig = esConfig + ("es.batch.write.retry.count"->batchRetryCount.toString())
esConfig = esConfig + ("es.batch.write.retry.wait"->batchWriteRetryWait.toString())
esConfig = esConfig + ("es.batch.write.refresh"->"false")
if(enableSSL){
esConfig = esConfig + ("es.net.ssl"->"true")
esConfig = esConfig + ("es.net.ssl.keystore.location"->"identity.jks")
esConfig = esConfig + ("es.net.ssl.cert.allow.self.signed"->"true")
}
if (wanOnly){
esConfig = esConfig + ("es.nodes.wan.only"->"true")
}

// This helps if some task fails , so data won't be dublicate
if(enableIdempotentInserts){
  esConfig = esConfig + ("es.mapping.id" ->"your_primary_key_column")
}

val df = "suppose you created it using parquet format or any format"

ข้อมูลจริงถูกแทรกที่ระดับตัวดำเนินการและไม่ใช่ที่ระดับไดรเวอร์ลองให้เพียง 2-4 คอร์แก่ผู้ดำเนินการแต่ละตัวเพื่อไม่ให้เปิดการเชื่อมต่อจำนวนมากในเวลาเดียวกัน คุณสามารถเปลี่ยนขนาดเอกสารหรือรายการได้ตามความสะดวกของคุณ โปรดอ่านเกี่ยวกับพวกเขา

เขียนข้อมูลเป็นกลุ่มซึ่งจะช่วยคุณในการโหลดชุดข้อมูลขนาดใหญ่ในอนาคตและลองสร้างแผนที่ดัชนีก่อนที่จะโหลดข้อมูล และชอบข้อมูลที่ซ้อนกันเล็กน้อยเนื่องจากคุณมีฟังก์ชันนั้นใน ES ฉันหมายถึงพยายามเก็บคีย์หลักไว้ในข้อมูลของคุณ

val dfToInsert = df.withColumn("salt", ceil(rand())*10).cast("Int").persist()
for (i<-0 to 10){
val start = System.currentTimeMillis
val finalDF = dfToInsert.filter($"salt"===i) val counts = finalDF.count() println(s"count of record in chunk $i -> $counts") finalDF.drop("salt").saveToES("indexName",esConfig) val totalTime = System.currentTimeMillis - start println(s"ended Loading data for chunk $i. Total time taken in Seconds : ${totalTime/1000}")
}

พยายามตั้งชื่อแทนให้กับ DF สุดท้ายของคุณและอัปเดตในแต่ละครั้ง เนื่องจากคุณไม่ต้องการรบกวนเซิร์ฟเวอร์การผลิตของคุณในขณะโหลด

หน่วยความจำ

ไม่สามารถเป็นแบบทั่วไปได้ แต่เพื่อให้คุณเริ่มเตะ

ให้ผู้ดำเนินการ 10-40 คนตามขนาดข้อมูลหรืองบประมาณของคุณ เก็บแต่ละตัวดำเนินการขนาด 8-16gb และค่าใช้จ่าย 5 gb (อาจแตกต่างกันไปเนื่องจากเอกสารของคุณอาจมีขนาดใหญ่หรือเล็ก) ถ้าจำเป็นให้ maxResultSize 8GB ไดร์เวอร์สามารถมี 5 คอร์และแรม 30 ก

สิ่งที่สำคัญ.

  • คุณต้องเก็บ config ไว้ในตัวแปรเนื่องจากคุณสามารถเปลี่ยนแปลงได้ตามดัชนี

  • การแทรกเกิดขึ้นกับตัวดำเนินการที่ไม่ได้อยู่ในไดรเวอร์ดังนั้นพยายามให้การเชื่อมต่อน้อยลงในขณะที่เขียน แต่ละคอร์จะเปิดหนึ่งการเชื่อมต่อ

  • การแทรกเอกสารอาจมีขนาดรายการเป็นชุดหรือขนาดเอกสาร เปลี่ยนตามการเรียนรู้ของคุณในขณะที่ดำเนินการหลาย ๆ ครั้ง

  • พยายามทำให้โซลูชันของคุณมีประสิทธิภาพ มันควรจะจัดการข้อมูลทุกขนาดได้ การอ่านและการเขียนสามารถปรับแต่งได้ แต่พยายามจัดรูปแบบข้อมูลของคุณตามแผนผังเอกสารก่อนเริ่มโหลด สิ่งนี้จะช่วยในการดีบั๊กได้ง่ายหากเอกสารข้อมูลมีความซับซ้อนเล็กน้อยและซ้อนกัน

  • นอกจากนี้ยังสามารถปรับหน่วยความจำของการส่งประกายไฟได้ตามการเรียนรู้ของคุณในขณะที่ทำงาน เพียงแค่ลองดูเวลาแทรกตามหน่วยความจำและขนาดแบทช์ที่แตกต่างกัน

  • สิ่งที่สำคัญที่สุดคือการออกแบบ หากคุณใช้ ES มากกว่าสร้างแผนที่ของคุณโดยคำนึงถึงคำค้นหาและข้อกำหนดสุดท้าย

3 PubuduSitinamaluwa Aug 28 2020 at 20:31

ไม่ใช่คำตอบที่สมบูรณ์ แต่ยังคงยาวสำหรับความคิดเห็น มีเคล็ดลับเล็กน้อยที่อยากจะแนะนำ

มันไม่ชัดเจน แต่ฉันถือว่าสิ่งที่คุณกังวลคือเวลาดำเนินการ ตามที่แนะนำในความคิดเห็นคุณสามารถปรับปรุงประสิทธิภาพได้โดยการเพิ่มโหนด / ตัวดำเนินการในคลัสเตอร์ หากไฟล์ gzip ถูกโหลดโดยไม่มีการแบ่งพาร์ติชันใน spark คุณควรแบ่งให้มีขนาดที่เหมาะสม (ไม่เล็กเกินไป - จะทำให้การประมวลผลช้าไม่ใหญ่เกินไป - ตัวดำเนินการจะเรียกใช้ OOM)

parquetเป็นรูปแบบไฟล์ที่ดีเมื่อทำงานกับ Spark หากคุณสามารถแปลง XML ของคุณเป็นไม้ปาร์เก้ อัดแน่นและน้ำหนักเบา

การอ่านความคิดเห็นของคุณcoalesceไม่ได้ทำการสุ่มแบบเต็ม อัลกอริทึมการรวมกันเปลี่ยนจำนวนโหนดโดยย้ายข้อมูลจากพาร์ติชันบางส่วนไปยังพาร์ติชันที่มีอยู่ เห็นได้ชัดว่าอัลกอริทึมนี้ไม่สามารถเพิ่มจำนวนพาร์ติชันได้ ใช้repartitionแทน การดำเนินการมีค่าใช้จ่ายสูง แต่สามารถเพิ่มจำนวนพาร์ติชันได้ ตรวจสอบข้อมูลเพิ่มเติมได้ที่:https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4