CloudWatch Logs のログイベントのみ抽出するよう CloudFormation で定義してみる

CloudWatch Logs のログイベントのみ抽出するよう CloudFormation で定義してみる

Data Firehose のログイベント抽出機能を利用した CloudFormation テンプレートを作ってみました
Clock Icon2024.08.06

こんにちは、AWS 事業本部コンサルティング部のたかくに(@takakuni_)です。

今回はタイトルの通り、 CloudWatch Logs のログイベントのみ抽出するよう CloudFormation で定義してみようと思います。

前置き

Data Firehose では、デフォルトで CloudWatch Logs から送信されるログイベントに付加情報を加え、 JSON データを 1 行で配信する仕様となっています。

具体的には以下のような形式で配信され、 CloudWatch Logs の中身は logEvents に記録かつ 1 行で S3 バケットへ保管される状況になります。(長いですね...)

配信サンプル
{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"/log/tkkn-123456789012-group","logStream":"tkkn-123456789012-stream","subscriptionFilters":["tkkn-123456789012-filter"],"logEvents":[{"id":"38422339286007929498451156720408396114833289959594065920","timestamp":1722917281192,"message":"{   \"timestamp\": \"2023-05-15T14:37:22.456Z\",   \"level\": \"ERROR\",   \"source\": \"payment_service\",   \"event_type\": \"exception\",   \"message\": \"Failed to process payment\",   \"error_code\": \"PAY_ERR_001\",   \"stack_trace\": \"java.lang.NullPointerException: at com.example.PaymentProcessor.processPayment(PaymentProcessor.java:125)\",   \"user_id\": \"user_789xyz\",   \"transaction_id\": \"tx_987654321\",   \"amount\": 99.99,   \"currency\": \"USD\" }"}]}{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"/log/tkkn-123456789012-group","logStream":"tkkn-123456789012-stream","subscriptionFilters":["tkkn-123456789012-filter"],"logEvents":[{"id":"38422339449182482116099726255845264023101920974913339392","timestamp":1722917288509,"message":"{   \"timestamp\": \"2023-05-15T19:12:03.789Z\",   \"level\": \"WARN\",   \"source\": \"auth_service\",   \"event_type\": \"security_audit\",   \"action\": \"login_attempt\",   \"status\": \"failed\",   \"reason\": \"invalid_credentials\",   \"username\": \"[email protected]\",   \"ip_address\": \"203.0.113.42\",   \"geo_location\": {     \"country\": \"United States\",     \"city\": \"New York\"   },   \"device_type\": \"mobile\",   \"attempt_count\": 3 }"}]}

S3 バケットへ配信されたログを Athena で分析したいケースにおいて、 logEvents(CloudWatch Logs の中身)だけ取り出し、JSON Lines の形式で配信するケースをよく見かけます。

logEvents だけ抜き取る

logEvents を抜き取る方法はいくつかあり、 Data Firehose の処理の中に変換用の Lambda を組み込むパターンと、ネイティブで logEvents の中身だけ取り出し配信する機能の 2 つがあります。

https://dev.classmethod.jp/articles/cloudwatch-logs-kinesis-origin-fireshose-s3-format/

とくに後者のネイティブ機能は、とても素晴らしいアップデートですね。

https://dev.classmethod.jp/articles/data-firehose-message-extraction-cloudwatch-logs/

また、ネイティブの機能の場合、改行されて格納されるようです。

というわけで、今回はネイティブの機能を利用して、 CloudWatch Logs, Data Firehose, S3 までの一連の流れを Lambda 無しで構築していこうと思います。

構成図

今回作成する構成は以下のとおりです。

至ってシンプルで面白みがないですが、一連の Cfn が周りになかったためブログにしてみます。

Untitled(87) (2)

全文
log.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'CloudWatch Logs to S3 via Data Firehose template'
Parameters:
  Environment:
    Type: String
  LogExpirationInDays:
    Type: Number
    Default: 90
