Dataflow 일괄 작업이 확장되지 않음

Aug 19 2020

Dataflow가 대상 작업자를 1000으로 설정 했음에도 불구하고 내 Dataflow 작업 (작업 ID : 2020-08-18_07_55_15-14428306650890914471)이 작업자 1 명 이상으로 확장되지 않습니다.

이 작업은 Google Patents BigQuery 데이터 세트를 쿼리하고, ParDo 커스텀 함수와 변환기 (허깅 페이스) 라이브러리를 사용하여 텍스트를 토큰 화하고, 결과를 직렬화하고, 모든 것을 거대한 쪽모이 세공 파일에 쓰도록 구성됩니다.

나는 (어제 beam.DoFn 클래스를 사용하는 대신 함수를 매핑 한 작업을 실행 한 후) 문제가 스케일링을 제거하는 병렬화되지 않는 객체라고 가정했습니다. 따라서 토큰 화 프로세스를 클래스로 리팩토링합니다.

다음은 다음 명령을 사용하여 명령 줄에서 실행되는 스크립트입니다.

python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz

스크립트 :

    import os
    import re
    import argparse
    
    import google.auth
    import apache_beam as beam
    from apache_beam.options import pipeline_options
    from apache_beam.options.pipeline_options import GoogleCloudOptions
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.runners import DataflowRunner
    
    
    from apache_beam.io.gcp.internal.clients import bigquery
    import pyarrow as pa
    import pickle
    from transformers import AutoTokenizer
    
    
    print('Defining TokDoFn')
    class TokDoFn(beam.DoFn):
        def __init__(self, tok_version, block_size=200):
            self.tok = AutoTokenizer.from_pretrained(tok_version)
            self.block_size = block_size
    
        def process(self, x):
            txt = x['abs_text'] + ' ' + x['desc_text'] + ' ' + x['claims_text']
            enc = self.tok.encode(txt)
    
            for idx, token in enumerate(enc):
                chunk = enc[idx:idx + self.block_size]
                serialized = pickle.dumps(chunk)
                yield serialized
    
    
    def run(argv=None, save_main_session=True):
        query_big = '''
        with data as (
          SELECT 
            (select text from unnest(abstract_localized) limit 1) abs_text,
            (select text from unnest(description_localized) limit 1) desc_text,
            (select text from unnest(claims_localized) limit 1) claims_text,
            publication_date,
            filing_date,
            grant_date,
            application_kind,
            ipc
          FROM `patents-public-data.patents.publications` 
        )
    
        select *
        FROM data
        WHERE
          abs_text is not null 
          AND desc_text is not null
          AND claims_text is not null
          AND ipc is not null
        '''
    
        query_sample = '''
        SELECT *
        FROM `client_name.patent_data.patent_samples`
        LIMIT 2;
        '''
    
        print('Start Run()')
        parser = argparse.ArgumentParser()
        known_args, pipeline_args = parser.parse_known_args(argv)
    
        '''
        Configure Options
        '''
        # Setting up the Apache Beam pipeline options.
        # We use the save_main_session option because one or more DoFn's in this
        # workflow rely on global context (e.g., a module imported at module level).
        options = PipelineOptions(pipeline_args)
        options.view_as(SetupOptions).save_main_session = save_main_session
    
        # Sets the project to the default project in your current Google Cloud environment.
        _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
        # Sets the Google Cloud Region in which Cloud Dataflow runs.
        options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    
        # IMPORTANT! Adjust the following to choose a Cloud Storage location.
        dataflow_gcs_location = 'gs://client_name/dataset_cleaned_pq_classTok'
        # Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
        options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'
    
        # Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
        options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'
    
        # The directory to store the output files of the job.
        output_gcs_location = f'{dataflow_gcs_location}/output'
    
        print('Options configured per GCP Notebook Examples')
        print('Configuring BQ Table Schema for Beam')
    
    
        #Write Schema (to PQ):
        schema = pa.schema([
            ('block', pa.binary())
        ])
    
        print('Starting pipeline...')
        with beam.Pipeline(runner=DataflowRunner(), options=options) as p:
            res = (p
                   | 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_big, use_standard_sql=True))
                   | beam.ParDo(TokDoFn(tok_version='gpt2', block_size=200))
                   | beam.Map(lambda x: {'block': x})
                   | beam.io.WriteToParquet(os.path.join(output_gcs_location, f'pq_out'),
                                            schema,
                                            record_batch_size=1000)
                   )
            print('Pipeline built. Running...')
    
    if __name__ == '__main__':
        import logging
        logging.getLogger().setLevel(logging.INFO)
        logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR)
        run()

답변

4 apteryxlabs Aug 19 2020 at 06:06

해결책은 두 가지입니다.

내 작업을 실행할 때 다음 할당량을 모두 'Compute Engine API'에서 초과했습니다 (여기에서 할당량보기 : https://console.cloud.google.com/iam-admin/quotas) :

  • CPU (50 개로 증가 요청)
  • 영구 디스크 표준 (GB) (12,500으로 증가 요청)
  • In_Use_IP_Address (50 개로 증가 요청)

참고 : 작업이 실행되는 동안 콘솔 출력을 읽으면 초과 된 할당량이 INFO 행으로 인쇄되어야합니다.

위의 Peter Kim의 조언에 따라 명령의 일부로 --max_num_workers 플래그를 전달했습니다.

python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz --max_num_workers 22

그리고 스케일링을 시작했습니다!

대체로 할당량에 도달했을 때 Dataflow 콘솔을 통해 사용자에게 메시지를 표시하고 해당 할당량 (및 권장 보완)에 대한 증가를 요청할 수있는 쉬운 방법을 제공하고 무엇에 대한 제안을 제공한다면 좋을 것입니다. 요청해야하는 증가 된 금액이어야합니다.