Pianifica il flusso d'aria ogni due settimane

Aug 24 2020

Ho la necessità di programmare un lavoro di flusso d'aria ogni venerdì alternativo. Tuttavia, il problema è che non sono in grado di capire come scrivere un programma per questo.

Non voglio avere più lavori per questo.

Ho provato questo

'0 0 1-7,15-21 * 5

Tuttavia non funziona è in esecuzione da 1 a 7 e da 15 a 21 tutti i giorni.

dalla risposta di shubham mi rendo conto che possiamo avere un PythonOperator che può saltare il compito per noi. E ho provato a implementare la soluzione. Tuttavia non sembra funzionare.

Poiché testarlo su un periodo di 2 settimane sarebbe troppo difficile. Questo è quello che ho fatto.

  • Pianifico l'esecuzione del DAG ogni 5 minuti
  • Tuttavia, sto scrivendo all'operatore python l'attività skip althernate (abbastanza simile a quello che sto cercando di fare, venerdì alternativo).

DAG:

args = {
    'owner': 'Gaurang Shah',
    'retries': 0,
    'start_date':airflow.utils.dates.days_ago(1),
}


dag = DAG(
    dag_id='test_dag',
    default_args=args,
    catchup=False,
    schedule_interval='*/5 * * * *',
    max_active_runs=1
    )


dummy_op = DummyOperator(task_id='dummy', dag=dag)

def _check_date(execution_date, **context):
    min_date = datetime.now() - relativedelta(minutes=10)
    print(context)
    print(context.get("prev_execution_date"))
    print(execution_date)
    print(datetime.now())
    print(min_date)
    if execution_date > min_date:
        raise AirflowSkipException(f"No data available on this execution_date ({execution_date}).")

check_date = PythonOperator(
    task_id="check_if_min_date",
    python_callable=_check_date,
    provide_context=True,
    dag=dag,
)

Risposte

y2k-shubham Aug 25 2020 at 03:49
  • Dubito che una singola crontabespressione possa risolvere questo problema

  • Usando airflowi trucchi di, la soluzione è molto più semplice:

    • pianifica il tuo DAG ogni venerdì 0 0 * * FRIe
    • il venerdì alternativo (in base alla logica aziendale), salta il DAG aumentando AirflowSkipException

    Qui dovrai lasciare che il tuo DAG inizi con un'attività dedicata skip_deciderche consentirà al tuo DAG di funzionare / saltare ogni venerdì alternativo entro

    • aumento condizionale AirflowSkipException(per saltare il DAG)
    • non fare nulla per far funzionare il DAG

Puoi anche fare leva

  • ShortCircuitOperator
  • BranchPythonOperator

ma IMO, AirflowSkipExceptionè la soluzione più pulita


Riferimento: come definire un DAG che pianifica un lavoro mensile insieme a un lavoro quotidiano?