SNS+SQS+DynamoDBでハードバウンスと苦情の発生を記録する環境をCDK(TypeScript)で作成してみる
はじめに
awsでSESを使う場合には、ハードバウンスに対応する必要があります。 対応のためには発生の検知と、検知したメールアドレスに対して繰り返し送信を行わないようにすることの両方が必要となりますが、今回は発生を検知して記録する仕組みまでを構築したいと思います。
awsのブログ記事1を参考に、少し変更を加えた形で構築します。
前提
- SESで利用するドメイン設定が行われていること
- 本記事では
katoaki.com
が検証済みであるとします
- 本記事では
- TypeScriptを利用します
- CDK v1.38.0を利用します
- 各種モジュールもすべてv1.38.0に合わせます
- SESはオレゴンリージョンを使用します
やってみたこと
全体設計
- 各サービスを以下の順番で連携します
- SES → SNS → SQS → Lambda → DynamoDB
- 各サービスの役割は次の通り
- SES
- バウンス・苦情が発生した場合に、バウンス・苦情用SNSトピックをトリガします
- SNS
- SESからメッセージが発行された場合に、SQSのバウンス・苦情キューにメッセージを送信します
- SQS
- SNSからメッセージが発行された場合に、バウンス・苦情のDB保存用Lambdaをトリガします
- Lambda
- SQSからトリガされた場合に、SESの通知用テーブルへバウンス・苦情の情報を登録します
- DynamoDB
- 送信先メールアドレスと発生時刻を複合キーとして、Lambdaから登録されたバウンス・苦情の情報を保持します
- SES
CDK init & 必要モジュールのインストール
- bash command
cdk init --language typescript
- bash command
npm i "@aws-cdk/[email protected]" npm i "@aws-cdk/[email protected]" npm i "@aws-cdk/[email protected]" npm i "@aws-cdk/[email protected]" npm i "@aws-cdk/[email protected]"
cdk-stack.tsのスケルトンに必要なモジュールのimport文を追加し、リソース作成用の関数 create()
を追加して以下のようにします。
- cdk-stack.ts
import * as cdk from '@aws-cdk/core'; import sqs = require("@aws-cdk/aws-sqs"); import sns = require("@aws-cdk/aws-sns"); import iam = require("@aws-cdk/aws-iam"); import ddb = require("@aws-cdk/aws-dynamodb"); import lambda = require("@aws-cdk/aws-lambda"); export class App1Stack extends cdk.Stack { constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); // The code that defines your stack goes here this.create(); } private create(){ } }
SQSのキューを作成
キューのリソースを定義します。 その上で、SNSトピックからメッセージが送信できるようにするには、キューポリシーの設定が必須なので定義します。
create()
関数内に以下を追加してください。
- cdk-stack.ts
// SQS const BounceQueue = new sqs.Queue(this, "BounceQueue", { queueName: "BounceQueue" }); const BounceQueuePolicy = new iam.PolicyStatement(); BounceQueuePolicy.addActions("SQS:SendMessage"); BounceQueuePolicy.addResources(BounceQueue.queueArn); BounceQueuePolicy.addPrincipals(new iam.AnyPrincipal()); BounceQueue.addToResourcePolicy(BounceQueuePolicy); const ComplaintQueue = new sqs.Queue(this, "ComplaintQueue", { queueName: "ComplaintQueue" }); const ComplaintQueuePolicy = new iam.PolicyStatement(); ComplaintQueuePolicy.addActions("SQS:SendMessage"); ComplaintQueuePolicy.addResources(ComplaintQueue.queueArn); ComplaintQueuePolicy.addPrincipals(new iam.AnyPrincipal()); ComplaintQueue.addToResourcePolicy(ComplaintQueuePolicy);
SNSのトピックを作成
SNSトピックを定義します。 その上で、トピックにメッセージが発行された際にSQSをトリガする(メッセージを送信する)ようにするために、サブスクリプションにキューを設定します。
create()
関数内に以下を追加してください。
- cdk-stack.ts
const BounceTopic = new sns.Topic(this, "BounceTopic", { topicName: "BounceTopic" }) new sns.Subscription(this, "BounceSubscription", { endpoint: BounceQueue.queueArn, protocol: sns.SubscriptionProtocol.SQS, topic: BounceTopic }) const ComplaintTopic = new sns.Topic(this, "ComplaintTopic", { topicName: "ComplaintTopic" }) new sns.Subscription(this, "ComplaintSubscription", { endpoint: ComplaintQueue.queueArn, protocol: sns.SubscriptionProtocol.SQS, topic: ComplaintTopic })
Lambda関数を作成
キューから受け取った内容を処理してテーブルへ書き込むため、Lambda関数を作成します。
ポイントは以下です
- SQSからトリガされるように、EventSourceMappingの設定を行う
- SQSからのメッセージを読めるようにポリシーの設定を行う
- DynamoDBへ書き込みを行えるようにポリシーの設定を行う
- 処理を別ファイルに分けてcodeで指定する
-
cdk-stack.ts
// Lambda const ReportLambdaPolicy = new iam.PolicyStatement(); ReportLambdaPolicy.addActions( "sqs:ReceiveMessage", "dynamodb:PutItem" ); ReportLambdaPolicy.addResources( `arn:aws:dynamodb:*:*:table/ses_notifications`, BounceQueue.queueArn, ComplaintQueue.queueArn ); const ReportLambda = new lambda.Function(this, "ReportLambda", { functionName: "ReportLambda", timeout: cdk.Duration.seconds(10), code: lambda.Code.asset("src/index.zip"), handler: "index.handler", runtime: lambda.Runtime.NODEJS_12_X, initialPolicy: [ReportLambdaPolicy], }) new lambda.EventSourceMapping(this, "BounceEventSourceMapping", { batchSize: 10, enabled: true, eventSourceArn: BounceQueue.queueArn, target: ReportLambda }) new lambda.EventSourceMapping(this, "ComplaintEventSourceMapping", { batchSize: 10, enabled: true, eventSourceArn: ComplaintQueue.queueArn, target: ReportLambda })
Lambda関数の実装部分です。 こちらは同じディレクトリにzip化しておく必要があります。
ポイントは以下です
- AWSのブログ記事1を参考に作成しました
- 当該記事はSNSから直接Lambdaをトリガする構成なので、今回の用途に合わせて修正が必要でした
- exports.handler〜の部分配下の5行を修正しています
修正部分:
exports.handler = function (event, context) { console.log("Received event:", JSON.stringify(event, null, 2)); var body = JSON.parse(event.Records[0].body); var SnsPublishTime = body.Timestamp; var SnsTopicArn = body.TopicArn; var SESMessage = body.Message;
- src/index.js
console.log("Loading event"); var aws = require("aws-sdk"); var ddb = new aws.DynamoDB({ params: { TableName: "ses_notifications" } }); exports.handler = function (event, context) { console.log("Received event:", JSON.stringify(event, null, 2)); var body = JSON.parse(event.Records[0].body); var SnsPublishTime = body.Timestamp; var SnsTopicArn = body.TopicArn; var SESMessage = body.Message; SESMessage = JSON.parse(SESMessage); var SESMessageType = SESMessage.notificationType; var SESMessageId = SESMessage.mail.messageId; var SESDestinationAddress = SESMessage.mail.destination.toString(); var LambdaReceiveTime = new Date().toString(); if (SESMessageType == "Bounce") { var SESreportingMTA = SESMessage.bounce.reportingMTA; var SESbounceSummary = JSON.stringify(SESMessage.bounce.bouncedRecipients); var itemParams = { Item: { SESMessageId: { S: SESMessageId }, SnsPublishTime: { S: SnsPublishTime }, SESreportingMTA: { S: SESreportingMTA }, SESDestinationAddress: { S: SESDestinationAddress }, SESbounceSummary: { S: SESbounceSummary }, SESMessageType: { S: SESMessageType }, }, }; ddb.putItem(itemParams, function (err, data) { if (err) { context.fail(err); } else { console.log(data); context.succeed(); } }); } else if (SESMessageType == "Delivery") { var SESsmtpResponse1 = SESMessage.delivery.smtpResponse; var SESreportingMTA1 = SESMessage.delivery.reportingMTA; var itemParamsdel = { Item: { SESMessageId: { S: SESMessageId }, SnsPublishTime: { S: SnsPublishTime }, SESsmtpResponse: { S: SESsmtpResponse1 }, SESreportingMTA: { S: SESreportingMTA1 }, SESDestinationAddress: { S: SESDestinationAddress }, SESMessageType: { S: SESMessageType }, }, }; ddb.putItem(itemParamsdel, function (err, data) { if (err) { context.fail(err); } else { console.log(data); context.succeed(); } }); } else if (SESMessageType == "Complaint") { var SESComplaintFeedbackType = SESMessage.complaint.complaintFeedbackType; var SESFeedbackId = SESMessage.complaint.feedbackId; var itemParamscomp = { Item: { SESMessageId: { S: SESMessageId }, SnsPublishTime: { S: SnsPublishTime }, SESComplaintFeedbackType: { S: SESComplaintFeedbackType }, SESFeedbackId: { S: SESFeedbackId }, SESDestinationAddress: { S: SESDestinationAddress }, SESMessageType: { S: SESMessageType }, }, }; ddb.putItem(itemParamscomp, function (err, data) { if (err) { context.fail(err); } else { console.log(data); context.succeed(); } }); } };
DynamoDBテーブルを作成
create()
関数内に以下を追加します。
//DynamoDB new ddb.Table(this, "SesNotificationsTable", { tableName: "ses_notifications", partitionKey: { name: "SESMessageId", type: ddb.AttributeType.STRING, }, sortKey: { name: "SnsPublishTime", type: ddb.AttributeType.STRING, }, });
SESの設定
- マネジメントコンソールからNotificationsに、BounceTopic・ComplaintTopicを指定します
- この設定は、執筆時点ではCDKで定義する方法が見つからなかったため手動で行っています
binの設定
cdk deploy
コマンドで作成するスタックの設定で、オレゴンリージョンに作成するように設定変更します。SESのリージョンに合わせるためです。
new App1Stack(app, 'App1Stack', { env: { region: "us-west-2", } } )
おわりに
少し手間ではありますが、一度作成すれば他のAWS環境にも流用できるので一度作っておくと便利だと思います。