ชุดงานกระแสข้อมูลไม่ได้ปรับขนาด
งาน Dataflow ของฉัน (รหัสงาน: 2020-08-18_07_55_15-14428306650890914471) ไม่ได้ปรับขนาดผู้ปฏิบัติงานเกิน 1 คนแม้ Dataflow จะตั้งค่าคนงานเป้าหมายเป็น 1,000 คน
งานได้รับการกำหนดค่าให้สืบค้นชุดข้อมูล Google Patents BigQuery สร้างโทเค็นข้อความโดยใช้ฟังก์ชันที่กำหนดเองของ ParDo และไลบรารีของ Transformers (hugface) จัดลำดับผลลัพธ์และเขียนทุกอย่างลงในไฟล์ไม้ปาร์เก้ขนาดยักษ์
ฉันได้สันนิษฐาน (หลังจากเรียกใช้งานเมื่อวานนี้ซึ่งแมปฟังก์ชันแทนการใช้คลาส Beam.DoFn) ว่าปัญหาคือวัตถุที่ไม่ขนานกันซึ่งกำจัดการปรับขนาด ดังนั้นการปรับโครงสร้างกระบวนการโทเค็นใหม่เป็นคลาส
นี่คือสคริปต์ซึ่งเรียกใช้จากบรรทัดคำสั่งด้วยคำสั่งต่อไปนี้:
python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz
สคริปต์:
import os
import re
import argparse
import google.auth
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners import DataflowRunner
from apache_beam.io.gcp.internal.clients import bigquery
import pyarrow as pa
import pickle
from transformers import AutoTokenizer
print('Defining TokDoFn')
class TokDoFn(beam.DoFn):
def __init__(self, tok_version, block_size=200):
self.tok = AutoTokenizer.from_pretrained(tok_version)
self.block_size = block_size
def process(self, x):
txt = x['abs_text'] + ' ' + x['desc_text'] + ' ' + x['claims_text']
enc = self.tok.encode(txt)
for idx, token in enumerate(enc):
chunk = enc[idx:idx + self.block_size]
serialized = pickle.dumps(chunk)
yield serialized
def run(argv=None, save_main_session=True):
query_big = '''
with data as (
SELECT
(select text from unnest(abstract_localized) limit 1) abs_text,
(select text from unnest(description_localized) limit 1) desc_text,
(select text from unnest(claims_localized) limit 1) claims_text,
publication_date,
filing_date,
grant_date,
application_kind,
ipc
FROM `patents-public-data.patents.publications`
)
select *
FROM data
WHERE
abs_text is not null
AND desc_text is not null
AND claims_text is not null
AND ipc is not null
'''
query_sample = '''
SELECT *
FROM `client_name.patent_data.patent_samples`
LIMIT 2;
'''
print('Start Run()')
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
'''
Configure Options
'''
# Setting up the Apache Beam pipeline options.
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = save_main_session
# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = 'gs://client_name/dataset_cleaned_pq_classTok'
# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'
# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'
# The directory to store the output files of the job.
output_gcs_location = f'{dataflow_gcs_location}/output'
print('Options configured per GCP Notebook Examples')
print('Configuring BQ Table Schema for Beam')
#Write Schema (to PQ):
schema = pa.schema([
('block', pa.binary())
])
print('Starting pipeline...')
with beam.Pipeline(runner=DataflowRunner(), options=options) as p:
res = (p
| 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_big, use_standard_sql=True))
| beam.ParDo(TokDoFn(tok_version='gpt2', block_size=200))
| beam.Map(lambda x: {'block': x})
| beam.io.WriteToParquet(os.path.join(output_gcs_location, f'pq_out'),
schema,
record_batch_size=1000)
)
print('Pipeline built. Running...')
if __name__ == '__main__':
import logging
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR)
run()
คำตอบ
การแก้ปัญหาเป็นสองเท่า:
เกินโควต้าต่อไปนี้เมื่อฉันทำงานทั้งหมดนี้อยู่ภายใต้ "Compute Engine API" (ดูโควต้าของคุณที่นี่: https://console.cloud.google.com/iam-admin/quotas):
- ซีพียู (ฉันขอเพิ่มเป็น 50)
- Persistent Disk Standard (GB) (ฉันขอเพิ่มเป็น 12,500)
- In_Use_IP_Address (ฉันขอเพิ่มเป็น 50)
หมายเหตุ:หากคุณอ่านเอาต์พุตคอนโซลในขณะที่งานของคุณกำลังทำงานอยู่ควรพิมพ์โควต้าที่เกินออกมาเป็นบรรทัด INFO
ตามคำแนะนำของ Peter Kim ข้างต้นฉันส่งต่อธง --max_num_workers เป็นส่วนหนึ่งของคำสั่งของฉัน:
python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz --max_num_workers 22
และฉันก็เริ่มปรับขนาด!
สรุปแล้วมันจะดีถ้ามีวิธีแจ้งผู้ใช้ผ่านคอนโซล Dataflow เมื่อถึงโควต้าและมอบวิธีง่ายๆในการขอเพิ่มโควต้านั้น (และแนะนำเสริม) พร้อมกับคำแนะนำสำหรับสิ่งที่ จำนวนเงินที่เพิ่มขึ้นที่จะขอควรเป็น