Cloud Composer で TriggerDagRunOperator を使ってDAGから別のDAGを呼び出す
こんにちは!エノカワです。
Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。
やりたいこと
Apache Airflowを使用することで、ワークフロー内で一連のタスクを定義し、それらのタスクの実行順序を制御することができます。
これらのタスクは特定のDAG内に存在しますが、DAG間で呼び出しが可能です。
今回は、このDAG間の呼び出しに使用できるTriggerDagRunOperator
を使ってDAGから別のDAGを呼び出す方法を試してみましたのでご紹介します。
TriggerDagRunOperator
Apache Airflowでは、DAGから別のDAGを呼び出すための基本的なOperatorとしてTriggerDagRunOperator
を用います。
以下にその使用例を示します。
trigger = TriggerDagRunOperator( task_id="trigger_dagrun", # タスクID trigger_dag_id="child_dag", # 呼び出し先のDAG ID )
この例では、TriggerDagRunOperator
を使用して、別のDAG(この例では "child_dag")を呼び出すタスクを作成します。
これにより、parent_dagが実行されるときにchild_dagも一緒に実行されるようになります。
以降、呼び出し元のDAGを親DAG、呼び出し先のDAGを子DAGと呼びます。
それでは、TriggerDagRunOperator
を使ったDAGを作成し、Cloud Composer で動かしてみましょう。
Cloud Composer 環境を作成
DAGを動かす Cloud Composer 環境を作成します。
Google Cloud コンソールで Cloud Composer の[環境の作成]ページに移動し、Cloud Composer 環境を作成します。
test-composer
という名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はでデフォルトのままで作成しました。
子DAGにパラメータを渡す
親DAGから子DAGにパラメータを渡すためには、TriggerDagRunOperator
のconf
引数を使います。
conf
引数には辞書型のオブジェクトを指定します。
ここでは、親DAGから子DAGに渡したパラメータを子DAG側でログ出力し、正しくパラメータが渡されていることを確認してみます。
親DAG
from airflow import DAG from airflow.operators.dagrun_operator import TriggerDagRunOperator from airflow.utils.dates import days_ago with DAG( dag_id="parent_dag", start_date=days_ago(1), schedule_interval="@daily" ) as dag: # TriggerDagRunOperatorを使って別のDAG(ここでは"child_dag")を呼び出す trigger = TriggerDagRunOperator( task_id="trigger_dagrun", trigger_dag_id="child_dag", conf={"message": "Hello from parent DAG!"}, # パラメータを渡す ) trigger
子DAG
from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago # パラメータをログに出力する関数 def print_message(**context): print(context["dag_run"].conf["message"]) with DAG( dag_id="child_dag", start_date=days_ago(1), schedule_interval=None ) as dag: # PythonOperator で print_message関数を実行 print_task = PythonOperator( task_id="print_task", python_callable=print_message, provide_context=True, # コンテキストを渡すように設定 ) print_task
上記DAGを Cloud Composer にデプロイ後、しばらくすると親DAG、子DAGともに成功しました。
子DAGのログを確認してみましょう。
親DAGから渡されたメッセージHello from parent DAG!
が出力されています。
正しくパラメータが渡されていることが確認できました。
Airflow UI 上の見え方
DAGs 画面
親DAGから子DAGを呼び出す際、TriggerDagRunOperator
を使用すると、それぞれのDAGが独立してUI上に表示されます。
DAGs 画面では、親子DAGの関係が直接的には見えないため、これらが連携して動作することを意識する必要があります。
Task Details
親DAGのtrigger_dagrunタスク(TriggerDagRunOperator
)の Task Details に [Triggered DAG] ボタン表示があります。
クリックすると子DAGの画面に遷移しました。親DAGから子DAGのトレースはできそうです。
一方、子DAG側には同様のボタンはないため、逆方向(子DAGから親DAG)のトレースはできないようです。
DAG Dependencies
Airflow UI 上部の [browse] - [DAG Dependencies] から、DAGの依存関係を示す図を確認することができます。
子DAGのスケジュール設定について
子DAGのスケジュール設定の必要性は、子DAGの使用方法に依存します。
親DAGから呼び出されることが前提の場合、子DAGのスケジュール設定は不要となります。
子DAGのschedule_interval
をNone
に設定することで、スケジューリングから除外されます。
親DAG
with DAG( dag_id="parent_dag", start_date=days_ago(1), schedule_interval="@daily" ) as dag:
子DAG
with DAG( dag_id="child_dag", start_date=days_ago(1), schedule_interval=None ) as dag:
ただし、子DAGが独立して実行されるケースがある場合は、子DAGのスケジュール設定を考慮する必要があります。
子DAGの完了を待つ
親DAGのtrigger_dagrunタスク(TriggerDagRunOperator
)は、子DAGの呼び出し成功とともに直ちに完了します。
ここでは、1分間スリープするタスクを持つ子DAGの呼び出し例を考えてみましょう。
子DAGは1分間のスリープを終えてから完了しますが、
親DAGは子DAGの呼び出し後すぐに完了します。
全体の処理時間は約3秒であり、子DAGの完了を待たずに処理が完了していることが確認できます。
wait_for_completion
wait_for_completion
パラメータを利用すると、親DAGが子DAGの完了を待つことが可能です。
以下のように、wait_for_completion
パラメータにTrue
を設定してみましょう。
親DAG
from airflow import DAG from airflow.operators.dagrun_operator import TriggerDagRunOperator from airflow.utils.dates import days_ago with DAG( dag_id="parent_wait_dag", start_date=days_ago(1), schedule_interval="@daily" ) as dag: # TriggerDagRunOperatorを使って別のDAG(ここでは"child_dag")を呼び出す trigger = TriggerDagRunOperator( task_id="trigger_dagrun", trigger_dag_id="child_sleep_dag", wait_for_completion=True, retries=0, ) trigger
子DAG
import time from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago def sleep_for_a_while(): time.sleep(60) # 1分間スリープ with DAG( dag_id="child_sleep_dag", start_date=days_ago(1), schedule_interval=None ) as dag: # スリープするタスク sleep_operator = PythonOperator( task_id="sleep", python_callable=sleep_for_a_while ) sleep_operator
これにより、親DAGは子DAGの呼び出し後も実行中の状態が続き、
子DAGが完了してから親DAGも完了しました。
全体の処理時間は約2分で、親DAGが子DAGの完了を待って完了していることが確認できます。
子DAGが失敗した場合
通常、TriggerDagRunOperator
を使用したタスクは、子DAGの実行結果に関係なく成功します。
wait_for_completion
パラメータにTrue
を設定して呼び出した子DAGが失敗した場合はどうなるでしょうか?
子DAGのsleepタスクが実行中に[Mark Failed]ボタンを押下し、故意に失敗させてみます。
すると、親DAGのtrigger_dagrunタスクも失敗しました。
wait_for_completion
パラメータにTrue
を設定すると、親DAGのtrigger_dagrunタスク(TriggerDagRunOperator
)の成功・失敗が子DAGの結果に連動することが分かります。
まとめ
以上、TriggerDagRunOperator
を使ってDAGから別のDAGを呼び出す方法をご紹介しました。
TriggerDagRunOperator
を活用することで、複雑なワークフローを独立した部分(DAG)に分割して管理することが可能になります。
これにより、各DAGが独立して動作し、それぞれのDAG間の依存関係を柔軟に制御することができます。
親DAGから子DAGに対してパラメータを渡せるので、動的なワークフロー設定にも対応できそうですね。
一方で、TriggerDagRunOperator
を使用すると Airflow UI上で親DAGと子DAGの関連性が直接的に見なくなってしまうので、その点は注意が必要そうだと感じました。
今回の検証が誰かのお役に立てれば幸いです!
参考
- airflow.operators.trigger_dagrun — Airflow ドキュメント
- DAG のタスクをグループ化する | Cloud Composer | Google Cloud
- Cross-DAG dependencies | Astronomer Documentation
- Implement DAG Dependencies Using TriggerDagRun Operator | by Aashish | GumGum Tech Blog | Medium
- Airflow の DAG 間で依存関係を設定する - goodbyegangsterのブログ
- mwaa - Airflow 2.2 TriggerDagRunOperator wait_for_completion behavior - Stack Overflow
- Cloud Composer の概要 | Google Cloud
- Cloud Composer 環境を作成する | Google Cloud