Airflow 작업의 Google Cloud Storage에서 mongoimport JSON

Nov 14 2020

이에 대한 문서가 많지 않기 때문에 GCS에서 MongoDB로 데이터를 이동하는 것은 일반적이지 않은 것 같습니다. python_callablePython 연산자 로 전달하는 다음 작업 이 있습니다.이 작업은 BigQuery에서 GCS로 JSON으로 데이터를 이동합니다.

def transfer_gcs_to_mongodb(table_name):
    # connect
    client = bigquery.Client()
    bucket_name = "our-gcs-bucket"
    project_id = "ourproject"
    dataset_id = "ourdataset"
        
    destination_uri = f'gs://{bucket_name}/{table_name}.json'
    dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
    table_ref = dataset_ref.table(table_name)

    configuration = bigquery.job.ExtractJobConfig()
    configuration.destination_format = 'NEWLINE_DELIMITED_JSON'

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        job_config=configuration,
        location="US",
    )  # API request
    extract_job.result()  # Waits for job to complete.

    print("Exported {}:{}.{} to {}".format(project_id, dataset_id, table_name, destination_uri))

이 작업은 데이터를 GCS로 성공적으로 가져 오는 중입니다. 그러나이 mongoimport데이터를 MongoDB로 가져 오기 위해 올바르게 실행하는 방법에 관해서는 이제 막혔 습니다. 특히 mongoimportGCS의 파일을 가리킬 수없는 것처럼 보이지만 먼저 로컬에서 다운로드 한 다음 MongoDB로 가져와야합니다.

Airflow에서 어떻게해야합니까? GCS에서 JSON을 다운로드 한 다음 mongoimport올바른 uri모든 플래그를 사용하여 실행하는 셸 스크립트를 작성해야합니까 ? 아니면 mongoimport우리가 놓친 Airflow 에서 실행하는 다른 방법 이 있습니까?

답변

2 Elad Nov 15 2020 at 18:30

GCS에서 다운로드하기 위해 셸 스크립트를 작성할 필요가 없습니다. GCSToLocalFilesystemOperator 를 사용하기 만하면 파일을 열고 MongoHook 의 insert_many 함수를 사용하여 mongo 에 쓸 수 있습니다.

나는 그것을 테스트하지 않았지만 다음과 같아야합니다.

mongo = MongoHook(conn_id=mongo_conn_id)
with open('file.json') as f:
    file_data = json.load(f)
mongo.insert_many(file_data)

이는 BigQuery-> GCS-> 로컬 파일 시스템-> MongoDB 파이프 용입니다.

원하는 경우 BigQuery-> GCS-> MongoDB와 같이 메모리에서 수행 할 수도 있습니다.