Usa Apache Spark in modo efficiente per inviare i dati a elasticsearch
Ho 27 milioni di record in un file xml, che voglio inserire nell'indice elasticsearch Di seguito è riportato lo snippet di codice scritto in spark scala, creerò un jar di lavoro spark e girerò su AWS EMR
Come posso utilizzare in modo efficiente la scintilla per completare questo esercizio? Per favore guida.
Ho un xml gzip di 12,5 GB che sto caricando in spark dataframe. Sono nuovo di Spark ... (Devo dividere questo file gzip? O se ne occuperanno gli esecutori Spark?)
class ReadFromXML {
def createXMLDF(): DataFrame = {
val spark: SparkSession = SparkUtils.getSparkInstance("Spark Extractor")
import spark.implicits._
val m_df: DataFrame = SparkUtils.getDataFrame(spark, "temp.xml.gz").coalesce(5)
var new_df: DataFrame = null
new_df = m_df.select($"CountryCode"(0).as("countryCode"), $"PostalCode"(0).as("postalCode"),
$"state"(0).as("state"), $"county"(0).as("county"),
$"city"(0).as("city"), $"district"(0).as("district"),
$"Identity.PlaceId".as("placeid"), $"Identity._isDeleted".as("deleted"),
$"FullStreetName"(0).as("street"), functions.explode($"Text").as("name"), $"name".getField("BaseText").getField("_VALUE")(0).as("nameVal")) .where($"LocationList.Location._primary" === "true")
.where("(array_contains(_languageCode, 'en'))")
.where(functions.array_contains($"name".getField("BaseText").getField("_languageCode"), "en"))
new_df.drop("name")
}
}
object PushToES extends App {
val spark = SparkSession
.builder()
.appName("PushToES")
.master("local[*]")
.config("spark.es.nodes", "awsurl")
.config("spark.es.port", "port")
.config("spark.es.nodes.wan.only", "true")
.config("spark.es.net.ssl", "true")
.getOrCreate()
val extractor = new ReadFromXML()
val df = extractor.createXMLDF()
df.saveToEs("myindex/_doc")
}
Aggiornamento 1: ho suddiviso i file in 68 milioni ciascuno e per leggere questo singolo file ci vogliono 3,7 minuti Ho provato a usare snappy invece del codec di compressione gzip Quindi ho convertito il file gz in file snappy e aggiunto di seguito in config
.config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
Ma restituisce dataframe vuoto
df.printschema restituisce solo "root"
Aggiornamento 2: sono riuscito a eseguire con il formato lzo..ci vuole molto meno tempo per decomprimere e caricare in dataframe.
È una buona idea iterare su ogni file compresso lzo di dimensioni 140 MB e creare dataframe? o
devo caricare un set di 10 file in un dataframe? o
devo caricare tutti i 200 file compressi lzo ciascuno di 140 MB in un singolo dataframe ?. se sì, quanta memoria dovrebbe essere allocata al master poiché penso che verrà caricato sul master?
Durante la lettura del file dal bucket s3, "s3a" uri può migliorare le prestazioni? o "s3" uri va bene per EMR?
Aggiornamento 3: per testare un piccolo set di 10 file lzo .. Ho usato la configurazione di seguito. EMR Cluster ha impiegato complessivamente 56 minuti da cui il passaggio (applicazione Spark) ha impiegato 48 minuti per elaborare 10 file
1 Master - m5.xlarge 4 vCore, 16 GiB di memoria, solo storage EBS Archiviazione EBS: 32 GiB
2 core - m5.xlarge 4 vCore, 16 GiB di memoria, solo storage EBS Storage EBS: 32 GiB
Con sotto i parametri sintonizzati Spark appresi da https://idk.dev/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/
[
{
"Classification": "yarn-site",
"Properties": {
"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"
}
},
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "false"
}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.network.timeout": "800s",
"spark.executor.heartbeatInterval": "60s",
"spark.dynamicAllocation.enabled": "false",
"spark.driver.memory": "10800M",
"spark.executor.memory": "10800M",
"spark.executor.cores": "2",
"spark.executor.memoryOverhead": "1200M",
"spark.driver.memoryOverhead": "1200M",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.yarn.scheduler.reporterThread.maxFailures": "5",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.default.parallelism": "4"
}
},
{
"Classification": "mapred-site",
"Properties": {
"mapreduce.map.output.compress": "true"
}
}
]
Risposte
Ecco alcuni suggerimenti da parte mia.
Leggi i dati in formato parquet o in qualsiasi formato. Ripartizionalo secondo le tue necessità. La conversione dei dati può richiedere tempo, quindi leggilo in Spark e quindi elaboralo. Prova a creare la mappa e formattare i dati prima di iniziare il caricamento. Ciò aiuterebbe un facile debug in caso di mappa complessa.
val spark = SparkSession
.builder()
.appName("PushToES")
.enableHiveSupport()
.getOrCreate()
val batchSizeInMB=4; // change it as you need
val batchRetryCount= 3
val batchWriteRetryWait = 10
val batchEntries= 10
val enableSSL = true
val wanOnly = true
val enableIdempotentInserts = true
val esNodes = [yourNode1, yourNode2, yourNode3]
var esConfig = Map[String, String]()
esConfig = esConfig + ("es.node"-> esNodes.mkString)(","))
esConfig = esConfig + ("es.port"->port.toString())
esConfig = esConfig + ("es.batch.size.bytes"->(batchSizeInMB*1024*1024).toString())
esConfig = esConfig + ("es.batch.size.entries"->batchEntries.toString())
esConfig = esConfig + ("es.batch.write.retry.count"->batchRetryCount.toString())
esConfig = esConfig + ("es.batch.write.retry.wait"->batchWriteRetryWait.toString())
esConfig = esConfig + ("es.batch.write.refresh"->"false")
if(enableSSL){
esConfig = esConfig + ("es.net.ssl"->"true")
esConfig = esConfig + ("es.net.ssl.keystore.location"->"identity.jks")
esConfig = esConfig + ("es.net.ssl.cert.allow.self.signed"->"true")
}
if (wanOnly){
esConfig = esConfig + ("es.nodes.wan.only"->"true")
}
// This helps if some task fails , so data won't be dublicate
if(enableIdempotentInserts){
esConfig = esConfig + ("es.mapping.id" ->"your_primary_key_column")
}
val df = "suppose you created it using parquet format or any format"
In realtà i dati vengono inseriti a livello di esecutore e non a livello di driver provare a dare solo 2-4 core a ciascun esecutore in modo che non molte connessioni siano aperte contemporaneamente. Puoi variare la dimensione del documento o le voci secondo la tua facilità. Si prega di leggere su di loro.
scrivere i dati in blocchi questo ti aiuterebbe a caricare un set di dati di grandi dimensioni in futuro e provare a creare la mappa dell'indice prima di caricare i dati. E preferisci piccoli dati annidati poiché hai quella funzionalità in ES, voglio dire, cerca di mantenere una chiave primaria nei tuoi dati.
val dfToInsert = df.withColumn("salt", ceil(rand())*10).cast("Int").persist()
for (i<-0 to 10){
val start = System.currentTimeMillis
val finalDF = dfToInsert.filter($"salt"===i) val counts = finalDF.count() println(s"count of record in chunk $i -> $counts") finalDF.drop("salt").saveToES("indexName",esConfig) val totalTime = System.currentTimeMillis - start println(s"ended Loading data for chunk $i. Total time taken in Seconds : ${totalTime/1000}")
}
Prova a dare un alias al tuo DF finale e aggiornalo a ogni esecuzione. Poiché non vorresti disturbare il tuo server di produzione al momento del caricamento
Memoria
Questo non può essere generico. Ma solo per darti un calcio d'inizio
mantieni 10-40 esecutore secondo la dimensione o il budget dei tuoi dati. mantieni le dimensioni di ogni esecutore 8-16 GB e 5 GB di overhead. (Questo può variare in quanto il documento può essere di dimensioni grandi o piccole). Se necessario, mantieni maxResultSize 8gb. Il driver può avere 5 core e 30 g di ram
Cose importanti.
È necessario mantenere la configurazione nella variabile poiché è possibile modificarla come da Indice
L'inserimento avviene sull'executor non sul driver, quindi cerca di mantenere una connessione minore durante la scrittura. Ogni core aprirà una connessione.
l'inserimento del documento può avvenire con la dimensione della voce batch o la dimensione del documento. Cambialo secondo il tuo apprendimento mentre esegui più corse.
Cerca di rendere robusta la tua soluzione. Dovrebbe essere in grado di gestire tutti i dati di dimensione. La lettura e la scrittura possono essere ottimizzate, ma prova a formattare i dati come da mappa del documento prima di iniziare il caricamento. Ciò aiuterebbe nel debug facile, se il documento di dati è poco complesso e annidato.
La memoria di Spark-submit può anche essere regolata in base al tuo apprendimento durante l'esecuzione di lavori. Basta provare a guardare il tempo di inserimento variando la memoria e la dimensione del batch.
La cosa più importante è il design. Se stai utilizzando ES, crea la tua mappa tenendo presenti le query finali e i requisiti.
Non una risposta completa ma ancora un po 'lungo per un commento. Ci sono alcuni suggerimenti che vorrei suggerire.
Non è chiaro ma presumo che la tua preoccupazione sia il tempo di esecuzione. Come suggerito nei commenti è possibile migliorare le prestazioni aggiungendo più nodi / esecutori al cluster. Se il file gzip viene caricato senza partizionare in Spark, è necessario dividerlo in una dimensione ragionevole. (Non troppo piccolo - Questo renderà lenta l'elaborazione. Non troppo grande - gli esecutori eseguiranno OOM).
parquet
è un buon formato di file quando si lavora con Spark. Se puoi convertire il tuo XML in parquet. È super compresso e leggero.
Leggendo i tuoi commenti, coalesce
non fa uno shuffle completo. L'algoritmo di coalescenza modifica il numero di nodi spostando i dati da alcune partizioni a partizioni esistenti. Questo algoritmo ovviamente non può aumentare il numero di partizioni. Usa repartition
invece. L'operazione è costosa ma può aumentare il numero di partizioni. Controlla questo per ulteriori fatti:https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4