È possibile eliminare i file di parquet sottostanti senza influire negativamente su DeltaLake _delta_log
L'utilizzo .vacuum()
su un tavolo DeltaLake è molto lento (vedere la tabella Delta Lake (OSS) su EMR e S3 - Il vuoto richiede molto tempo senza lavori ).
Se ho cancellato manualmente i file parquet sottostanti e non ho aggiunto un nuovo json
file di registro o aggiunto un nuovo .checkpoint.parquet
file e cambiato il _delta_log/_last_checkpoint
file che punta ad esso; quali sarebbero gli impatti negativi sulla tabella DeltaLake, se ce ne sarebbero?
Ovviamente viaggiare nel tempo, cioè caricare una versione precedente del tavolo che si basava sui file di parquet che ho rimosso, non avrebbe funzionato. Quello che voglio sapere è se ci sarebbero problemi di lettura, scrittura o aggiunta alla versione corrente della tabella DeltaLake?
Cosa sto pensando di fare in pySpark:
### Assuming a working SparkSession as `spark`
from subprocess import check_output
import json
from pyspark.sql import functions as F
awscmd = "aws s3 cp s3://my_s3_bucket/delta/_delta_log/_last_checkpoint -"
last_checkpoint = str(json.loads(check_output(awscmd, shell=True).decode("utf-8")).get('version')).zfill(20)
s3_bucket_path = "s3a://my_s3_bucket/delta/"
df_chkpt_del = (
spark.read.format("parquet")
.load(f"{s3_bucket_path}/_delta_log/{last_checkpoint}.checkpoint.parquet")
.where(F.col("remove").isNotNull())
.select("remove.*")
.withColumn("deletionTimestamp", F.from_unixtime(F.col("deletionTimestamp")/1000))
.withColumn("delDateDiffDays", F.datediff(F.col("deletionTimestamp"), F.current_timestamp()))
.where(F.col("delDateDiffDays") < -7 )
)
Ci sono molte opzioni da qui. Uno potrebbe essere:
df_chkpt_del.select("path").toPandas().to_csv("files_to_delete.csv", index=False)
Dove ho potuto leggere files_to_delete.csv
in un array bash e quindi utilizzare un semplice for
ciclo bash passando ogni percorso s3 del file parquet a un aws s3 rm
comando per rimuovere i file uno per uno.
Potrebbe essere più lento di vacuum()
, ma almeno non consumerà le risorse del cluster mentre è in funzione.
Se lo faccio, dovrò anche:
- scrivere un nuovo
_delta_log/000000000000000#####.json
file che documenti correttamente queste modifiche? - scrivere un nuovo
000000000000000#####.checkpoint.parquet
file che documenti correttamente queste modifiche e modificare il_delta_log/_last_checkpoint
file in modo che punti a quelcheckpoint.parquet
file?
La seconda opzione sarebbe più semplice.
Tuttavia, se non ci saranno effetti negativi se rimuovo i file e non modifico nulla in _delta_log
, allora sarebbe il più semplice.
Risposte
TLDR. Rispondendo a questa domanda.
Se ho cancellato manualmente i file parquet sottostanti e non ho aggiunto un nuovo file di log json o un nuovo file .checkpoint.parquet e ho cambiato il file _delta_log / _last_checkpoint che punta ad esso; quali sarebbero gli impatti negativi sulla tabella DeltaLake, se ce ne sarebbero?
Sì , questo potrebbe potenzialmente danneggiare la tua tabella delta.
Lasciatemi brevemente rispondere a come delta-lake legge una versione utilizzando _delta_log
.
Se vuoi leggere la versione x
, andrà al registro delta di tutte le versioni da 1
a x-1
e farà una somma corrente di file parquet da leggere. Il riepilogo di questo processo viene salvato come una .checkpoint
versione ogni 10 per rendere efficiente questo processo di esecuzione di sum.
Cosa intendo con questa somma corrente?
Supponiamo, il
registro della versione 1 dice, aggiungi il add file_1, file_2, file_3
registro della versione 2, aggiungidelete file_1, file_2, and add file_4
Quindi, leggendo la versione n. 2, le istruzioni totali saranno add file_1, file_2, file_3 -> delete file_1, file_2, and add file_4
Quindi, i file risultanti letti saranno file_3 e file_4.
Cosa succede se elimini un parquet da un file system?
Diciamo che nella versione 3, elimini file_4
dal file system. Se non lo usi, il .vacuum
registro delta non saprà che file_4
non è presente, proverà a leggerlo e fallirà.