El trabajo por lotes de Dataflow no se escala
Mi trabajo de Dataflow (ID de trabajo: 2020-08-18_07_55_15-14428306650890914471) no escala más allá de 1 trabajador, a pesar de que Dataflow establece los trabajadores de destino en 1000.
El trabajo está configurado para consultar el conjunto de datos BigQuery de Google Patents, tokenizar el texto mediante una función personalizada de ParDo y la biblioteca de transformadores (huggingface), serializar el resultado y escribir todo en un archivo de parquet gigante.
Asumí (después de ejecutar el trabajo ayer, que asignó una función en lugar de usar una clase beam.DoFn) que el problema era algún objeto no paralelo que eliminaba la escala; por lo tanto, refactorizando el proceso de tokenización como una clase.
Aquí está el script, que se ejecuta desde la línea de comandos con el siguiente comando:
python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz
La secuencia de comandos:
import os
import re
import argparse
import google.auth
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners import DataflowRunner
from apache_beam.io.gcp.internal.clients import bigquery
import pyarrow as pa
import pickle
from transformers import AutoTokenizer
print('Defining TokDoFn')
class TokDoFn(beam.DoFn):
def __init__(self, tok_version, block_size=200):
self.tok = AutoTokenizer.from_pretrained(tok_version)
self.block_size = block_size
def process(self, x):
txt = x['abs_text'] + ' ' + x['desc_text'] + ' ' + x['claims_text']
enc = self.tok.encode(txt)
for idx, token in enumerate(enc):
chunk = enc[idx:idx + self.block_size]
serialized = pickle.dumps(chunk)
yield serialized
def run(argv=None, save_main_session=True):
query_big = '''
with data as (
SELECT
(select text from unnest(abstract_localized) limit 1) abs_text,
(select text from unnest(description_localized) limit 1) desc_text,
(select text from unnest(claims_localized) limit 1) claims_text,
publication_date,
filing_date,
grant_date,
application_kind,
ipc
FROM `patents-public-data.patents.publications`
)
select *
FROM data
WHERE
abs_text is not null
AND desc_text is not null
AND claims_text is not null
AND ipc is not null
'''
query_sample = '''
SELECT *
FROM `client_name.patent_data.patent_samples`
LIMIT 2;
'''
print('Start Run()')
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
'''
Configure Options
'''
# Setting up the Apache Beam pipeline options.
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = save_main_session
# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = 'gs://client_name/dataset_cleaned_pq_classTok'
# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'
# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'
# The directory to store the output files of the job.
output_gcs_location = f'{dataflow_gcs_location}/output'
print('Options configured per GCP Notebook Examples')
print('Configuring BQ Table Schema for Beam')
#Write Schema (to PQ):
schema = pa.schema([
('block', pa.binary())
])
print('Starting pipeline...')
with beam.Pipeline(runner=DataflowRunner(), options=options) as p:
res = (p
| 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_big, use_standard_sql=True))
| beam.ParDo(TokDoFn(tok_version='gpt2', block_size=200))
| beam.Map(lambda x: {'block': x})
| beam.io.WriteToParquet(os.path.join(output_gcs_location, f'pq_out'),
schema,
record_batch_size=1000)
)
print('Pipeline built. Running...')
if __name__ == '__main__':
import logging
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR)
run()
Respuestas
La solución es doble:
Se excedieron las siguientes cuotas cuando ejecuté mi trabajo, todo en 'API de Compute Engine' (consulte sus cuotas aquí:https://console.cloud.google.com/iam-admin/quotas):
- CPUs (solicité un aumento a 50)
- Estándar de disco persistente (GB) (solicité un aumento a 12,500)
- In_Use_IP_Address (solicité un aumento a 50)
Nota: Si lee la salida de la consola mientras se ejecuta su trabajo, cualquier cuota excedida debe imprimirse como una línea INFO.
Siguiendo el consejo anterior de Peter Kim, pasé la bandera --max_num_workers como parte de mi comando:
python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz --max_num_workers 22
¡Y empecé a escalar!
En general, sería bueno si hubiera una forma de avisar a los usuarios a través de la consola de Dataflow cuando se alcance una cuota, y proporcionar un medio fácil para solicitar un aumento de esa cuota (y las complementarias recomendadas), junto con sugerencias para lo que el aumento de la cantidad a solicitar debe ser.