Processo batch del flusso di dati non ridimensionabile
Il mio job Dataflow (ID job: 2020-08-18_07_55_15-14428306650890914471) non supera 1 worker, nonostante Dataflow abbia impostato i worker di destinazione su 1000.
Il processo è configurato per interrogare il set di dati BigQuery di Google Patents, tokenizzare il testo utilizzando una funzione personalizzata ParDo e la libreria transformers (huggingface), serializzare il risultato e scrivere tutto in un gigantesco file parquet.
Avevo ipotizzato (dopo aver eseguito il lavoro ieri, che mappava una funzione invece di utilizzare una classe beam.DoFn) che il problema fosse un oggetto non parallelizzante che eliminava il ridimensionamento; quindi, refactoring del processo di tokenizzazione come classe.
Ecco lo script, che viene eseguito dalla riga di comando con il seguente comando:
python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz
Il copione:
import os
import re
import argparse
import google.auth
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners import DataflowRunner
from apache_beam.io.gcp.internal.clients import bigquery
import pyarrow as pa
import pickle
from transformers import AutoTokenizer
print('Defining TokDoFn')
class TokDoFn(beam.DoFn):
def __init__(self, tok_version, block_size=200):
self.tok = AutoTokenizer.from_pretrained(tok_version)
self.block_size = block_size
def process(self, x):
txt = x['abs_text'] + ' ' + x['desc_text'] + ' ' + x['claims_text']
enc = self.tok.encode(txt)
for idx, token in enumerate(enc):
chunk = enc[idx:idx + self.block_size]
serialized = pickle.dumps(chunk)
yield serialized
def run(argv=None, save_main_session=True):
query_big = '''
with data as (
SELECT
(select text from unnest(abstract_localized) limit 1) abs_text,
(select text from unnest(description_localized) limit 1) desc_text,
(select text from unnest(claims_localized) limit 1) claims_text,
publication_date,
filing_date,
grant_date,
application_kind,
ipc
FROM `patents-public-data.patents.publications`
)
select *
FROM data
WHERE
abs_text is not null
AND desc_text is not null
AND claims_text is not null
AND ipc is not null
'''
query_sample = '''
SELECT *
FROM `client_name.patent_data.patent_samples`
LIMIT 2;
'''
print('Start Run()')
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
'''
Configure Options
'''
# Setting up the Apache Beam pipeline options.
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = save_main_session
# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = 'gs://client_name/dataset_cleaned_pq_classTok'
# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'
# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'
# The directory to store the output files of the job.
output_gcs_location = f'{dataflow_gcs_location}/output'
print('Options configured per GCP Notebook Examples')
print('Configuring BQ Table Schema for Beam')
#Write Schema (to PQ):
schema = pa.schema([
('block', pa.binary())
])
print('Starting pipeline...')
with beam.Pipeline(runner=DataflowRunner(), options=options) as p:
res = (p
| 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_big, use_standard_sql=True))
| beam.ParDo(TokDoFn(tok_version='gpt2', block_size=200))
| beam.Map(lambda x: {'block': x})
| beam.io.WriteToParquet(os.path.join(output_gcs_location, f'pq_out'),
schema,
record_batch_size=1000)
)
print('Pipeline built. Running...')
if __name__ == '__main__':
import logging
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR)
run()
Risposte
La soluzione è duplice:
Le seguenti quote sono state superate quando ho eseguito il mio lavoro, tutte in "Compute Engine API" (visualizza le tue quote qui:https://console.cloud.google.com/iam-admin/quotas):
- CPU (ho richiesto un aumento a 50)
- Persistent Disk Standard (GB) (ho richiesto un aumento a 12.500)
- In_Use_IP_Address (ho richiesto un aumento a 50)
Nota: se leggi l'output della console mentre il processo è in esecuzione, qualsiasi quota superata dovrebbe essere stampata come una riga INFO.
Seguendo il consiglio di Peter Kim sopra, ho passato la bandiera --max_num_workers come parte del mio comando:
python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz --max_num_workers 22
E ho iniziato a scalare!
Tutto sommato, sarebbe bello se ci fosse un modo per avvisare gli utenti tramite la console di Dataflow quando viene raggiunta una quota e fornire un mezzo semplice per richiedere un aumento di quella quota (e raccomandata complementare), insieme a suggerimenti su cosa l'importo maggiorato da richiedere dovrebbe essere.