AWS Step FunctionsからAmazon Athena Parameterized Queriesを実行する構成を作ってみた(AWS CDK)

AWS Step FunctionsからAmazon Athena Parameterized Queriesを実行する構成を作ってみた(AWS CDK)

Clock Icon2022.07.11

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

前回のエントリでは、Amazon Athena Parameterized QueriesをCDKで作ってみました。

今回はその発展編として、Amazon Athena Parameterized QueriesをAWS Step Functionsから実行する構成をAWS CDKで作ってみました。

やってみた

実装

AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。

import { Construct } from 'constructs';
import {
  aws_s3,
  aws_athena,
  aws_stepfunctions_tasks,
  aws_stepfunctions,
  RemovalPolicy,
  Stack,
  StackProps,
  aws_iam,
} from 'aws-cdk-lib';
import * as glue from '@aws-cdk/aws-glue-alpha';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // ソースデータ格納バケット
    const sourceDataBucket = new aws_s3.Bucket(this, 'sourceDataBucket', {
      bucketName: `data-${this.account}`,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // Athenaクエリ結果格納バケット
    const athenaQueryResultBucket = new aws_s3.Bucket(
      this,
      'athenaQueryResultBucket',
      {
        bucketName: `athena-query-result-${this.account}`,
        removalPolicy: RemovalPolicy.DESTROY,
      },
    );

    // データカタログ
    const dataCatalog = new glue.Database(this, 'dataCatalog', {
      databaseName: 'data_catalog',
    });

    // データカタログテーブル
    const sourceDataGlueTable = new glue.Table(this, 'sourceDataGlueTable', {
      tableName: 'source_data_glue_table',
      database: dataCatalog,
      bucket: sourceDataBucket,
      s3Prefix: 'data/',
      partitionKeys: [
        {
          name: 'date',
          type: glue.Schema.STRING,
        },
      ],
      dataFormat: glue.DataFormat.JSON,
      columns: [
        {
          name: 'userId',
          type: glue.Schema.STRING,
        },
        {
          name: 'count',
          type: glue.Schema.INTEGER,
        },
      ],
    });

    // データカタログテーブルへのPartition Projectionの設定
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    (sourceDataGlueTable.node.defaultChild as any).tableInput.parameters = {
      'projection.enabled': true,
      'projection.date.type': 'date',
      'projection.date.range': '2022/06/20,NOW',
      'projection.date.format': 'yyyy/MM/dd',
      'projection.date.interval': 1,
      'projection.date.interval.unit': 'DAYS',
      'storage.location.template':
        `s3://${sourceDataBucket.bucketName}/data/` + '${date}',
    };

    // Athenaワークグループ
    const athenaWorkGroup = new aws_athena.CfnWorkGroup(
      this,
      'athenaWorkGroup',
      {
        name: 'athenaWorkGroup',
        workGroupConfiguration: {
          resultConfiguration: {
            outputLocation: `s3://${athenaQueryResultBucket.bucketName}/result-data`,
          },
        },
      },
    );

    // Prepared Queries
    const athenaPreparedStatement = new aws_athena.CfnPreparedStatement(
      this,
      'athenaPreparedStatement',
      {
        statementName: 'athenaPreparedStatement',
        workGroup: athenaWorkGroup.name,
        queryStatement: `
        SELECT *
        FROM ${dataCatalog.databaseName}.${sourceDataGlueTable.tableName}
        WHERE date = ?
        LIMIT 10`,
      },
    );

    // Athenaクエリ実行開始
    const startAthenaQueryExecutionTask =
      new aws_stepfunctions_tasks.AthenaStartQueryExecution(
        this,
        'startAthenaQueryExecutionTask',
        {
          queryString: aws_stepfunctions.JsonPath.format(
            `EXECUTE ${athenaPreparedStatement.statementName} USING '{}'`,
            aws_stepfunctions.JsonPath.stringAt('$.query_dt'),
          ),
          workGroup: athenaWorkGroup.name,
          resultPath: '$.startAthenaQueryExecutionTaskOutPut',
        },
      );

    // ステートマシン
    const stateMachine = new aws_stepfunctions.StateMachine(
      this,
      'stateMachine',
      {
        stateMachineName: 'stateMachine',
        definition: startAthenaQueryExecutionTask,
      },
    );

    // Glue Data Catalogリソースへのアクセス権をState Machineに追加
    stateMachine.role.addToPrincipalPolicy(
      new aws_iam.PolicyStatement({
        actions: ['glue:Get*', 'glue:List*'],
        effect: aws_iam.Effect.ALLOW,
        resources: [dataCatalog.databaseArn, sourceDataGlueTable.tableArn],
      }),
    );

    // PreparedStatementを実行するためのアクセス権をState Machineに追加
    stateMachine.addToRolePolicy(
      new aws_iam.PolicyStatement({
        actions: ['athena:GetPreparedStatement'],
        effect: aws_iam.Effect.ALLOW,
        resources: [
          `arn:aws:athena:${this.region}:${this.account}:workgroup/${athenaWorkGroup.name}`,
        ],
      }),
    );
  }
}
  • AthenaStartQueryExecution Taskで、Query String EXECUTE ${athenaPreparedStatement.statementName} USING '{}'によりParameterized Queriesを実行しています。
    • この時、文字列型のParameter値の指定は'{}'のようにシングルクォートで囲む必要があります。
  • State MachineはWorkgroupに対してathena:GetPreparedStatement Actionを行うので、Policyを付与します。

上記をCDK Deployしてスタックをデプロイします。すると次のようなDefinitionのState Machineが作成されます。

{
  "StartAt": "startAthenaQueryExecutionTask",
  "States": {
    "startAthenaQueryExecutionTask": {
      "End": true,
      "Type": "Task",
      "ResultPath": "$.startAthenaQueryExecutionTaskOutPut",
      "Resource": "arn:aws:states:::athena:startQueryExecution",
      "Parameters": {
        "QueryString.$": "States.Format('EXECUTE athenaPreparedStatement USING {}', $.query_dt)",
        "ResultConfiguration": {},
        "WorkGroup": "athenaWorkGroup"
      }
    }
  }
}

動作確認

次の入力を指定してステートマシンを実行します。

{
  "query_dt": "2022/06/28"
}

実行が成功しました。

AthenaのQuery editorで履歴を見ると、実行が正常に完了しています。

同QueryをEditorで開くと、データ取得もちゃんと出来ていますね!

参考

以上

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.