Cloud Composer で Deferrable Operators を使って BigQuery ジョブの完了を待機してみた #cm_google_cloud_adcal_2024
こんにちは!エノカワです。
この記事はクラスメソッドの Google Cloud アドベントカレンダー2024 の 23日目の記事です。
Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。
試したこと
今回は、Deferrable Operators を活用して BigQuery ジョブの完了を待機し、その動作の違いや利点を確かめてみました。
Deferrable Operators を使うと外部リソースの待機中に Worker を解放できるため、大幅にリソース効率を向上できます。
Deferrable Operators とは
Deferrable Operators は、Airflow 2.2 以降で導入された機能です。
イベント駆動型のアーキテクチャを採用することにより、外部リソースの待機中は Worker リソースをほとんど使わずに済む のが最大の特徴です。
以下のような課題に対して有効です。
- BigQuery や外部 API 呼び出しなど、ジョブの完了待ち時間が長い
- Worker ノードのリソースを節約したい
- 多数のタスクを並列に実行したい
Airflow では、ポーリング(一定間隔で状態を確認)するようなオペレーターで長時間待機が発生すると、タスクが Running
のまま Worker を占有してしまいがちです。
しかし、Deferrable Operators を利用すると、待機の間は Worker を解放 (Deferred
状態) するため、リソースの有効活用が期待できます。
Google Cloud がサポートする Deferrable Operators
Google Cloud が提供する演算子の中には、遅延可能モード(Deferrable)をサポートするものがあります。
具体的な一覧や詳細は、公式ドキュメントをご参照ください。
今回、Google Cloud がサポートする Deferrable Operators の一例である BigQueryInsertJobOperator
を使用し、BigQuery ジョブの実行効率を比較します。
環境作成
DAG を動かすための Cloud Composer 環境を作成します。
Google Cloud コンソールの「Cloud Composer の環境作成」ページから、Composer 3 を選択します。
test-composer
という名前の環境を、東京リージョンで作成します。
サービスアカウントは本番運用を考えるとユーザーマネージドが推奨ですが、今回は検証用なのでデフォルトのものを使用しています。
DAG を作成する
今回は、約 1 分ほど処理がかかるクエリを実行する BigQuery ジョブを並列して実行させ、通常オペレーター版 と Deferrable Operator 版 の 2 つの DAG を用意しました。
以下は、通常のオペレーターと Deferrable Operator を比較するための DAG の例です。
from datetime import datetime
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
# 約1分の待機処理を含むクエリ
SLEEP_QUERY = """
BEGIN
DECLARE x INT64 DEFAULT 0;
REPEAT
SET x = x + 1;
UNTIL x >= 3000 -- 約1分の待機
END REPEAT;
END;
"""
# 通常のオペレーターを使用するDAG
with DAG(
"bigquery_insert_normal",
start_date=datetime(2024, 1, 1),
schedule_interval=None,
max_active_tasks=1,
) as dag_normal:
# 1から5までの通番でタスクを作成
for i in range(1, 6):
insert_job_normal = BigQueryInsertJobOperator(
task_id=f"insert_job_normal_{i}",
configuration={
"query": {
"query": SLEEP_QUERY,
"useLegacySql": False
}
}
)
# Deferrable Operatorを使用するDAG
with DAG(
"bigquery_insert_deferrable",
start_date=datetime(2024, 1, 1),
schedule_interval=None,
max_active_tasks=1,
) as dag_deferrable:
# 1から5までの通番でタスクを作成
for i in range(1, 6):
insert_job_async = BigQueryInsertJobOperator(
task_id=f"insert_job_async_{i}",
configuration={
"query": {
"query": SLEEP_QUERY,
"useLegacySql": False
}
},
deferrable=True
)
SLEEP_QUERY
では、簡易的に約 1 分間の待機を作っていますmax_active_tasks=1
により、同時に実行されるタスク数を 1 に抑えています(Deferrable Operators でも Worker 解放のメリットがある点を確認するため)- Deferrable Operator を使用するには、オペレーターの引数に deferrable=True を設定する必要があります
DAG を実行する
通常のオペレーターを使用するDAG
まずは 通常のオペレーターを使用するDAG を実行した様子を見ます。
1 つのタスクだけが Running
状態となり、他のタスクは実行待ち (scheduled
) のまま順番に処理されていくのがわかります。
BigQuery ジョブを実行しながら約 1 分間の待機が入るため、タスク 5 つ分だと合計で 5 分以上を要する見込みです。
待機中もタスクが Running
のままのため、Worker リソースは常に占有され、ほかのタスクが同時に実行されにくい状態になってしまいます。
DAG 実行後、ガントチャートを見ると、タスクが完全に直列で実行されていることがわかります。
もちろん max_active_tasks=1
の設定に依存しますが、待機中もタスクは Running
のままで、 Worker が占有されます。
Deferrable Operator を使用するDAG
次に Deferrable Operator を使用するDAG を実行します。
今度はタスクが順次 Deferred
状態へ移行し、最終的には全タスクが同時に Deferred
のまま待機している様子が確認できます。
タスクが Deferred
中は Worker リソースが解放されるため、実際には 5 つのタスクを同時に仕掛けていても、待機に伴うリソースの占有がありません。
そのため、DAG 全体の所要時間は 1 つのタスクが終わるまでとほぼ変わらず、効率よく処理が進みます。
ガントチャートを見ると、5つのタスクがほぼ同時に実行され、待機中は Worker のリソースを消費せずに待っているため、効率よく処理されている様子がわかります。
まとめ
Cloud Composer 上で Deferrable Operators を活用すると、BigQuery ジョブなど外部リソースへの長時間待機を効率化できることがわかりました。
特に以下のようなメリットを得られます。
- リソース効率の向上
待機中はDeferred
状態になり、Worker リソースが解放されるため、Cloud Composer のノードリソースを節約できます。 - タスクのスループット向上
同時実行したタスクがすべて待機中でも、リソースの奪い合いが発生しにくいため、より多くのタスクを並列実行できます。 - コスト削減
リソース占有時間を最小化でき、必要に応じてスケールダウンもしやすいので、結果的に Cloud Composer の運用コストも抑えられます。
具体的には、BigQuery ジョブの完了待機や長時間かかる外部 API の監視、ファイルアップロード完了待ち など、実行結果の返却が遅延するシナリオに最適です。
もし、大規模なワークフローや多数のジョブでリソースの枯渇を感じているようであれば、Deferrable Operators をぜひ試してみてください。
明日 12/24 は 和田祐介 さんです。よろしくお願いします!
参考
- 遅延可能な演算子を使用する | Cloud Composer | Google Cloud
- Airflow DAG の改良による Cloud Composer の最適化 | Google Cloud 公式ブログ
- Google Cloud Composer での Airflow DAG とタスクの同時実行について理解する | Google Cloud 公式ブログ
- 環境のパフォーマンスと費用を最適化する | Cloud Composer | Google Cloud
- Cloud Composer 環境を作成する | Google Cloud
- Deferrable Operators & Triggers — Airflow Documentation
- Deferrable Operators入門 - Speaker Deck
- Airflow Deferrable Operators
- BigQueryのトランザクションについて掘り下げて検証してみた