Tugas batch Dataflow tidak diskalakan
Pekerjaan Dataflow saya (ID Pekerjaan: 2020-08-18_07_55_15-14428306650890914471) tidak menskalakan 1 pekerja terakhir, meskipun Dataflow menetapkan pekerja target ke 1000.
Pekerjaan tersebut dikonfigurasi untuk meminta set data BigQuery Google Patents, membuat token teks menggunakan fungsi kustom ParDo dan library transformer (huggingface), membuat serialisasi hasilnya, dan menulis semuanya ke file parket raksasa.
Saya berasumsi (setelah menjalankan pekerjaan kemarin, yang memetakan fungsi alih-alih menggunakan kelas beam.DoFn) bahwa masalahnya adalah beberapa objek non-paralelisasi yang menghilangkan penskalaan; karenanya, refactoring proses tokenisasi sebagai sebuah kelas.
Berikut skripnya, yang dijalankan dari baris perintah dengan perintah berikut:
python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz
Naskah:
import os
import re
import argparse
import google.auth
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners import DataflowRunner
from apache_beam.io.gcp.internal.clients import bigquery
import pyarrow as pa
import pickle
from transformers import AutoTokenizer
print('Defining TokDoFn')
class TokDoFn(beam.DoFn):
def __init__(self, tok_version, block_size=200):
self.tok = AutoTokenizer.from_pretrained(tok_version)
self.block_size = block_size
def process(self, x):
txt = x['abs_text'] + ' ' + x['desc_text'] + ' ' + x['claims_text']
enc = self.tok.encode(txt)
for idx, token in enumerate(enc):
chunk = enc[idx:idx + self.block_size]
serialized = pickle.dumps(chunk)
yield serialized
def run(argv=None, save_main_session=True):
query_big = '''
with data as (
SELECT
(select text from unnest(abstract_localized) limit 1) abs_text,
(select text from unnest(description_localized) limit 1) desc_text,
(select text from unnest(claims_localized) limit 1) claims_text,
publication_date,
filing_date,
grant_date,
application_kind,
ipc
FROM `patents-public-data.patents.publications`
)
select *
FROM data
WHERE
abs_text is not null
AND desc_text is not null
AND claims_text is not null
AND ipc is not null
'''
query_sample = '''
SELECT *
FROM `client_name.patent_data.patent_samples`
LIMIT 2;
'''
print('Start Run()')
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
'''
Configure Options
'''
# Setting up the Apache Beam pipeline options.
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = save_main_session
# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = 'gs://client_name/dataset_cleaned_pq_classTok'
# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'
# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'
# The directory to store the output files of the job.
output_gcs_location = f'{dataflow_gcs_location}/output'
print('Options configured per GCP Notebook Examples')
print('Configuring BQ Table Schema for Beam')
#Write Schema (to PQ):
schema = pa.schema([
('block', pa.binary())
])
print('Starting pipeline...')
with beam.Pipeline(runner=DataflowRunner(), options=options) as p:
res = (p
| 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_big, use_standard_sql=True))
| beam.ParDo(TokDoFn(tok_version='gpt2', block_size=200))
| beam.Map(lambda x: {'block': x})
| beam.io.WriteToParquet(os.path.join(output_gcs_location, f'pq_out'),
schema,
record_batch_size=1000)
)
print('Pipeline built. Running...')
if __name__ == '__main__':
import logging
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR)
run()
Jawaban
Solusinya ada dua:
Kuota berikut telah terlampaui ketika saya menjalankan pekerjaan saya, semuanya di bawah 'Compute Engine API' (lihat kuota Anda di sini: https://console.cloud.google.com/iam-admin/quotas):
- CPU (Saya meminta peningkatan menjadi 50)
- Persistent Disk Standard (GB) (Saya meminta peningkatan menjadi 12.500)
- In_Use_IP_Address (Saya meminta peningkatan menjadi 50)
Catatan: Jika Anda membaca output konsol saat pekerjaan Anda berjalan, kuota yang terlampaui akan dicetak sebagai baris INFO.
Mengikuti saran Peter Kim di atas, saya memberikan bendera --max_num_workers sebagai bagian dari perintah saya:
python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz --max_num_workers 22
Dan saya mulai menskalakan!
Secara keseluruhan, alangkah baiknya jika ada cara untuk meminta pengguna melalui konsol Dataflow saat kuota tercapai, dan menyediakan cara mudah untuk meminta peningkatan kuota (dan pelengkap yang direkomendasikan), bersama dengan saran untuk apa seharusnya jumlah yang diminta meningkat.