Resources:
  ##############################################################
  # S3 Bucket
  ##############################################################
  LogBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub '${Environment}-log-bucket-${AWS::AccountId}'
      OwnershipControls:
        Rules:
          - ObjectOwnership: BucketOwnerEnforced
      LifecycleConfiguration:
        Rules:
          - Id: !Sub 'ExpirationIn-${LogExpirationInDays}Days'
            ExpirationInDays: !Ref 'LogExpirationInDays'
            Status: Enabled
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      Tags:
        - Key: Name
          Value: !Sub '${Environment}-log-bucket-${AWS::AccountId}'

  ##############################################################
  # Data Firehose
  ##############################################################
  DataFirehoseLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/kinesisfirehose/${Environment}-data-firehose-${AWS::AccountId}'
      RetentionInDays: !Ref LogExpirationInDays
      Tags:
        - Key: Name
          Value: !Sub '/aws/kinesisfirehose/${Environment}-data-firehose-${AWS::AccountId}'

  DataFirehoseLogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName: !Ref DataFirehoseLogGroup
      LogStreamName: !Sub '${Environment}-data-firehose-${AWS::AccountId}'

  DataFirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub '${Environment}-data-firehose-role-${AWS::AccountId}'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Ref 'AWS::AccountId'

  DataFirehoseRolePoliy:
    Type: AWS::IAM::RolePolicy
    Properties:
      RoleName: !Ref DataFirehoseRole
      PolicyName: !Sub '${Environment}-data-firehose-policy-${AWS::AccountId}'
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          # S3 バケットへの送信
          - Effect: 'Allow'
            Action:
              - 's3:AbortMultipartUpload'
              - 's3:GetBucketLocation'
              - 's3:GetObject'
              - 's3:ListBucket'
              - 's3:ListBucketMultipartUploads'
              - 's3:PutObject'
            Resource:
              - !Sub 'arn:aws:s3:::${LogBucket}'
              - !Sub 'arn:aws:s3:::${LogBucket}/*'
          # Firehose のログ記録
          - Effect: 'Allow'
            Action:
              - 'logs:PutLogEvents'
            Resource:
              - !GetAtt DataFirehoseLogGroup.Arn

  FirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Sub '${Environment}-data-firehose-${AWS::AccountId}'
      DeliveryStreamType: 'DirectPut'
      DeliveryStreamEncryptionConfigurationInput:
        KeyType: 'AWS_OWNED_CMK'
      ExtendedS3DestinationConfiguration:
        RoleARN: !GetAtt DataFirehoseRole.Arn
        BucketARN: !Sub 'arn:aws:s3:::${LogBucket}'
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 5
        CloudWatchLoggingOptions: # Amazon CloudWatch エラーのログ記録
          Enabled: 'true'
          LogGroupName: !Ref DataFirehoseLogGroup
          LogStreamName: !Sub '${Environment}-data-firehose-${AWS::AccountId}'
        CompressionFormat: GZIP # データレコードの圧縮 → S3 へ送信する際の圧縮方式
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: Decompression # Amazon CloudWatch Logs からソースレコードを解凍する
              Parameters:
                - ParameterName: CompressionFormat
                  ParameterValue: GZIP
            - Type: CloudWatchLogProcessing # ログイベントからのみメッセージフィールドを抽出
              Parameters:
                - ParameterName: DataMessageExtraction
                  ParameterValue: true
      Tags:
        - Key: Name
          Value: !Sub '${Environment}-data-firehose-${AWS::AccountId}'

  ##############################################################
  # CloudWatch Logs
  ##############################################################
  CwlRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub '${Environment}-cwl-role-${AWS::AccountId}'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: !Sub 'logs.${AWS::Region}.amazonaws.com'
            Action: sts:AssumeRole

  CwlRolePolicy:
    Type: AWS::IAM::RolePolicy
    Properties:
      RoleName: !Ref CwlRole
      PolicyName: !Sub '${Environment}-cwl-policy-${AWS::AccountId}'
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: 'Allow'
            Action:
              - 'firehose:PutRecord'
            Resource:
              - !GetAtt FirehoseDeliveryStream.Arn

  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/log/${Environment}-${AWS::AccountId}-group'
      RetentionInDays: !Ref LogExpirationInDays
      Tags:
        - Key: Name
          Value: !Sub '/log/${Environment}-${AWS::AccountId}-group'

  LogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName: !Ref LogGroup
      LogStreamName: !Sub '${Environment}-${AWS::AccountId}-stream'

  SubscriptionFilter:
    Type: AWS::Logs::SubscriptionFilter
    Properties:
      FilterName: !Sub '${Environment}-${AWS::AccountId}-filter'
      DestinationArn: !GetAtt FirehoseDeliveryStream.Arn
      FilterPattern: ''
      LogGroupName: !Ref LogGroup
      RoleArn: !GetAtt CwlRole.Arn
    DependsOn:
      - CwlRolePolicy

Data Firehose

肝となる部分は Data Firehose の部分であるため、ここだけ切り取って解説します。

logEvents のみ抜き取って、ターゲットへ配信するには Amazon CloudWatch Logs からソースレコードを解凍する必要がある点に注意です。

CloudWatch Logs のサブスクリプションフィルターはログを GZIP 圧縮して Data Firehose に転送するため、解凍オプションの CompressionFormatGZIP を指定しています。

CloudWatch Logs から Amazon Data Firehose に送信されるデータは、すでに gzip レベル 6 圧縮で圧縮されているため、Firehose 配信ストリーム内で圧縮を使用する必要はありません。その後、Firehose の解凍機能を使用して、ログを自動的に解凍できます。詳細については、 CloudWatch 「ログを使用した Kinesis Data Firehose への書き込み」を参照してください。

