ใช้ Apache Spark อย่างมีประสิทธิภาพเพื่อส่งข้อมูลไปยัง elasticsearch
ฉันมี 27 ล้านเรกคอร์ดในไฟล์ xml ที่ฉันต้องการผลักดันมันลงใน elasticsearch index ด้านล่างนี้คือข้อมูลโค้ดที่เขียนด้วย spark scala ฉันกำลังสร้างกระปุกงาน spark และจะทำงานบน AWS EMR
ฉันจะใช้ประกายไฟอย่างมีประสิทธิภาพเพื่อทำแบบฝึกหัดนี้ได้อย่างไร? กรุณาชี้แนะ
ฉันมี xml gzipped ที่ 12.5 gb ซึ่งฉันกำลังโหลดลงใน spark dataframe ฉันเพิ่งเริ่มใช้ Spark .. (ฉันควรแยกไฟล์ gzip นี้หรือไม่หรือผู้ดำเนินการ spark จะดูแลมัน?)
class ReadFromXML {
def createXMLDF(): DataFrame = {
val spark: SparkSession = SparkUtils.getSparkInstance("Spark Extractor")
import spark.implicits._
val m_df: DataFrame = SparkUtils.getDataFrame(spark, "temp.xml.gz").coalesce(5)
var new_df: DataFrame = null
new_df = m_df.select($"CountryCode"(0).as("countryCode"), $"PostalCode"(0).as("postalCode"),
$"state"(0).as("state"), $"county"(0).as("county"),
$"city"(0).as("city"), $"district"(0).as("district"),
$"Identity.PlaceId".as("placeid"), $"Identity._isDeleted".as("deleted"),
$"FullStreetName"(0).as("street"), functions.explode($"Text").as("name"), $"name".getField("BaseText").getField("_VALUE")(0).as("nameVal")) .where($"LocationList.Location._primary" === "true")
.where("(array_contains(_languageCode, 'en'))")
.where(functions.array_contains($"name".getField("BaseText").getField("_languageCode"), "en"))
new_df.drop("name")
}
}
object PushToES extends App {
val spark = SparkSession
.builder()
.appName("PushToES")
.master("local[*]")
.config("spark.es.nodes", "awsurl")
.config("spark.es.port", "port")
.config("spark.es.nodes.wan.only", "true")
.config("spark.es.net.ssl", "true")
.getOrCreate()
val extractor = new ReadFromXML()
val df = extractor.createXMLDF()
df.saveToEs("myindex/_doc")
}
อัปเดต 1:ฉันได้แยกไฟล์เป็น 68M และในการอ่านไฟล์เดียวนี้ใช้เวลา 3.7 นาทีฉันพยายามใช้ snappy แทนตัวแปลงสัญญาณการบีบอัด gzip ดังนั้นจึงแปลงไฟล์ gz เป็นไฟล์เร็วและเพิ่มด้านล่างใน config
.config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
แต่ส่งกลับดาต้าเฟรมว่างเปล่า
df.printschema ส่งคืนเพียง "root"
อัปเดต 2:ฉันจัดการให้รันด้วยรูปแบบ lzo แล้ว .. ใช้เวลาในการคลายการบีบอัดและโหลดในดาต้าเฟรมน้อยลงมาก
เป็นความคิดที่ดีที่จะทำซ้ำไฟล์บีบอัด lzo แต่ละไฟล์ที่มีขนาด 140 MB และสร้างดาต้าเฟรมหรือไม่ หรือ
ฉันควรโหลดชุดไฟล์ 10 ไฟล์ในดาต้าเฟรมหรือไม่ หรือ
ฉันควรโหลดไฟล์บีบอัดทั้งหมด 200 lzo แต่ละไฟล์ขนาด 140MB ในดาต้าเฟรมเดียวหรือไม่? ถ้าใช่แล้วควรจัดสรรหน่วยความจำให้มาสเตอร์เท่าไหร่เพราะฉันคิดว่าจะโหลดบนมาสเตอร์
ในขณะที่อ่านไฟล์จากถัง s3 "s3a" uri สามารถปรับปรุงประสิทธิภาพได้หรือไม่? หรือ "s3" uri ใช้ได้กับ EMR?
อัปเดต 3:เพื่อทดสอบไฟล์ lzo ชุดเล็ก 10 ไฟล์ .. ฉันใช้การกำหนดค่าด้านล่าง EMR Cluster ใช้เวลาโดยรวม 56 นาทีจากขั้นตอน (แอปพลิเคชัน Spark) ใช้เวลา 48 นาทีในการประมวลผล 10 ไฟล์
1 Master - m5.xlarge 4 vCore, หน่วยความจำ 16 GiB, ที่เก็บข้อมูล EBS เท่านั้น EBS Storage: 32 GiB
2 Core - m5.xlarge 4 vCore, หน่วยความจำ 16 GiB, ที่เก็บข้อมูล EBS เท่านั้น EBS Storage: 32 GiB
ด้วยพารามิเตอร์ที่ปรับแต่ง Spark ด้านล่างเรียนรู้จาก https://idk.dev/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/
[
{
"Classification": "yarn-site",
"Properties": {
"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"
}
},
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "false"
}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.network.timeout": "800s",
"spark.executor.heartbeatInterval": "60s",
"spark.dynamicAllocation.enabled": "false",
"spark.driver.memory": "10800M",
"spark.executor.memory": "10800M",
"spark.executor.cores": "2",
"spark.executor.memoryOverhead": "1200M",
"spark.driver.memoryOverhead": "1200M",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.yarn.scheduler.reporterThread.maxFailures": "5",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.default.parallelism": "4"
}
},
{
"Classification": "mapred-site",
"Properties": {
"mapreduce.map.output.compress": "true"
}
}
]
คำตอบ
นี่คือเคล็ดลับบางส่วนจากด้านข้างของฉัน
อ่านข้อมูลในรูปแบบไม้ปาร์เก้หรือรูปแบบใด ๆ แบ่งพาร์ติชันใหม่ตามความต้องการของคุณ การแปลงข้อมูลอาจใช้เวลาดังนั้นโปรดอ่านอย่างเป็นประกายแล้วประมวลผล พยายามสร้างแผนที่และจัดรูปแบบข้อมูลก่อนเริ่มโหลด สิ่งนี้จะช่วยให้แก้ไขจุดบกพร่องได้ง่ายในกรณีของแผนที่ที่ซับซ้อน
val spark = SparkSession
.builder()
.appName("PushToES")
.enableHiveSupport()
.getOrCreate()
val batchSizeInMB=4; // change it as you need
val batchRetryCount= 3
val batchWriteRetryWait = 10
val batchEntries= 10
val enableSSL = true
val wanOnly = true
val enableIdempotentInserts = true
val esNodes = [yourNode1, yourNode2, yourNode3]
var esConfig = Map[String, String]()
esConfig = esConfig + ("es.node"-> esNodes.mkString)(","))
esConfig = esConfig + ("es.port"->port.toString())
esConfig = esConfig + ("es.batch.size.bytes"->(batchSizeInMB*1024*1024).toString())
esConfig = esConfig + ("es.batch.size.entries"->batchEntries.toString())
esConfig = esConfig + ("es.batch.write.retry.count"->batchRetryCount.toString())
esConfig = esConfig + ("es.batch.write.retry.wait"->batchWriteRetryWait.toString())
esConfig = esConfig + ("es.batch.write.refresh"->"false")
if(enableSSL){
esConfig = esConfig + ("es.net.ssl"->"true")
esConfig = esConfig + ("es.net.ssl.keystore.location"->"identity.jks")
esConfig = esConfig + ("es.net.ssl.cert.allow.self.signed"->"true")
}
if (wanOnly){
esConfig = esConfig + ("es.nodes.wan.only"->"true")
}
// This helps if some task fails , so data won't be dublicate
if(enableIdempotentInserts){
esConfig = esConfig + ("es.mapping.id" ->"your_primary_key_column")
}
val df = "suppose you created it using parquet format or any format"
ข้อมูลจริงถูกแทรกที่ระดับตัวดำเนินการและไม่ใช่ที่ระดับไดรเวอร์ลองให้เพียง 2-4 คอร์แก่ผู้ดำเนินการแต่ละตัวเพื่อไม่ให้เปิดการเชื่อมต่อจำนวนมากในเวลาเดียวกัน คุณสามารถเปลี่ยนขนาดเอกสารหรือรายการได้ตามความสะดวกของคุณ โปรดอ่านเกี่ยวกับพวกเขา
เขียนข้อมูลเป็นกลุ่มซึ่งจะช่วยคุณในการโหลดชุดข้อมูลขนาดใหญ่ในอนาคตและลองสร้างแผนที่ดัชนีก่อนที่จะโหลดข้อมูล และชอบข้อมูลที่ซ้อนกันเล็กน้อยเนื่องจากคุณมีฟังก์ชันนั้นใน ES ฉันหมายถึงพยายามเก็บคีย์หลักไว้ในข้อมูลของคุณ
val dfToInsert = df.withColumn("salt", ceil(rand())*10).cast("Int").persist()
for (i<-0 to 10){
val start = System.currentTimeMillis
val finalDF = dfToInsert.filter($"salt"===i) val counts = finalDF.count() println(s"count of record in chunk $i -> $counts") finalDF.drop("salt").saveToES("indexName",esConfig) val totalTime = System.currentTimeMillis - start println(s"ended Loading data for chunk $i. Total time taken in Seconds : ${totalTime/1000}")
}
พยายามตั้งชื่อแทนให้กับ DF สุดท้ายของคุณและอัปเดตในแต่ละครั้ง เนื่องจากคุณไม่ต้องการรบกวนเซิร์ฟเวอร์การผลิตของคุณในขณะโหลด
หน่วยความจำ
ไม่สามารถเป็นแบบทั่วไปได้ แต่เพื่อให้คุณเริ่มเตะ
ให้ผู้ดำเนินการ 10-40 คนตามขนาดข้อมูลหรืองบประมาณของคุณ เก็บแต่ละตัวดำเนินการขนาด 8-16gb และค่าใช้จ่าย 5 gb (อาจแตกต่างกันไปเนื่องจากเอกสารของคุณอาจมีขนาดใหญ่หรือเล็ก) ถ้าจำเป็นให้ maxResultSize 8GB ไดร์เวอร์สามารถมี 5 คอร์และแรม 30 ก
สิ่งที่สำคัญ.
คุณต้องเก็บ config ไว้ในตัวแปรเนื่องจากคุณสามารถเปลี่ยนแปลงได้ตามดัชนี
การแทรกเกิดขึ้นกับตัวดำเนินการที่ไม่ได้อยู่ในไดรเวอร์ดังนั้นพยายามให้การเชื่อมต่อน้อยลงในขณะที่เขียน แต่ละคอร์จะเปิดหนึ่งการเชื่อมต่อ
การแทรกเอกสารอาจมีขนาดรายการเป็นชุดหรือขนาดเอกสาร เปลี่ยนตามการเรียนรู้ของคุณในขณะที่ดำเนินการหลาย ๆ ครั้ง
พยายามทำให้โซลูชันของคุณมีประสิทธิภาพ มันควรจะจัดการข้อมูลทุกขนาดได้ การอ่านและการเขียนสามารถปรับแต่งได้ แต่พยายามจัดรูปแบบข้อมูลของคุณตามแผนผังเอกสารก่อนเริ่มโหลด สิ่งนี้จะช่วยในการดีบั๊กได้ง่ายหากเอกสารข้อมูลมีความซับซ้อนเล็กน้อยและซ้อนกัน
นอกจากนี้ยังสามารถปรับหน่วยความจำของการส่งประกายไฟได้ตามการเรียนรู้ของคุณในขณะที่ทำงาน เพียงแค่ลองดูเวลาแทรกตามหน่วยความจำและขนาดแบทช์ที่แตกต่างกัน
สิ่งที่สำคัญที่สุดคือการออกแบบ หากคุณใช้ ES มากกว่าสร้างแผนที่ของคุณโดยคำนึงถึงคำค้นหาและข้อกำหนดสุดท้าย
ไม่ใช่คำตอบที่สมบูรณ์ แต่ยังคงยาวสำหรับความคิดเห็น มีเคล็ดลับเล็กน้อยที่อยากจะแนะนำ
มันไม่ชัดเจน แต่ฉันถือว่าสิ่งที่คุณกังวลคือเวลาดำเนินการ ตามที่แนะนำในความคิดเห็นคุณสามารถปรับปรุงประสิทธิภาพได้โดยการเพิ่มโหนด / ตัวดำเนินการในคลัสเตอร์ หากไฟล์ gzip ถูกโหลดโดยไม่มีการแบ่งพาร์ติชันใน spark คุณควรแบ่งให้มีขนาดที่เหมาะสม (ไม่เล็กเกินไป - จะทำให้การประมวลผลช้าไม่ใหญ่เกินไป - ตัวดำเนินการจะเรียกใช้ OOM)
parquet
เป็นรูปแบบไฟล์ที่ดีเมื่อทำงานกับ Spark หากคุณสามารถแปลง XML ของคุณเป็นไม้ปาร์เก้ อัดแน่นและน้ำหนักเบา
การอ่านความคิดเห็นของคุณcoalesce
ไม่ได้ทำการสุ่มแบบเต็ม อัลกอริทึมการรวมกันเปลี่ยนจำนวนโหนดโดยย้ายข้อมูลจากพาร์ติชันบางส่วนไปยังพาร์ติชันที่มีอยู่ เห็นได้ชัดว่าอัลกอริทึมนี้ไม่สามารถเพิ่มจำนวนพาร์ติชันได้ ใช้repartition
แทน การดำเนินการมีค่าใช้จ่ายสูง แต่สามารถเพิ่มจำนวนพาร์ติชันได้ ตรวจสอบข้อมูลเพิ่มเติมได้ที่:https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4