Come limitare il numero di righe per file scritto utilizzando FileIO

Aug 22 2020

Esiste un modo per limitare il numero di righe in ogni frammento scritto utilizzando TextIO o potrebbe essere FileIO?

Esempio:

  1. Leggi righe da Big Query - Batch Job (il risultato è 19500 righe, ad esempio).
  2. Fai delle trasformazioni.
  3. Scrivi file su Google Cloud storage (19 file, ogni file è limitato a 1000 record, un file ha 500 record).
  4. Cloud Function viene attivato per effettuare una richiesta POST a un'API esterna per ogni file in GCS.

Ecco cosa sto cercando di fare finora ma non funziona (cercando di limitare 1000 righe per file):

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
        beam.io.BigQuerySource(query=query,
                               use_standard_sql=True)) | beam.Map(json.dumps)

BQ_DATA | beam.WindowInto(GlobalWindows(), Repeatedly(trigger=AfterCount(1000)),
                              accumulation_mode=AccumulationMode.DISCARDING)
        | WriteToFiles(path='fileio', destination="csv")

Mi sbaglio concettualmente o c'è un altro modo per implementarlo?

Risposte

3 PeterKim Aug 23 2020 at 23:05

Puoi implementare il passaggio di scrittura su GCS all'interno di ParDo e limitare il numero di elementi da includere in un "batch" come questo:

from apache_beam.io import filesystems

class WriteToGcsWithRowLimit(beam.DoFn):
  def __init__(self, row_size=1000):
    self.row_size = row_size
    self.rows = []

  def finish_bundle(self):
     if len(self.rows) > 0:
        self._write_file()

  def process(self, element):
    self.rows.append(element)
    if len(self.rows) >= self.row_size:
        self._write_file()

  def _write_file(self):
    from time import time
    new_file = 'gs://bucket/file-{}.csv'.format(time())
    writer = filesystems.FileSystems.create(path=new_file)
    writer.write(self.rows) # may need to format
    self.rows = []
    writer.close()
BQ_DATA  | beam.ParDo(WriteToGcsWithRowLimit())

Nota che questo non creerà alcun file con meno di 1000 righe, ma puoi modificare la logica processper farlo.

(Modifica 1 per gestire i resti)

(Modifica 2 per smettere di usare i contatori, poiché i file verranno sovrascritti)