Fivetran で Aurora PostgreSQL のデータを Snowflake に同期してみた #Fivetran

Fivetran で Aurora PostgreSQL のデータを Snowflake に同期してみた #Fivetran

Clock Icon2024.02.23

Fivetran で Aurora PostgreSQL のデータを Snowflake に同期し、スキーマの自動移行を試してみましたので、その際の条件や手順について記事にしました。

Aurora PostgreSQL との連携

Fivetran で Aurora PostgreSQL をデータソースとする場合、以下のポイントに注意します。
※こちらは 2024/2/23 時点の情報のためご注意ください。

前提条件

  • PostgreSQL version 11 - 15
  • SSL/TLS を使用した DB インスタンスへの接続の暗号化が有効であること

接続方法

Aurora PostgreSQL への代表的な接続方法として以下があります。

  • 直接接続
    • データベースに対して直接 Fivetran の IP CIDR を許可する
  • SSH トンネルによる接続
    • インスタンスに直接接続できない場合のオプション
    • パブリックサブネットの踏み台サーバーを使用
  • AWS PrivateLink
    • インスタンスに直接接続できない場合のオプション
      • ※一部のソースでは BETA ですが、Aurora PostgreSQL は一般提供となっています。
    • Fivetran は、ビジネスクリティカル以上のエディションであることが必要
    • サポートするリージョンにリソースがあること
      • 国内であれば、ap-northeast-1 (Tokyo)

同期方法

Aurora PostgreSQL では、以下の同期方法が提供されています。

  • Logical replication
    • ログ先行書き込み (WAL) に基づく同期
    • 主キーを持つテーブルの行の削除(delete)を複製可能
    • プライマリーインスタンスのみを対象
      • リードレプリカでは有効にできない。
  • XMIN
    • システム列である xmin を使用
    • 更新されたデータの検出に全てのテーブルを完全にスキャンするため、論理レプリケーションよりもパフォーマンスは低い
    • 行の削除(delete)は複製できない
  • Fivetran Teleport Sync
    • Fivetran の推奨
    • 行の削除(delete)を複製可能
      • リードレプリカも対象とできる
    • 同期対象のテーブルは、主キーを持つ必要があります
    • サポートする主キーのタイプ
      • Constrained VARCHAR
      • TEXT
      • UUID
      • BIGINT
      • SMALLINT
      • INT
      • Constrained NUMERIC
      • DATE

同期方法ごとにサポートする DDL の変更は下図の通りです。

※図はSupport for DDL changes | Fivetran より引用。

検証環境

以下の環境を使用しています。

  • 宛先
    • Snowflake
  • データソース
    • Aurora PostgreSQL Compatible with PostgreSQL 15.4
    • インスタンスタイプ:db.r5.large
  • 接続方法
    • 直接接続
  • 同期方法
    • Fivetran Teleport Sync

レコードの削除もキャプチャしたいので、同期方法としてFivetran Teleport Sync を使用します。また、このために後述の手順では、サンプルデータとして、主キーを持つテーブルを作成します。

事前準備

手元のクライアント(psql)からアクセス可能な検証用の Aurora PostgreSQL を構築し、以下の通り Snowfalke に連携するデータを用意します。

  • データベースの作成
CREATE DATABASE sampledb;
  • テーブルの作成とデータの追加
    • Fivetran Teleport Sync のために、主キーを持つテーブルを作成します。
--テーブル作成
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100)
);
--レコードの追加
INSERT INTO users (name, email) VALUES ('John Doe', '[email protected]');
INSERT INTO users (name, email) VALUES ('Jane Smith', '[email protected]');
INSERT INTO users (name, email) VALUES ('Mike Brown', '[email protected]');
  • Fivetran 用の読み取り専用ユーザーを作成
CREATE USER fivetran_user WITH PASSWORD 'fivetran';
  • 権限を付与
GRANT pg_read_all_data TO fivetran_user;
  • 権限を確認
> \du
                                                                                          List of roles
      Role name      |                         Attributes                         |                                                  Member of                                                   
