Dataflow-Batchjob wird nicht skaliert

Aug 19 2020

Mein Dataflow-Job (Job-ID: 2020-08-18_07_55_15-14428306650890914471) skaliert nicht über 1 Worker hinaus, obwohl Dataflow die Ziel-Worker auf 1000 eingestellt hat.

Der Job ist so konfiguriert, dass er den BigQuery-Datensatz von Google Patents abfragt, den Text mit einer benutzerdefinierten ParDo-Funktion und der Transformers-Bibliothek (Huggingface) tokenisiert, das Ergebnis serialisiert und alles in eine riesige Parquet-Datei schreibt.

Ich hatte angenommen (nachdem ich gestern den Job ausgeführt hatte, der eine Funktion abbildete, anstatt eine beam.DoFn-Klasse zu verwenden), dass das Problem ein nicht parallelisierendes Objekt war, das die Skalierung eliminierte; Daher Refactoring des Tokenisierungsprozesses als Klasse.

Hier ist das Skript, das von der Befehlszeile mit dem folgenden Befehl ausgeführt wird:

python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz

Das Skript:

    import os
    import re
    import argparse
    
    import google.auth
    import apache_beam as beam
    from apache_beam.options import pipeline_options
    from apache_beam.options.pipeline_options import GoogleCloudOptions
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.runners import DataflowRunner
    
    
    from apache_beam.io.gcp.internal.clients import bigquery
    import pyarrow as pa
    import pickle
    from transformers import AutoTokenizer
    
    
    print('Defining TokDoFn')
    class TokDoFn(beam.DoFn):
        def __init__(self, tok_version, block_size=200):
            self.tok = AutoTokenizer.from_pretrained(tok_version)
            self.block_size = block_size
    
        def process(self, x):
            txt = x['abs_text'] + ' ' + x['desc_text'] + ' ' + x['claims_text']
            enc = self.tok.encode(txt)
    
            for idx, token in enumerate(enc):
                chunk = enc[idx:idx + self.block_size]
                serialized = pickle.dumps(chunk)
                yield serialized
    
    
    def run(argv=None, save_main_session=True):
        query_big = '''
        with data as (
          SELECT 
            (select text from unnest(abstract_localized) limit 1) abs_text,
            (select text from unnest(description_localized) limit 1) desc_text,
            (select text from unnest(claims_localized) limit 1) claims_text,
            publication_date,
            filing_date,
            grant_date,
            application_kind,
            ipc
          FROM `patents-public-data.patents.publications` 
        )
    
        select *
        FROM data
        WHERE
          abs_text is not null 
          AND desc_text is not null
          AND claims_text is not null
          AND ipc is not null
        '''
    
        query_sample = '''
        SELECT *
        FROM `client_name.patent_data.patent_samples`
        LIMIT 2;
        '''
    
        print('Start Run()')
        parser = argparse.ArgumentParser()
        known_args, pipeline_args = parser.parse_known_args(argv)
    
        '''
        Configure Options
        '''
        # Setting up the Apache Beam pipeline options.
        # We use the save_main_session option because one or more DoFn's in this
        # workflow rely on global context (e.g., a module imported at module level).
        options = PipelineOptions(pipeline_args)
        options.view_as(SetupOptions).save_main_session = save_main_session
    
        # Sets the project to the default project in your current Google Cloud environment.
        _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
        # Sets the Google Cloud Region in which Cloud Dataflow runs.
        options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    
        # IMPORTANT! Adjust the following to choose a Cloud Storage location.
        dataflow_gcs_location = 'gs://client_name/dataset_cleaned_pq_classTok'
        # Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
        options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'
    
        # Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
        options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'
    
        # The directory to store the output files of the job.
        output_gcs_location = f'{dataflow_gcs_location}/output'
    
        print('Options configured per GCP Notebook Examples')
        print('Configuring BQ Table Schema for Beam')
    
    
        #Write Schema (to PQ):
        schema = pa.schema([
            ('block', pa.binary())
        ])
    
        print('Starting pipeline...')
        with beam.Pipeline(runner=DataflowRunner(), options=options) as p:
            res = (p
                   | 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_big, use_standard_sql=True))
                   | beam.ParDo(TokDoFn(tok_version='gpt2', block_size=200))
                   | beam.Map(lambda x: {'block': x})
                   | beam.io.WriteToParquet(os.path.join(output_gcs_location, f'pq_out'),
                                            schema,
                                            record_batch_size=1000)
                   )
            print('Pipeline built. Running...')
    
    if __name__ == '__main__':
        import logging
        logging.getLogger().setLevel(logging.INFO)
        logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR)
        run()

Antworten

4 apteryxlabs Aug 19 2020 at 06:06

Die Lösung ist zweigeteilt:

Die folgenden Kontingente wurden überschritten, als ich meinen Job ausgeführt habe, alle unter "Compute Engine API" (sehen Sie sich Ihre Kontingente hier an:https://console.cloud.google.com/iam-admin/quotas):

  • CPUs (ich habe eine Erhöhung auf 50 beantragt)
  • Persistent Disk Standard (GB) (Ich habe eine Erhöhung auf 12.500 beantragt)
  • In_Use_IP_Address (Ich habe eine Erhöhung auf 50 beantragt)

Hinweis: Wenn Sie die Konsolenausgabe lesen, während Ihr Job ausgeführt wird, sollten alle überschrittenen Kontingente als INFO-Zeile ausgegeben werden.

Dem obigen Rat von Peter Kim folgend, habe ich das Flag --max_num_workers als Teil meines Befehls übergeben:

python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz --max_num_workers 22

Und ich fing an zu skalieren!

Alles in allem wäre es schön, wenn es eine Möglichkeit gäbe, Benutzer über die Dataflow-Konsole zu benachrichtigen, wenn ein Kontingent erreicht ist, und eine einfache Möglichkeit bereitzustellen, eine Erhöhung dieses (und empfohlenen ergänzenden) Kontingents zu beantragen, zusammen mit Vorschlägen für was der erhöhte Betrag beantragt werden sollte.