एयरफ्लो xcom पुल केवल रिटर्न स्ट्रिंग

Nov 18 2020

मेरे पास एक एयरफ्लो पाइपलाइन है जहां मुझे पबसब सब्सक्रिप्शन से फ़ाइल नाम प्राप्त करने की आवश्यकता है और फिर उस फ़ाइल को क्लाउड sql उदाहरण में आयात करें। मैं CSV फ़ाइल आयात करने के लिए CloudSqlInstanceImportOperator का उपयोग करता हूं। इस ऑपरेटर को एक निकाय की आवश्यकता होती है, जिसमें फ़ाइल नाम और अन्य पैरामीटर होते हैं। चूंकि मैंने रनटाइम के दौरान उस फ़ाइलनाम को पढ़ा है, इसलिए मुझे रनटाइम के दौरान बॉडी को भी परिभाषित करना होगा। यह सब काम करता है। लेकिन जब मैं शरीर को एक्सकॉम से खींचता हूं, तो यह अजगर शब्दकोश के बजाय एक स्ट्रिंग देता है। तो CloudSqlInstanceImportOperator मुझे निम्न त्रुटि देता है (मेरा अनुमान है, क्योंकि शरीर एक स्ट्रिंग है और शब्दकोश नहीं है):

Traceback (most recent call last)
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 984, in _run_raw_tas
    result = task_copy.execute(context=context
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 715, in execut
    self._validate_body_fields(
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 712, in _validate_body_field
    api_version=self.api_version).validate(self.body
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 420, in validat
    dictionary_to_validate=body_to_validate
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 341, in _validate_fiel
    value = dictionary_to_validate.get(field_name
AttributeError: 'str' object has no attribute 'get

यह मेरे द्वारा उपयोग किया जाने वाला कोड है:

import json 
import os
from datetime import datetime, timedelta
import ast
from airflow import DAG
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.pubsub_sensor import PubSubPullSensor
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceImportOperator


def create_dag(dag_id,default_args):
    BUCKET = "{{ var.value.gp2pg_bucket }}"
    GCP_PROJECT_ID = "{{ var.value.gp2pg_project_id }}"
    INSTANCE_NAME = "{{ var.value.gp2pg_instance_name }}"

    def define_import_body(file,**kwargs):
        import_body = {
            "importContext": {
                "importUser": "databasename",
                "database": "databaseuser",
                "fileType": "csv",
                "uri": "bucketname" + file,
                "csvImportOptions": {
                    "table": "schema.tablename",
                    "columns": ["columns1",
                                "column2"]}
            }
        }
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='import_body', value=import_body)
        print(import_body)

    def get_filename(var,**kwargs):
        message = ast.literal_eval(var)
        file = message[0].get('message').get('attributes').get('objectId')
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='filename', value=file)
        print(file)

    dag = DAG(dag_id=dag_id, schedule_interval=None, default_args=default_args)

    with dag:
        t1 = PubSubPullSensor(task_id='pull-messages',
                              project="projectname",
                              ack_messages=True,
                              max_messages=1,
                              subscription="subscribtionname")


        message = "{{ task_instance.xcom_pull() }}"

        t2 = PythonOperator(
            task_id='get_filename',
            python_callable=get_filename,
            op_kwargs={'var': message},
            provide_context=True,
        )

        file = "{{ task_instance.xcom_pull(task_ids='get_filename', key='filename') }}"

        t3 = PythonOperator(
            task_id='define_import_body',
            python_callable=define_import_body,
            op_kwargs={'file': file},
            provide_context=True,
        )

        import_body = "{{ task_instance.xcom_pull(task_ids='define_import_body', key='import_body') }}"

        t4 = CloudSqlInstanceImportOperator(
            project_id=GCP_PROJECT_ID,
            body= import_body,
            instance=INSTANCE_NAME,
            gcp_conn_id='postgres_default',
            task_id='sql_import_task',
            validate_body=True,
        )

        t5 = GoogleCloudStorageToGoogleCloudStorageOperator(
            task_id='copy_files',
            source_bucket=BUCKET,
            source_object=file,
            destination_bucket=BUCKET,
            destination_object='processed/import/'+file, )

        t1 >> t2 >> t3 >> t4 >> t5

    return dag


dags_folder = os.getenv('DAGS_FOLDER', "./dags")
flow_config = open(f'{dags_folder}/gp2pg/flow_config.json', 'r').read()
for key, values in json.loads(flow_config).items():
    default_args = {
        "owner": "owner",
        "start_date": datetime(2020, 1, 1),
        "email": [],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    }

    dag_id = f"gp2pg_{key}_data_to_pg"

    globals()[dag_id] = create_dag(dag_id, default_args)

किसी भी विचार मैं उस समस्या को कैसे हल कर सकता हूं?

जवाब

2 Elad Nov 18 2020 at 22:16

सबसे पहले CloudSqlInstanceImportOperatorहै पदावनत । आपको प्रदाताओं से CloudSQLImportInstanceOperator का उपयोग करना चाहिए

bodyपरम के रूप में विस्तार से बताया dict होने की जरूरत है डॉक्स ।

XCOM डेटाबेस में एक टेबल है। डेटा को तार के रूप में सहेजा जाता है। आप डेटाबेस में तानाशाही को स्टोर नहीं कर सकते क्योंकि मेमोरी ऑब्जेक्ट में तानाशाह एक पायथन है। आपके पास शायद एक Json (स्ट्रिंग) है। इसे तानाशाही में बदलने का प्रयास करें:

body= json.loads(import_body) 

संपादित करें: (टिप्पणियों में चर्चा के बाद)

आपको अपने ऑपरेटर को पायथनऑपरेटर के साथ लपेटने की आवश्यकता होगी, ताकि आप इसे तानाशाही में बदल सकें xcomऔर इसका उपयोग कर सकें।

def my_func(ds, **kwargs):
    ti = kwargs['ti']
    body = ti.xcom_pull(task_ids='privious_task_id')
    import_body= json.loads(body)
    op = CloudSqlInstanceImportOperator(
            project_id=GCP_PROJECT_ID,
            body= import_body,
            instance=INSTANCE_NAME,
            gcp_conn_id='postgres_default',
            task_id='sql_import_task',
            validate_body=True,
        )
    op.execute()
    

p = PythonOperator(task_id='python_task', python_callable=my_func)