Tâche par lots Dataflow non mise à l'échelle
Ma tâche Dataflow (ID de tâche : 2020-08-18_07_55_15-14428306650890914471) ne dépasse pas 1 nœud de calcul, bien que Dataflow ait défini les nœuds de calcul cibles sur 1 000.
Le travail est configuré pour interroger l'ensemble de données Google Patents BigQuery, tokeniser le texte à l'aide d'une fonction personnalisée ParDo et de la bibliothèque de transformateurs (huggingface), sérialiser le résultat et tout écrire dans un fichier parquet géant.
J'avais supposé (après avoir exécuté le travail hier, qui mappait une fonction au lieu d'utiliser une classe beam.DoFn) que le problème était un objet non parallélisant éliminant la mise à l'échelle; par conséquent, refactoriser le processus de tokenisation en tant que classe.
Voici le script, qui est exécuté depuis la ligne de commande avec la commande suivante :
python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz
Le scénario:
import os
import re
import argparse
import google.auth
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners import DataflowRunner
from apache_beam.io.gcp.internal.clients import bigquery
import pyarrow as pa
import pickle
from transformers import AutoTokenizer
print('Defining TokDoFn')
class TokDoFn(beam.DoFn):
def __init__(self, tok_version, block_size=200):
self.tok = AutoTokenizer.from_pretrained(tok_version)
self.block_size = block_size
def process(self, x):
txt = x['abs_text'] + ' ' + x['desc_text'] + ' ' + x['claims_text']
enc = self.tok.encode(txt)
for idx, token in enumerate(enc):
chunk = enc[idx:idx + self.block_size]
serialized = pickle.dumps(chunk)
yield serialized
def run(argv=None, save_main_session=True):
query_big = '''
with data as (
SELECT
(select text from unnest(abstract_localized) limit 1) abs_text,
(select text from unnest(description_localized) limit 1) desc_text,
(select text from unnest(claims_localized) limit 1) claims_text,
publication_date,
filing_date,
grant_date,
application_kind,
ipc
FROM `patents-public-data.patents.publications`
)
select *
FROM data
WHERE
abs_text is not null
AND desc_text is not null
AND claims_text is not null
AND ipc is not null
'''
query_sample = '''
SELECT *
FROM `client_name.patent_data.patent_samples`
LIMIT 2;
'''
print('Start Run()')
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
'''
Configure Options
'''
# Setting up the Apache Beam pipeline options.
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = save_main_session
# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = 'gs://client_name/dataset_cleaned_pq_classTok'
# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'
# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'
# The directory to store the output files of the job.
output_gcs_location = f'{dataflow_gcs_location}/output'
print('Options configured per GCP Notebook Examples')
print('Configuring BQ Table Schema for Beam')
#Write Schema (to PQ):
schema = pa.schema([
('block', pa.binary())
])
print('Starting pipeline...')
with beam.Pipeline(runner=DataflowRunner(), options=options) as p:
res = (p
| 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_big, use_standard_sql=True))
| beam.ParDo(TokDoFn(tok_version='gpt2', block_size=200))
| beam.Map(lambda x: {'block': x})
| beam.io.WriteToParquet(os.path.join(output_gcs_location, f'pq_out'),
schema,
record_batch_size=1000)
)
print('Pipeline built. Running...')
if __name__ == '__main__':
import logging
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR)
run()
Réponses
La solution est double :
Les quotas suivants ont été dépassés lorsque j'ai exécuté ma tâche, tous sous "API Compute Engine" (consultez vos quotas ici :https://console.cloud.google.com/iam-admin/quotas):
- CPU (j'ai demandé une augmentation à 50)
- Persistent Disk Standard (Go) (j'ai demandé une augmentation à 12 500)
- In_Use_IP_Address (j'ai demandé une augmentation à 50)
Remarque : Si vous lisez la sortie de la console pendant l'exécution de votre tâche, tout quota dépassé doit s'imprimer sous la forme d'une ligne INFO.
Suite aux conseils de Peter Kim ci-dessus, j'ai passé le drapeau --max_num_workers dans le cadre de ma commande :
python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz --max_num_workers 22
Et j'ai commencé la mise à l'échelle!
Dans l'ensemble, ce serait bien s'il y avait un moyen d'informer les utilisateurs via la console Dataflow lorsqu'un quota est atteint, et de fournir un moyen simple de demander une augmentation de ce quota (et des quotas complémentaires recommandés), ainsi que des suggestions pour ce qui le montant majoré à demander devrait être.