È possibile eliminare i file di parquet sottostanti senza influire negativamente su DeltaLake _delta_log

Aug 22 2020

L'utilizzo .vacuum()su un tavolo DeltaLake è molto lento (vedere la tabella Delta Lake (OSS) su EMR e S3 - Il vuoto richiede molto tempo senza lavori ).

Se ho cancellato manualmente i file parquet sottostanti e non ho aggiunto un nuovo jsonfile di registro o aggiunto un nuovo .checkpoint.parquetfile e cambiato il _delta_log/_last_checkpointfile che punta ad esso; quali sarebbero gli impatti negativi sulla tabella DeltaLake, se ce ne sarebbero?

Ovviamente viaggiare nel tempo, cioè caricare una versione precedente del tavolo che si basava sui file di parquet che ho rimosso, non avrebbe funzionato. Quello che voglio sapere è se ci sarebbero problemi di lettura, scrittura o aggiunta alla versione corrente della tabella DeltaLake?

Cosa sto pensando di fare in pySpark:

### Assuming a working SparkSession as `spark`

from subprocess import check_output
import json
from pyspark.sql import functions as F

awscmd = "aws s3 cp s3://my_s3_bucket/delta/_delta_log/_last_checkpoint -"
last_checkpoint = str(json.loads(check_output(awscmd, shell=True).decode("utf-8")).get('version')).zfill(20)

s3_bucket_path = "s3a://my_s3_bucket/delta/"

df_chkpt_del = (
    spark.read.format("parquet")
    .load(f"{s3_bucket_path}/_delta_log/{last_checkpoint}.checkpoint.parquet")
    .where(F.col("remove").isNotNull())
    .select("remove.*")
    .withColumn("deletionTimestamp", F.from_unixtime(F.col("deletionTimestamp")/1000))
    .withColumn("delDateDiffDays", F.datediff(F.col("deletionTimestamp"), F.current_timestamp()))
    .where(F.col("delDateDiffDays") < -7 )
)

Ci sono molte opzioni da qui. Uno potrebbe essere:

df_chkpt_del.select("path").toPandas().to_csv("files_to_delete.csv", index=False)

Dove ho potuto leggere files_to_delete.csvin un array bash e quindi utilizzare un semplice forciclo bash passando ogni percorso s3 del file parquet a un aws s3 rmcomando per rimuovere i file uno per uno.

Potrebbe essere più lento di vacuum(), ma almeno non consumerà le risorse del cluster mentre è in funzione.

Se lo faccio, dovrò anche:

  1. scrivere un nuovo _delta_log/000000000000000#####.jsonfile che documenti correttamente queste modifiche?
  2. scrivere un nuovo 000000000000000#####.checkpoint.parquetfile che documenti correttamente queste modifiche e modificare il _delta_log/_last_checkpointfile in modo che punti a quel checkpoint.parquetfile?

La seconda opzione sarebbe più semplice.

Tuttavia, se non ci saranno effetti negativi se rimuovo i file e non modifico nulla in _delta_log, allora sarebbe il più semplice.

Risposte

SamirVyas Aug 24 2020 at 12:18

TLDR. Rispondendo a questa domanda.

Se ho cancellato manualmente i file parquet sottostanti e non ho aggiunto un nuovo file di log json o un nuovo file .checkpoint.parquet e ho cambiato il file _delta_log / _last_checkpoint che punta ad esso; quali sarebbero gli impatti negativi sulla tabella DeltaLake, se ce ne sarebbero?

, questo potrebbe potenzialmente danneggiare la tua tabella delta.

Lasciatemi brevemente rispondere a come delta-lake legge una versione utilizzando _delta_log.

Se vuoi leggere la versione x, andrà al registro delta di tutte le versioni da 1a x-1e farà una somma corrente di file parquet da leggere. Il riepilogo di questo processo viene salvato come una .checkpointversione ogni 10 per rendere efficiente questo processo di esecuzione di sum.

Cosa intendo con questa somma corrente?

Supponiamo, il
registro della versione 1 dice, aggiungi il add file_1, file_2, file_3registro della versione 2, aggiungidelete file_1, file_2, and add file_4

Quindi, leggendo la versione n. 2, le istruzioni totali saranno add file_1, file_2, file_3 -> delete file_1, file_2, and add file_4

Quindi, i file risultanti letti saranno file_3 e file_4.

Cosa succede se elimini un parquet da un file system?

Diciamo che nella versione 3, elimini file_4dal file system. Se non lo usi, il .vacuumregistro delta non saprà che file_4non è presente, proverà a leggerlo e fallirà.