エアフロージョブを隔週でスケジュールする

Aug 24 2020

隔週の金曜日にエアフロージョブをスケジュールする必要があります。しかし、問題は私がこれのスケジュールを書く方法を理解することができないということです。

私はこれのために複数の仕事をしたくありません。

私はこれを試しました

'0 0 1-7,15-21 * 5

ただし、動作していません。毎日1から7および15から21で実行されています。

shubhamの回答から、タスクをスキップできるPythonOperatorを使用できることがわかりました。私はソリューションを実装しようとしました。しかし、うまくいかないようです。

これを2週間でテストするのは難しすぎるでしょう。これが私がしたことです。

  • DAGを5分ごとに実行するようにスケジュールします
  • しかし、私はpython演算子をスキップする代替タスクを書いています(私がやろうとしていることと非常によく似ています、別の金曜日)。

DAG:

args = {
    'owner': 'Gaurang Shah',
    'retries': 0,
    'start_date':airflow.utils.dates.days_ago(1),
}


dag = DAG(
    dag_id='test_dag',
    default_args=args,
    catchup=False,
    schedule_interval='*/5 * * * *',
    max_active_runs=1
    )


dummy_op = DummyOperator(task_id='dummy', dag=dag)

def _check_date(execution_date, **context):
    min_date = datetime.now() - relativedelta(minutes=10)
    print(context)
    print(context.get("prev_execution_date"))
    print(execution_date)
    print(datetime.now())
    print(min_date)
    if execution_date > min_date:
        raise AirflowSkipException(f"No data available on this execution_date ({execution_date}).")

check_date = PythonOperator(
    task_id="check_if_min_date",
    python_callable=_check_date,
    provide_context=True,
    dag=dag,
)

回答

y2k-shubham Aug 25 2020 at 03:49
  • 単一のcrontab式でこれを解決できるとは思えません

  • airflowのトリックを使用すると、解決策ははるかに簡単です。

    • 毎週金曜日あなたのDAGをスケジュール0 0 * * FRIし、
    • 隔週の金曜日(ビジネスロジックに基づく)に、レイズしてDAGをスキップします AirflowSkipException

    ここでは、DAGを、skip_decider隔週の金曜日までに実行/スキップできる専用タスクから開始する必要があります。

    • 条件付きで上げるAirflowSkipException(DAGをスキップするため)
    • DAGを実行させるために何もしていません

活用することもできます

  • ShortCircuitOperator
  • BranchPythonOperator

しかし、IMOAirflowSkipExceptionは最もクリーンなソリューションです


参照:月次ジョブを日次ジョブと一緒にスケジューラーするDAGを定義する方法は?