BigQuery のスケジューリングクエリって何がうれしいの? ~ 毎時 GCS データを集計するユースケースを想定して動かしてみた~
こんにちは、データアナリティクス事業本部のみかみです。
本エントリは、クラスメソッド BigQuery Advent Calendar 2020 の 13 日目のエントリです。 25日のアドベントカレンダー終了まで、弊社クラスメソッド データアナリティクス事業本部のメンバーで、Google BigQuery に関する記事を紡いでいこうと思います。
BigQuery のスケジューリングクエリって何がうれしいの?
BigQuery では管理コンソールの UI から、SQL のスケジュール実行を登録することができます。
つまり BigQuery では、マートデータ作成などの SQL 実行で完結する処理(+α)ならば、バッチジョブを実装する必要なく、スケジューリングクエリ機能で実現することができるのです! SQL 限定なので、もちろん全てのバッチ処理をスケジューリングクエリで置き換えることはできませんが、ジョブのコーディングやサードパーティ製ツールを導入する必要なく SQL がスケジュール実行できるって、もしかしてすごく便利なのでは?!
やりたいこと
- BigQuery のスケジューリングクエリを試してみたい
- 毎時 GCS データを集計して結果を BigQuery のテーブルに格納したい
どこか他のシステムから、GCS に毎時 CSV のデータファイルが連携されるものとします。 連携された CSV データを集計して、集計結果を BigQuery のテーブルに格納するケースを想定しました。
通常であれば、データ集計と BigQuery テーブルへの結果格納処理を実行するジョブをコーディングして、Cloud Functions などのサーバレス環境やバッチサーバから実行する方法、またはサードパーティ製のジョブ作成&実行ツールの導入を検討するところです。
BigQuery のスケジューリングクエリを使えば、BigQuery 管理画面の GUI 操作だけで SQL をスケジュール実行する設定ができるので、バッチコードを書く必要なく、GCS データの集計結果を毎時 BigQuery テーブルに格納することができてしまいます!
GCS の連携データを準備
CSV データを 1 時間に 1 回 GCS に連携する、どこか別のシステムの処理を、Cloud Scheduler、Cloud Pub/Sub、Cloud Functions を使って準備します。
5 行~20 行のクリスマスメニューの売上げデータをランダムで作成して CSV ファイルとして GCS に Put する、以下の Cloud Functions の Python コードと、必要なパッケージ情報を記載した requirements.txt
を準備しました。
import pandas as pd import random from datetime import datetime from pytz import timezone from google.cloud import storage def put_csv_sample(data, context): # create data FOODS = [ {'name':'クリスマスケーキ', 'price':3000}, {'name':'ブッシュドノエル', 'price':2000}, {'name':'フライドチキン', 'price':1000}, {'name':'ローストチキン', 'price':2000}, {'name':'ローストビーフ', 'price':2000}, {'name':'ピザ', 'price':1000}, {'name':'ポットパイ', 'price':500}, {'name':'寿司', 'price':5000}, {'name':'オードブル', 'price':3000}, {'name':'シャンパン', 'price':2000}, {'name':'スパークリングワイン', 'price':2000}, {'name':'シャンメリー', 'price':500}, ] row_cnt = random.randint(5, 20) items = [random.choice(FOODS) for _ in range(row_cnt)] df = pd.DataFrame({ 'name':[d.get('name') for d in items], 'price':[d.get('price') for d in items], 'count':[random.randint(1, 5) for _ in range(row_cnt)], 'put_datetime':[datetime.now(timezone('Asia/Tokyo')).strftime('%Y-%m-%d %H:%M:%S') for _ in range(row_cnt)] }) df = df[['name', "price", 'count', 'put_datetime']] print(df) # put data to GCS now = datetime.now(timezone('Asia/Tokyo')) bucket_name = 'test-mikami-schedule' blob_name = 'advent_calendar/foods_christmas_{}.csv'.format(now.strftime('%Y%m%d%H%M%S')) client = storage.Client() bucket = client.get_bucket(bucket_name) blob = bucket.blob(blob_name) blob.upload_from_string(data=df.to_csv(index=False), content_type='text/csv') print("Blob {} created.".format(blob_name))
google-cloud-storage==1.28.1 pandas==1.0.0
Cloud Shell から gcloud
コマンドを使って、以下を作成します。
put_csv_sample
という名前の Cloud Functions の関数と、トリガーとなる 同名の Cloud Pub/Sub トピックsub_put_csv_sample
という名前のput_csv_sample
トピックのサブスクリプション- 毎時 0 分に
put_csv_sample
トピックにメッセージを送信する同名の Cloud Scheduler のジョブ
- Google Cloud Pub/Sub トリガー | Cloud Functions ドキュメント
- Cloud Pub/Sub のチュートリアル | Cloud Functions ドキュメント
- Cloud Scheduler クイックスタート | Cloud Scheduler ドキュメント
- Cloud Shell の使用 | Cloud Shell ドキュメント
- gcloud | コマンドライン インターフェース リファレンス
先ほどの Python コードを main.py
という名前で保存し、requirements.txt
と一緒に Cloud Shell にアップロードしました。
gcp_da_user@cloudshell:~/test_schedule (cm-da-mikami-yuki-258308)$ ls -l total 8 -rw-r--r-- 1 gcp_da_user gcp_da_user 1690 Dec 8 09:03 main.py -rw-r--r-- 1 gcp_da_user gcp_da_user 45 Dec 8 09:03 requirements.txt
以下の 3 つの gcloud
コマンドを実行します。
gcloud functions deploy put_csv_sample --runtime python37 --trigger-topic put_csv_sample gcloud pubsub subscriptions create sub_put_csv_sample --topic put_csv_sample gcloud scheduler jobs create pubsub put_csv_sample --schedule "00 */1 * * *" --time-zone "Asia/Tokyo" --topic put_csv_sample --message-body "test"
gcp_da_user@cloudshell:~/test_schedule (cm-da-mikami-yuki-258308)$ gcloud functions deploy put_csv_sample --runtime python37 --trigger-topic put_csv_sample Deploying function (may take a while - up to 2 minutes)...⠹ For Cloud Build Stackdriver Logs, visit: https://console.cloud.google.com/logs/viewer?project=cm-da-mikami-yuki-258308&advancedFilter=resource.type%3Dbuild%0Aresource.labels.build_id %3De37ed193-3450-4956-856b-40c8d151aeb1%0AlogName%3Dprojects%2Fcm-da-mikami-yuki-258308%2Flogs%2Fcloudbuild Deploying function (may take a while - up to 2 minutes)...done. availableMemoryMb: 256 buildId: e37ed193-3450-4956-856b-40c8d151aeb1 entryPoint: put_csv_sample eventTrigger: eventType: google.pubsub.topic.publish failurePolicy: {} resource: projects/cm-da-mikami-yuki-258308/topics/put_csv_sample service: pubsub.googleapis.com ingressSettings: ALLOW_ALL labels: deployment-tool: cli-gcloud name: projects/cm-da-mikami-yuki-258308/locations/us-central1/functions/put_csv_sample runtime: python37 serviceAccountEmail: [email protected] sourceUploadUrl: https://storage.googleapis.com/gcf-upload-us-central1-2365985c-f7e0-4882-9f01-971d82702e1f/25471d36-d53c-4fe1-bca6-e617efec64be.zip?GoogleAccessId=service-7971470195 [email protected]&Expires=1607421201&Signature=o%2F0GwhiKJksDIKwtE6zzS7vDMg92066PnPOeU8zwbZkxPPkIQXvgoq38ZrpnBDyvWD9U%2FQvJ9AthWJCVLJN2%2Fgta3VL6KB%2B2ZGWQh0 SLbuAiivJKRL%2BUMOsPWb7b5TWTSEKDcDWB529GTHqAagyZ%2F7LJbI17U04Rk6nmPqvvHGd2QZEBdgqejyh9S%2Bc5l5BajZxA2FPI1WKi53VUcB47XojAAG6BV15Lb5LemnnpteHqcnAh1SERIQy7Fc6XteCVrrwjJO%2FTKOBDXmW2MS00 LS7pyfxkXmEtGbNvJSFuhpm8D%2F8gc0UwO6Wl1CbJOIMIaAXDZTT7dVESrv%2By9nJQ%2Bg%3D%3D status: ACTIVE timeout: 60s updateTime: '2020-12-08T09:25:01.221Z' versionId: '1'
gcp_da_user@cloudshell:~/test_schedule (cm-da-mikami-yuki-258308)$ gcloud pubsub subscriptions create sub_put_csv_sample --topic put_csv_sample Created subscription [projects/cm-da-mikami-yuki-258308/subscriptions/sub_put_csv_sample].
gcp_da_user@cloudshell:~/test_schedule (cm-da-mikami-yuki-258308)$ gcloud scheduler jobs create pubsub put_csv_sample --schedule "00 */1 * * *" --time-zone "Asia/Tokyo" --topic put_c sv_sample --message-body "test" name: projects/cm-da-mikami-yuki-258308/locations/asia-northeast1/jobs/put_csv_sample pubsubTarget: data: dGVzdA== topicName: projects/cm-da-mikami-yuki-258308/topics/put_csv_sample retryConfig: maxBackoffDuration: 3600s maxDoublings: 16 maxRetryDuration: 0s minBackoffDuration: 5s schedule: 00 */1 * * * state: ENABLED timeZone: Asia/Tokyo userUpdateTime: '2020-12-08T09:27:01Z'
管理コンソールからも、Cloud Functions、Cloud Pub/Sub、Cloud Scheduler それぞれ作成されたことが確認できました。
Cloud Scheduler を手動実行して、動作確認してみます。
結果に「成功」と表示されたのを確認して、Pub/Sub、Cloud Functions も実行されたかどうか見てみます。
無事実行されたようです。
GCS にも CSV ファイルが出力されたことが確認できました。
データの中身はこんな感じです。期待通りのサンプルデータファイルが無事出力されました。
name,price,count,put_datetime ブッシュドノエル,2000,5,2020-12-08 19:17:08 ローストビーフ,2000,2,2020-12-08 19:17:08 ローストビーフ,2000,4,2020-12-08 19:17:08 シャンメリー,500,2,2020-12-08 19:17:08 スパークリングワイン,2000,2,2020-12-08 19:17:08 ブッシュドノエル,2000,2,2020-12-08 19:17:08 寿司,5000,3,2020-12-08 19:17:08 シャンメリー,500,2,2020-12-08 19:17:08 クリスマスケーキ,3000,1,2020-12-08 19:17:08 ブッシュドノエル,2000,4,2020-12-08 19:17:08 オードブル,3000,5,2020-12-08 19:17:08 スパークリングワイン,2000,4,2020-12-08 19:17:08 ブッシュドノエル,2000,3,2020-12-08 19:17:08 ローストチキン,2000,3,2020-12-08 19:17:08 ローストビーフ,2000,2,2020-12-08 19:17:08 シャンパン,2000,4,2020-12-08 19:17:08 スパークリングワイン,2000,4,2020-12-08 19:17:08 シャンメリー,500,5,2020-12-08 19:17:08 スパークリングワイン,2000,1,2020-12-08 19:17:08 寿司,5000,5,2020-12-08 19:17:08
これで、クリスマスメニューの売上げデータを毎時 GCS に CSV ファイルで連携する別システムの処理の準備ができました。
BigQuery で SQL をスケジュール登録
さて、ここからが本題です。 GCS に連携された CSV ファイルのデータを集計して結果を BigQuery のテーブルに格納する処理を SQL で記載し、BigQuery 管理コンソールからスケジュール実行登録します。
登録するのは以下の 3 つの SQL です。
初めの SQL では、GCS ファイルデータを外部データソースとする、外部テーブルを作成します。
IF NOT EXISTS
でテーブルが存在しない場合のみ作成するので、実際にテーブルが作成されるのは初回の 1 回のみです。
CREATE EXTERNAL TABLE IF NOT EXISTS dataset_advent_calendar.foods_christmas ( name STRING, price INT64, count INT64, put_datetime DATETIME ) OPTIONS ( uris=["gs://test-mikami-schedule/advent_calendar/foods_christmas_*.csv"], skip_leading_rows=1, format=CSV );
BigQuery では今年 2020 年 10 月から、EXTERNAL TABLE も SQL で CREATE / DROP できるようになったので、GCS データを BigQuery から参照することが SQL の世界だけで完結できます。
- ユーザーを笑顔にする BigQuery の使いやすい SQL 新機能 | Google Cloud Blog
- CREATE EXTERNAL TABLE ステートメント | BigQuery ドキュメント
GCS ファイルを外部データソースとする外部テーブルを作成してしまえば、GCS のデータも通常の BigQuery テーブル同様 SQL でアクセスできるので、GCS ファイルデータを SQL で集計できるようになります。
次の SQL では、集計結果を格納するテーブルを作成します。 こちらも初回のテーブルが存在しない場合に限って、集計結果格納用のテーブルを作成するための SQL です。
CREATE TABLE IF NOT EXISTS dataset_advent_calendar.mart_sales ( name STRING, sales INT64, sales_cnt INT64, create_timestamp DATETIME ) PARTITION BY DATETIME_TRUNC ( create_timestamp, DAY );
最後の SQL で、SQL の実行と同じ時間帯に Put された CSV データを対象に、商品名ごとに合計の金額と個数を集計し、集計日時を付与して BigQuery の mart_sales
テーブルに格納します。
INSERT INTO dataset_advent_calendar.mart_sales SELECT name, SUM(count*price) as sales, SUM(count) AS sales_cnt, CURRENT_DATETIME('Asia/Tokyo') AS create_timestamp FROM dataset_advent_calendar.foods_christmas WHERE FORMAT_DATETIME('%Y%m%d%H', put_datetime) = FORMAT_DATETIME('%Y%m%d%H', CURRENT_DATETIME('Asia/Tokyo')) GROUP BY name ;
この 3 つの SQL を BigQuery 管理コンソールのクエリエディタに続けて入力し、「クエリのスケジュール」プルダウンから「スケジュールされたクエリを新規作成」をクリックします。
「スケジュールされたクエリの名前」に任意の名前を入力し、「繰り返しの頻度」のプルダウンで「毎時」を選択。「開始時と実行時間」で現在時の 5 分を JST で指定し、「処理を行うロケーション」に「東京(asia-northeast1)」を選択しました。
これで、毎時 5 分に、登録した SQL をスケジュール実行する設定ができました。
BigQuery 管理コンソールの「スケジュールされたクエリ」をクリックすると、登録済みのスケジューリングクエリが確認できます。
設定した時間が過ぎたので、「スケジュールされたクエリ」一覧画面から SQL の実行状況を確認します。
「実行の詳細」から、登録した SQL が正常に実行されたことが確認できました。
BigQuery のテーブルも確認してみます。
1 つ目の SQL で GCS ファイルをデータソースとする外部テーブルが作成され、
2 つ目の SQL で集計結果格納用の BigQuery テーブルが作成されました。
集計結果データも期待通り格納されています。
この時間に GCS に出力されていた CSV ファイルは以下でした。
name,price,count,put_datetime ピザ,1000,1,2020-12-09 00:00:13 スパークリングワイン,2000,1,2020-12-09 00:00:13 ピザ,1000,5,2020-12-09 00:00:13 スパークリングワイン,2000,3,2020-12-09 00:00:13 シャンパン,2000,2,2020-12-09 00:00:13 ローストビーフ,2000,4,2020-12-09 00:00:13
集計処理内容も問題なさそうです。
さらに、そのまま半日ほど放置してから、集計結果テーブルの中身を確認しました。
集計結果の件数を確認してみます。
集計結果レコードが増えてます。
SQL がスケジュール実行された各時間に集計結果が追加で格納され、それぞれの時間ごとの売上の合計も確認できました。
また、スケジューリングクエリの詳細画面からも、期待通り、毎時 SQL クエリがスケジュール実行されていたことが確認できました。
スケジューリングクエリの結果を通知
SQL のスケジュール実行では、実行エラーが発生した場合にメールや Cloud Pub/Sub に通知することができます。
実行エラーになる SQL をスケジュール登録して、メールが通知されるか確認してみます。 メール通知の設定は、スケジューリングクエリの作成(編集)画面で「メール通知を送信する」チェックボックスをチェックするだけです。 またその下の「Cloud Pub/Sub トピック」にトピック ID を入力すれば Pub/Sub に通知することも可能なので、Pub/Sub トリガーで起動する Cloud Functions と連携すれば 、Slack などの他の通知先に通知したり、任意のリカバリ処理を実装することもできそうです。
メール通知を設定しておくと、SQL クエリのオーナー(管理コンソールにログイン中のアカウント)のメールアドレス宛に、通知メールが送信されます。 通知先に別のメールアドレスを指定することはできませんが、Gmail の場合は転送設定が可能なので、運用時にはメーリングリスト宛てに転送設定しておくと良さそうです。
スケジュール実行がエラーになったことを確認して、通知メールが来ているかどうか確認します。
こんなメールが届いてました。
登録した SQL は BigQuery Data Transfer Service を利用してスケジュール実行されているため、通知メールの送信元も BigQuery Data Transfer Service
になってます。
Display name
に、スケジューリングクエリの表示名、Source
として scheduled_query
の記載があるので、スケジューリングクエリのエラー通知だということが分かります。
Run summary
の内容からエラーの詳細が確認できますし、View run history
リンクをクリックすると管理コンソール上でエラーが発生したスケジュールの詳細を確認することもできます。
まとめ(所感)
あまりフォーカスされていない(ように感じる)BigQuery の「スケジュールされたクエリ」の機能ですが、定期的な SQL 実行を画面 UI 操作だけで簡単に登録できて、エラー通知機能もついてます。 今回は GCS データを集計して BigQuery テーブルに格納するケースを想定してみましたが、他にも、例えば定期的に BigQuery テーブルデータを GCS にエクスポートしたり、INFORMATION_SCHEMA を参照して BigQuery のスロット使用率を確認したりする処理を、BigQuery 管理コンソールのクエリエディタと UI 操作だけで簡単に実現することができます。
今まで BigQuery 管理コンソールを使いながら「『スケジュールされたクエリ』って何?」って思ってましたが、これってうまく活用すればバッチジョブの開発工数が削減できるってことで、要件によってはすごく便利な機能なんじゃない?!と思いました!v
明日 14 日目の BigQuery Advent Calendar 2020 はkobayashi.mからお送り予定です。 また、今後のアドベントカレンダーの予定はこちらにも掲載しておりますので、引き続きお楽しみいただけますと幸いです。
参考
- クエリのスケジューリング | BigQuery ドキュメント
- データ定義言語ステートメントの使用 | BigQuery ドキュメント
- データ操作言語の構文 | BigQuery ドキュメント
- BigQuery Data Transfer Service の概要 | BigQuery Data Transfer Service ドキュメント
- BigQuery Data Transfer Service の実行通知 | BigQuery Data Transfer Service ドキュメント
- Cloud Shell の使用 | Cloud Shell ドキュメント
- gcloud | コマンドライン インターフェース リファレンス
- Google Cloud Pub/Sub トリガー | Cloud Functions ドキュメント
- Cloud Pub/Sub のチュートリアル | Cloud Functions ドキュメント
- Cloud Scheduler クイックスタート | Cloud Scheduler ドキュメント