---------------------+------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------
 fivetran_user       |                                                            | {pg_read_all_data}
 postgres            | Create role, Create DB                                    +| {rds_superuser}
                     | Password valid until infinity                              | 
 rds_ad              | Cannot login                                               | {}
 rds_iam             | Cannot login                                               | {}
 rds_password        | Cannot login                                               | {}
 rds_replication     | Cannot login                                               | {}
 rds_superuser       | Cannot login                                               | {pg_read_all_data,pg_write_all_data,pg_monitor,pg_signal_backend,pg_checkpoint,rds_replication,rds_password}
 rdsadmin            | Superuser, Create role, Create DB, Replication, Bypass RLS+| {}
                     | Password valid until infinity                              | 
 rdswriteforwarduser | No inheritance                                             | {}
 user1               | Create DB                                                  | {}

Fivetran の設定

Destination

データの連携先には Snowflake を使用しました。Snowflake を Destination に設定する方法は、以下の記事をご参照ください。

Connector

コネクタに Aurora PostgreSQL を追加します。

  • 接続情報
  • 接続方法
    • ここでは「Connect directly」を指定しました。
  • 同期方法
    • Fivetran Teleport Sync を指定しました。

下図の通り、Fivetran の IP アドレスのリストが表示されるので、セキュリティグループでインバウンド通信を許可しておきます。

Fivetran IP Addresses | Fivetran
接続設定は以上になります。この状態で接続テストを行うと下図の表示となりました。

最初の証明書はルート証明機関 (CA) で、2 番目はリーフ証明書になります。いずれかを選択し、信頼できるものであることを確認することで、コネクタのセットアップを完了することができます。
What TLS Certificate Should I Trust in Fivetran’s Dashboard

再度接続テストを行います。

接続テストが完了すると、スキーマの情報が取得されます。

情報が取得されると、データベース内のスキーマやテーブルが表示されるので、同期したいテーブル(ここでは users テーブル)にチェックが入っている状態で [Save & Continue ] をクリックします。

どの程度まで変更をキャプチャするかの確認画面が表示されるので、任意のオプションを選択します。ここでは「Allow all」を選択し [Continue] をクリックします。

以上で設定は完了です。[Start initial Sync] をクリックし、初期同期を開始します。

初期同期が完了すると下図の表示になります。

  • Snowflakeで確認
    • Snowflake 側で権限を持つユーザーでテーブルを確認します。
USE SCHEMA FIVETRAN_DATABASE.AURORA_POSTGRES_PUBLIC;
SELECT * FROM USERS;

下図の通り同期されていました。

「_FIVETRAN_DELETED」などは Fivetran によって追加されるシステム列です。詳細は以下をご参照ください。
Fivetran audit tables and system columns | Warehouse documentation

スキーマの自動移行の検証

Fivetran では、対応するデータソースや同期方法など条件を満たせば、データソース側のテーブルスキーマが変更された場合でも、自動的にその変更を宛先側に反映します。
通常のレコードの変更も含め、スキーマの変更があった際に、どのようにこの変更が反映されるかも確認しておきます。

レコードの追加

はじめに、データソース側で以下の通り、レコードを追加してみます。

INSERT INTO users (name, email) VALUES ('Alex Johnson', '[email protected]');

Fivetran の同期が完了し、Snowflakeで確認すると、下図のようにレコードが追加されます。

レコードの変更

id が 1 のレコードについて、name 列と email 列を変更してみます。

UPDATE users SET name = 'Updated Name', email = '[email protected]' WHERE id = 1;
> select * from users;
 id |     name     |           email           
----+--------------+---------------------------
  2 | Jane Smith   | [email protected]
  3 | Mike Brown   | [email protected]
  4 | Alex Johnson | [email protected]
  1 | Updated Name | [email protected]

同期完了後、Snowflakeで確認すると、変更がキャプチャされていることが確認できます。

レコードの削除

Aurora PostgreSQL で Fivetran Teleport Sync による同期を行い、対象のテーブルが条件を満たす主キーを持つ場合、レコードの削除もキャプチャ可能です。(Logical replication でも可能)以下の通り、レコードを削除してみます。

--idが2のレコードを削除
DELETE FROM users WHERE id = 2;
sampledb=>  SELECT * FROM users ORDER BY id;
 id |     name     |           email           
----+--------------+---------------------------
  1 | Updated Name | [email protected]
  3 | Mike Brown   | [email protected]
  4 | Alex Johnson | [email protected]
(3 rows)

同期完了後、Snowflake で確認すると「_FIVETRAN_DELETED」が True に変更され論理削除が実施されていることを確認できます。

列の追加

テーブル構造の変化を伴う列の追加を行います。

--列を追加
ALTER TABLE users ADD COLUMN birthday DATE;
>  SELECT * FROM users ORDER BY id;
 id |     name     |           email           | birthday 
