FileIOを使用して書き込まれるファイルあたりの行数を制限する方法

Aug 22 2020

TextIOまたはFileIOを使用して、書き込まれた各シャードの行数を制限する方法はありますか?

例:

  1. Big Query-バッチジョブから行を読み取ります(たとえば、結果は19500行です)。
  2. いくつかの変換を行います。
  3. Google Cloudストレージにファイルを書き込みます(19ファイル、各ファイルは1000レコードに制限され、1ファイルには500レコードがあります)。
  4. Cloud Functionがトリガーされ、GCS内の各ファイルの外部APIにPOSTリクエストが送信されます。

これが私がこれまでにやろうとしていることですが、機能しません(ファイルごとに1000行を制限しようとしています):

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
        beam.io.BigQuerySource(query=query,
                               use_standard_sql=True)) | beam.Map(json.dumps)

BQ_DATA | beam.WindowInto(GlobalWindows(), Repeatedly(trigger=AfterCount(1000)),
                              accumulation_mode=AccumulationMode.DISCARDING)
        | WriteToFiles(path='fileio', destination="csv")

私は概念的に間違っていますか、それともこれを実装する他の方法はありますか?

回答

3 PeterKim Aug 23 2020 at 23:05

ParDo内にGCSへの書き込みステップを実装し、次のように「バッチ」に含める要素の数を制限できます。

from apache_beam.io import filesystems

class WriteToGcsWithRowLimit(beam.DoFn):
  def __init__(self, row_size=1000):
    self.row_size = row_size
    self.rows = []

  def finish_bundle(self):
     if len(self.rows) > 0:
        self._write_file()

  def process(self, element):
    self.rows.append(element)
    if len(self.rows) >= self.row_size:
        self._write_file()

  def _write_file(self):
    from time import time
    new_file = 'gs://bucket/file-{}.csv'.format(time())
    writer = filesystems.FileSystems.create(path=new_file)
    writer.write(self.rows) # may need to format
    self.rows = []
    writer.close()
BQ_DATA  | beam.ParDo(WriteToGcsWithRowLimit())

これにより、1000行未満のファイルは作成されませんが、ロジックを変更して作成できることに注意してくださいprocess

(残りを処理するために1を編集します)

(ファイルが上書きされるため、カウンターの使用を停止するには2を編集します)