Ottimizzazione Spark - join - numero di attività molto basso - OOM

Aug 24 2020

La mia applicazione Spark non riesce con questo errore: Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Questo è ciò che ottengo quando controllo il registro del containger:java.lang.OutOfMemoryError: Java heap space

La mia applicazione è principalmente ottenere una tabella, quindi unire tabelle diverse che ho letto da aws S3:

var result = readParquet(table1)  
val table2 = readParquet(table2)

result = result.join(table2 , result(primaryKey) === table2(foreignKey))

val table3 = readParquet(table3)

result = result.join(table3 , result(primaryKey) === table3(foreignKey))

val table4 = readParquet(table4)

result = result.join(table4 , result(primaryKey) === table4(foreignKey))

e così via

La mia applicazione non riesce quando provo a salvare il dataframe dei risultati su postgresql utilizzando:

result.toDF(df.columns.map(x => x.toLowerCase()): _*).write
  .mode("overwrite")
  .format("jdbc")
  .option(JDBCOptions.JDBC_TABLE_NAME, table)
  .save()

Sul mio stage di iscrizione non riuscito ho un numero di compiti molto basso: 6 compiti per 4 esecutori

Perché il mio stage stage genera 2 posti di lavoro?

Il primo è completato con 426 attività:

e il secondo sta fallendo:

La mia configurazione spark-submit:

dynamicAllocation = true  
num core = 2
driver memory = 6g
executor memory = 6g
max num executor = 10
min num executor = 1
spark.default.parallelism = 400
spark.sql.shuffle.partitions = 400

Ho provato con più risorse ma lo stesso problema:

 num core = 5
 driver memory = 16g
 executor memory = 16g
 num executor = 20

Penso che tutti i dati vadano nella stessa partizione / esecutore anche con un numero predefinito di 400 partizioni e questo causa un errore OOM

Ho provato (senza successo): persit data
broadcastJoin, ma la mia tabella non è abbastanza piccola per trasmetterla alla fine.
ripartizione su un numero più alto (4000) e fare un conteggio tra ogni join per eseguire un'azione:

la mia tabella principale cresce molto velocemente:
(numero di righe) 40 -> 68 -> 7304 -> 946832 -> 123032 864 -> 246064864 -> (troppo tempo dopo)
Tuttavia la dimensione dei dati è molto bassa

Se guardo le metriche delle attività, una cosa interessante è che i miei dati sono distorti (non sono davvero sicuro)
Nell'ultima azione di conteggio, posso vedere che ~ 120 attività esegue un'azione, con ~ 10 MB di dati di input per 100 record e 12 secondi e le altre 3880 attività non fanno assolutamente nulla (3 ms, 0 record 16B (metadati?)):

Risposte

1 kavetiraviteja Aug 24 2020 at 16:06

memoria driver = 16g è una memoria troppo alta e non necessaria. usalo solo quando hai una vasta raccolta di dati da padroneggiare con azioni come (collect ()) assicurati di aumentare spark.maxResult.size se questo è il caso

puoi fare le seguenti cose

- Esegui la ripartizione durante la lettura dei file readParquet (table1) .repartition (x). Se una delle tabelle è piccola, puoi trasmetterla e rimuovere join invece usa mapPartition e usa una variabile di trasmissione come cache di ricerca.

(O)

- Seleziona una colonna distribuita uniformemente e ripartiziona la tabella di conseguenza utilizzando quella particolare colonna.

Due punti che devo premere guardando nelle statistiche sopra. il tuo lavoro ha un ritardo di pianificazione elevato che è causato da troppe attività e le statistiche delle tue attività poche statistiche vengono lanciate con dati di input come 10 byte e poche lanciate con 9 MB ... ovviamente, c'è asimmetria dei dati qui ... come hai detto il primo è completato con 426 attività ma con 4000 come conteggio delle ripartizioni dovrebbe avviare più attività

per favore guarda https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c ... per ulteriori approfondimenti.