dbt Cloud Orchestration で Fivetran から dbt Cloud のジョブを実行する #Fivetran
はじめに
Fivetran では dbt Cloud Orchestration として Fivetran 側から dbt Cloud のジョブをトリガー実行できる機能が提供されています。こちらの一通りの手順をまとめてみたく記事としました。
また、本機能については以下の記事でも紹介されていますので、こちらもあわせてご参照ください。
概要
公式ドキュメントとしては以下に記載があります。
dbt Cloud でもスケジュールなどでのジョブの実行が可能ですが、この機能では以下のいずれかをトリガーに Fivetran から dbt Cloud のジョブを実行できます。
- カスタム スケジュール
- 指定の時間間隔で dbt Cloud のジョブをトリガーできる
- 統合スケジュール
- 選択したコネクタの同期をトリガーに dbt Cloud のジョブを実行
前提条件
- dbt Cloud のプラン
- Team または Enterprise
- dbt Cloud Orchestration を使用するには、dbt Cloud への API アクセスが必要なため
本機能の制約
- 1つのプロジェクトは、1つの宛先にしか設定できないため、同じプロジェクトを他の宛先に設定したい場合は、最初にそのプロジェクトを現在の宛先から削除する必要があります
- 統合スケジュールでは、ジョブをトリガーするすべてのコネクタは、同じ宛先に関連付けられている必要があります
- コネクタの同期により、同期で新しいデータが到着したかどうかに関係なく、ジョブの実行自体はトリガーされます
事前準備
Fivetran 側
宛先には Snowflake を使用します。以下の手順で宛先として定義しておきました。
データソースには Google スプレッドシートを使用します。ここでは ChatGPT に作ってもらった以下のサンプルデータを使用します。
- orders テーブル
order_id | customer_id | order_date | total_amount | product_id | quantity |
---|---|---|---|---|---|
1 | 101 | 2024-08-01 | 150 | 1 | 2 |
2 | 102 | 2024-08-03 | 100 | 2 | 1 |
3 | 103 | 2024-08-05 | 75 | 3 | 3 |
4 | 101 | 2024-08-07 | 300 | 1 | 4 |
5 | 104 | 2024-08-09 | 50 | 4 | 1 |
6 | 105 | 2024-08-10 | 500 | 2 | 5 |
7 | 106 | 2024-08-12 | 150 | 3 | 2 |
8 | 102 | 2024-08-13 | 230 | 4 | 3 |
9 | 103 | 2024-08-15 | 90 | 1 | 1 |
10 | 107 | 2024-08-18 | 250 | 2 | 2 |
- customer テーブル
customer_id | customer_name | contact_email |
---|---|---|
101 | Alice | [email protected] |
102 | Bob | [email protected] |
103 | Charlie | [email protected] |
104 | David | [email protected] |
105 | Eve | [email protected] |
106 | Frank | [email protected] |
107 | Grace | [email protected] |
それぞれをスプレッドシートの異なるシートに張り付け以下の手順でコネクタの設定を行います。
接続設定後、初期同期を行います。この時点で下図のようにテーブルが作成されます。
dbt Cloud 側
データロードが完了したら dbt Cloud 側で検証用にサンプルプロジェクトを作成し初期化を行います。models
フォルダは以下の構成としました。
models
├── marts
│ └── total_spent_per_customer.sql
├── source.yml
└── staging
├── stg_google_sheets__customer.sql
└── stg_google_sheets__order.sql
各ファイルは以下のように定義しています。
version: 2
sources:
- name: google_sheets
database: fivetran_database
schema: google_sheets
tables:
- name: orders
- name: customer
select
*
from
{{ source('google_sheets', 'orders') }}
select
*
from
{{ source('google_sheets', 'customer') }}
WITH customer_data AS (
SELECT
customer_id,
customer_name
FROM
{{ ref('stg_google_sheets__customer') }}
),
order_data AS (
SELECT
customer_id,
total_amount
FROM
{{ ref('stg_google_sheets__order') }}
),
final AS (
SELECT
customer_data.customer_id,
customer_data.customer_name,
SUM(order_data.total_amount) AS total_spent
FROM
customer_data
JOIN
order_data ON customer_data.customer_id = order_data.customer_id
GROUP BY
customer_data.customer_id, customer_data.customer_name
ORDER BY
total_spent DESC
)
SELECT
*
FROM
final
リネージグラフは下図のようになります。
あわせてdbt_project.yml
でmatrs
フォルダのモデルはテーブルとして作成されるように設定しておきました。
models:
my_new_project:
marts:
+materialized: table
開発環境のスキーマでテスト後、環境とジョブを定義します。
- 環境の定義:prod スキーマに作成する設定
- ジョブの設定
Fivetran 側でトリガー実行するため、ここでは dbt Cloud 側ではスケジュール等のトリガーの設定は行いませんでした。
セットアップ
事前準備ができたので、以降の手順で dbt Cloud Orchestration の設定を行います。ドキュメントとしては以下に記載があります。
dbt Cloud:サービス トークンの取得
Fivetran から dbt Cloud のジョブをトリガーするために、トークンを取得します。dbt Cloud にログインし右上の歯車マークから「Account settings > Service tokens」をクリックします。
トークンの一覧画面が開くので [+ Create service token] をクリックします。
任意のトークン名を入力し「Permission set」でトークンのパーミッションを設定します。ここでは dbt Cloud のプランに応じてそれぞれ以下の通り設定します。
- Team プラン:Member
- Enterprise プラン:Developer
あわせてトークンでアクセス可能なプロジェクトと環境を指定します。下図ではパーミッションとして「Developer」を選択しています。
設定を保存するとトークンが表示されるので、安全な場所に控えておきます。
Fivetran:dbt Cloud Orchestration の設定
Fivetran ダッシュボードの「Transformations」をクリックし、対象となる宛先を選択します。
はじめての場合、dbt Cloud のリージョン、トークンの登録画面が表示されるので、先の手順で取得した値を入力し [Authenticate] をクリックします。するとトークン権限でアクセス可能な dbt プロジェクトを選択できるので、ドロップダウンから選択します。
設定を保存するとプロジェクトへの接続テストが実行されます。
テストが終わると下図の表示になるので [Add Transforamtion] をクリックします。
どのタイプの変換を行うかを選択します。ここでは dbt Cloud 上のプロジェクトを使用した変換を行いたいので、dbt Cloud の [Add transormations] をクリックします。
すると下図の表示となり、対象のプロジェクト内でアクセス可能なジョブが表示されます。
先の手順で [Manage project] をクリックすると、接続解除などのプロジェクト設定を行えます。
トリガー実行したいジョブを選択すると、どのようなスケジュール実行とするか選択できます。ここでは、Fivetran の同期とあわせてジョブをトリガーしたいので、「Integrated」を選択します。
なお、「Custom」を選択すると指定の頻度でスケジュール実行が可能です。
次に、対象の宛先でジョブのトリガー条件となるコネクタを指定します。複数のコネクタを指定可能です。
コネクタを指定後 [Save and Run] をクリックして設定を完了です。同期のタイミングでトリガーされるため、設定完了後すぐにジョブがトリガーされることはありません。
コネクタの同期を実行
ここでは、手動でコネクタの同期を実行してみます。
同期開始後、すぐにジョブ側もを実行状態となっていました。
実行完了後は下図の表示となります。
Snowflake 側を確認するとジョブで指定した環境で設定したスキーマにオブジェクトが作成されています。
データも確認してみます。データマートを想定したテーブルは下図のように作成されています。(この時点で7レコード)
レコードを追加
さいごにデータソース側でレコードを追加してみます。
- orders テーブル:青枠の3レコードを追加
- customer テーブル:青枠の2レコードを追加
レコードを追加後、同期を実行します。実行後 Snowflake 側でそれぞれテーブルを確認します。
- orders テーブル:赤枠のレコードが追加
- customer テーブル:赤枠のレコードが追加
dbt Cloud との連携を実施しているので、同期と合わせてジョブも実行されます。
変換処理の結果(モデル)を見ると、ジョブが実行された結果こちらも更新されていることが確認できます。赤枠のレコードがデータソースの更新とあわせて追加・更新されています。
さいごに
Fivetran の dbt Cloud Orchestration で dbt Cloud のジョブをトリガーしてみました。Fivetran のデータロード機能と dbt Cloud の変換機能それぞれの強みを活用しつつ簡単に連携設定ができる機能なので組み合わせて使っていけると便利だなと感じました。こちらの内容が何かの参考になれば幸いです。