ロググループレベルのサブスクリプションフィルター - Amazon CloudWatch Logs

yaml
Resources:
  FirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Sub '${Environment}-data-firehose-${AWS::AccountId}'
      DeliveryStreamType: 'DirectPut'
      DeliveryStreamEncryptionConfigurationInput:
        KeyType: 'AWS_OWNED_CMK'
      ExtendedS3DestinationConfiguration:
        RoleARN: !GetAtt DataFirehoseRole.Arn
        BucketARN: !Sub 'arn:aws:s3:::${LogBucket}'
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 5
        CloudWatchLoggingOptions:
          Enabled: 'true'
          LogGroupName: !Ref DataFirehoseLogGroup
          LogStreamName: !Sub '${Environment}-data-firehose-${AWS::AccountId}'
        CompressionFormat: GZIP
        ProcessingConfiguration:
          Enabled: true
          Processors:
+            - Type: Decompression # Amazon CloudWatch Logs からソースレコードを解凍する
+              Parameters:
+                - ParameterName: CompressionFormat
+                  ParameterValue: GZIP
+            - Type: CloudWatchLogProcessing # ログイベントからのみメッセージフィールドを抽出
+              Parameters:
+                - ParameterName: DataMessageExtraction
+                  ParameterValue: true
      Tags:
        - Key: Name
          Value: !Sub '${Environment}-data-firehose-${AWS::AccountId}'

その他、設定値に関しては、お好みで変更してお使いください。

動作確認

動作確認のため、次のログを CloudWatch Logs に記録してみます。

LOG_GROUP_NAME=<ロググループ名>
LOG_STREAM_NAME=<ログストリーム名>

cat << EOF > get-html.log
[
  {
    "timestamp": $(date +%s%3N),
    "message": "{\"timestamp\": \"2023-05-15T08:30:45.123Z\", \"ip_address\": \"192.168.1.100\", \"method\": \"GET\", \"url\": \"/index.html\", \"status_code\": 200, \"user_agent\": \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36\", \"response_time\": 0.052}"
  }
]
EOF

aws logs put-log-events --log-group-name $LOG_GROUP_NAME --log-stream-name $LOG_STREAM_NAME --log-events file://get-html.log

cat << EOF > post-api.log
[
  {
    "timestamp": $(date +%s%3N),
    "message": "{\"timestamp\": \"2023-05-15T08:31:02.456Z\", \"ip_address\": \"192.168.1.101\", \"method\": \"POST\", \"url\": \"/api/login\", \"status_code\": 401, \"user_agent\": \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36\", \"response_time\": 0.134}"
  }
]
EOF

cat << EOF > get-logo.log
[
  {
    "timestamp": $(date +%s%3N),
    "message": "{\"timestamp\": \"2023-05-15T08:31:15.789Z\", \"ip_address\": \"192.168.1.100\", \"method\": \"GET\", \"url\": \"/images/logo.png\", \"status_code\": 200, \"user_agent\": \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36\", \"response_time\": 0.008}"
  }
]
EOF

cat get-html.log
cat post-api.log
cat get-logo.log

aws logs put-log-events --log-group-name $LOG_GROUP_NAME --log-stream-name $LOG_STREAM_NAME --log-events file://get-html.log
aws logs put-log-events --log-group-name $LOG_GROUP_NAME --log-stream-name $LOG_STREAM_NAME --log-events file://post-api.log
aws logs put-log-events --log-group-name $LOG_GROUP_NAME --log-stream-name $LOG_STREAM_NAME --log-events file://get-logo.log

とてもシンプルなクエリですが、 Athena で解析してみました。

CREATE EXTERNAL TABLE IF NOT EXISTS web_logs (
  `timestamp` string,
  `ip_address` string,
  `method` string,
  `url` string,
  `status_code` int,
  `user_agent` string,
  `response_time` float
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://<バケット名>/2024/08/06/08/' /* オブジェクトが保管されている部分を指定 */
TBLPROPERTIES ('has_encrypted_data'='false');
SELECT * FROM web_logs LIMIT 10;

無事、 logEvents だけ抜き取られてクエリができてそうです。

2024-08-06 at 17.52.12-クエリエディタ  Athena  ap-northeast-1

まとめ

以上、「CloudWatch Logs のログイベントのみ抽出するよう Cloudformation で定義してみる」でした。

非常にシンプルな内容ですが、 CloudWatch Logs から S3 へ Firehose を経由したいケースは多いのではないでしょうか。

このブログが参考になれば幸いです。 AWS 事業本部コンサルティング部のたかくに(@takakuni_)でした!

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.