----+--------------+---------------------------+----------
  1 | Updated Name | [email protected] | 
  3 | Mike Brown   | [email protected]    | 
  4 | Alex Johnson | [email protected]  | 
(3 rows)

その後、レコードも追加しておきます。

INSERT INTO users (name, email, birthday) VALUES ('Chris Green', '[email protected]', '1990-05-15');
> SELECT * FROM users;
 id |     name     |           email           |  birthday  
----+--------------+---------------------------+------------
  3 | Mike Brown   | [email protected]    | 
  4 | Alex Johnson | [email protected]  | 
  1 | Updated Name | [email protected] | 
  5 | Chris Green  | [email protected]   | 1990-05-15
(4 rows)

同期が完了後、Fivetran 側を確認すると、UI でもカラムの追加を確認できます。(birthday 列)

Snowflake でも下図の通り、列が追加されます。

列の削除

列の削除を行ってみます。

--EMAIL列を削除
ALTER TABLE users DROP COLUMN email;
> SELECT * FROM users;
 id |     name     |  birthday  
----+--------------+------------
  3 | Mike Brown   | 
  4 | Alex Johnson | 
  1 | Updated Name | 
  5 | Chris Green  | 1990-05-15
(4 rows)

Snowflakeで確認すると、下図のように変更が反映されています。

Fivetran では、ソースから削除された列は、宛先にそのまま保持されます。代わりに、宛先の対応する列の値には Null が書き込まれます。すでに削除済みのレコード(_FIVETRAN_DELETED が True)は変更がキャプチャされないので、そのままの値が残っています。
Column Deleted from the Source is Still in the Destination

列の名前を変更

既存列の名称を変更してみます。

--name列の名称をfullnameに変更
ALTER TABLE users RENAME COLUMN name TO fullname;
> SELECT * FROM users;
 id |   fullname   |  birthday  
----+--------------+------------
  3 | Mike Brown   | 
  4 | Alex Johnson | 
  1 | Updated Name | 
  5 | Chris Green  | 1990-05-15
(4 rows)

同期完了後、Snowflake で確認すると、変更後の列名を持つ新しい列が追加されています。変更前の列は、値が NULL に変更されます。

テーブルの追加

同期済みのテーブルと同じスキーマにテーブルを追加してみます。

CREATE TABLE products (
  product_id SERIAL PRIMARY KEY,
  product_name VARCHAR(255),
  price DECIMAL(10, 2),
  category VARCHAR(100)
);
INSERT INTO products (product_name, price, category) VALUES 
('Laptop', 1200.00, 'Electronics'),
('Smartphone', 800.00, 'Electronics'),
('Coffee Maker', 150.00, 'Kitchen Appliances'),
('Desk Lamp', 45.99, 'Furniture'),
('Ergonomic Chair', 249.99, 'Office Supplies');

> SELECT * FROM products;
 product_id |  product_name   |  price  |      category      
------------+-----------------+---------+--------------------
          1 | Laptop          | 1200.00 | Electronics
          2 | Smartphone      |  800.00 | Electronics
          3 | Coffee Maker    |  150.00 | Kitchen Appliances
          4 | Desk Lamp       |   45.99 | Furniture
          5 | Ergonomic Chair |  249.99 | Office Supplies
(5 rows)

Fivetranでもすぐに変更が反映されます。

同期完了後、Snowflake 側でも自動的にテーブルの追加を確認できます。

レコードも問題なく同期されています。

テーブルの削除

さいごにテーブルを削除してみます。

--productsテーブルを削除
DROP TABLE products;

Fivetran では、下図のような形で変更が反映されています。

ただし、Snowflake(Destination)ではドキュメント記載の通り、テーブルの削除までは反映されません。

同期の頻度

同期完了後は、同期の頻度も設定可能です。同期可能な頻度は契約プランにより異なります。Enterprise 以上のエディションであれば、最短1分間隔での同期を指定可能です。

※一部のコネクタは1分間隔での同期に非対応のため、ご注意ください。
Sync frequency and scheduling | Fivetran

さいごに

Fivetranで Aurora PostgreSQL のデータを同期してみました。
キャプチャしたい変更の要件によっては、同期方法やソース側のテーブルの条件が満たされる必要があるので、この点は注意が必要と感じました。
こちらの内容が何かの参考になれば幸いです。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.