【祝GA】 Lambda ExtensionsでLambdaのログをGCPのCloud Loggingに送信してみた

【祝GA】 Lambda ExtensionsでLambdaのログをGCPのCloud Loggingに送信してみた

Lambda ExtensionsがGAされたのでGCPのCloud Loggingにログを送信してみました
Clock Icon2021.06.01

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

CX事業本部@大阪の岩田です。先日Lambda ExtensionsがGAされました(東京リージョンはまだですが)。Lambda ExtensionsはLambdaのモニタリング、可観測性、セキュリティ、ガバナンスのための運用ツールをLambdaに統合する機能で、Lambda実行環境のライフサイクルと連動して外部のサードパーティ製品に直接ログやメトリクスを送信するといったことが可能です。

パブリックプレビュー自体は半年以上前から利用できる状態だったのですが、これまでちゃんと触れていなかったので自分の理解を深めるために簡単なExtensionを実装してみようと思い立ちました。何を作るか考えたのですが、今回はLambdaのログをGCPのCloud Loggingに送信するExtensionを作ることにしました。なおLambdaのランタイムにはPython3.8を利用しています。

Lambda ExtensionsのおさらいとLogs API

以下のブログでも紹介されていますが、Extensionの実行形態は大きく内部モードと外部モードの2種類に分かれます。

外部モードはLambda実行環境で独立したプロセスとして実行され、関数呼び出しが完全に処理された後も引き続き実行されるという特徴を持ち、Lambdaのログや各種メトリクスをサードパーティに連携するのに適したモデルです。Lambda実行環境のライフサイクルINITフェーズにExtensions API経由で自作のコードをExtensionとして登録することで、INVOKEフェーズ、SHUTDOWNフェーズに自作のロジックをLambda関数本体と並行して実行できるようになります。

※画像は公式ドキュメントより引用

Lambda Extensions API

Logs API

前述のようにExtensionのコードはLambda関数本体と並行して実行可能なのですが、Lambda Extensionのユースケースの1つであるサードパーティへのログの連携はどのように実現すれば良いのでしょうか?ここで利用できるのがLambdaランタイムAPIの「Logs API」です。Lambdaのサービス基盤はランタイムのログを自動的にキャプチャしてCloudWatch Logsにストリームしているのですが、「Logs API」を利用するとLambda Extensionsからログストリームに直接サブスクライブできます。

Logs APIの仕様は以下の通りです

  • パス: http://${AWS_LAMBDA_RUNTIME_API}/2020-08-15/logs/
  • メソッド: PUT
  • リクエストパラメータ:
    • destination...
      • protocol... Lambdaの実行基盤がExtensionに対してログを配信するためのプロトコルを指定します。 HTTPもしくはTCPが指定可能でHTTPの利用が推奨されています。
      • URI... Extensionがログの配信を待ち受けるエンドポイントのURIを指定します。Extensionの開発者はこのURIでLambdaランタイムからのリクエストを待ち受けるようにExtensionを実装する必要があります。
    • buffering...Lambdaの実行基盤はログをバッファリングしてサブスクライバ(Extension)に配信します。このパラメータはバッファリング関連の挙動を調整するために利用します。
      • maxItems... ログのバッチをバッファーする最大時間(ミリ秒単位)。デフォルト:1000 最小: 25 最大:30000
      • maxBytes... メモリにバッファするログの最大サイズ (バイト単位)。デフォルト:262,144 最小:262,144 最大:1,048,576
      • timeoutMs... メモリにバッファするイベントの最大数。デフォルト:10,000 最小:1,000 最大:10,000
    • types...Extensionがサブスクライブするログの種別を配列形式で指定します。指定可能なログ種別はplatform,function,extensionの3種です

リクエストのサンプルは以下のような形式になります

PUT http://${AWS_LAMBDA_RUNTIME_API}/2020-08-15/logs/ HTTP/1.1
{ "schemaVersion": "2020-08-15",
  "types": [
      "platform",
      "function"
    ],
  "buffering": {
      "maxItems": 1000,
      "maxBytes": 10240,
      "timeoutMs": 100
    }
  "destination": {
    "protocol": "HTTP",
    "URI": "http://sandbox.localdomain:8080/lambda_logs"
  }
}    

この例ではhttp://sandbox.localdomain:8080/lambda_logsというエンドポイントでプラットフォームのログと関数のログをサブスクライブしています。サブスクライブが完了すると、以後Lambda実行基盤のライフサイクルに合わせてExtensionのエンドポイントに対してJSON形式のログがPOSTされるようになります。例えばLambda Functionのコードからprint('some message')というコードでログを出力すると、以下のようなJSONデータがExtensionの待ち受けエンドポイントに自動的にPOSTされてきます。

{
  type: "function"
  record: "some message"
  time: "2021-05-31T11:59:09.154Z"
}

あとはExtension側でこのPOSTされてきたデータをサードパーティによしなに連携するだけです。

Extensionを作ってGCPにログを送ってみる

実際にExtensionを構築してLambdaからGCPにログを送ってみましょう

GCP側の事前作業

まずCloud Loggingにログを書き込むためのサービスアカウントを作成します

$ gcloud iam service-accounts create <サービスアカウント名>

作成したサービスアカウントにlogging.logWriterのロールをアタッチします。これでサービスアカウントからCloud Loggingにログを書き込めるようになります。

$ gcloud projects add-iam-policy-binding <プロジェクトID> --member=serviceAccount:<サービスアカウント名>@<プロジェクトID>.iam.gserviceaccount.com --role=roles/logging.logWriter

Lambdaからサービスアカウントを利用するためにサービスアカウントキーを発行します

$ gcloud iam service-accounts keys create <適当なファイル名> --iam-account <サービスアカウント名>@<プロジェクトID>.iam.gserviceaccount.com

指定したファイル名でサービスアカウントキーのJSONファイルが作成されるので中身を確認してみましょう

$cat <適当なファイル名>

{
  "type": "service_account",
  "project_id": "<プロジェクトID>",
  "private_key_id": "c9818...略",
  "private_key": "-----BEGIN PRIVATE KEY-----\n...略==\n-----END PRIVATE KEY-----\n",
  "client_email": "<サービスアカウント名>@<プロジェクトID>.iam.gserviceaccount.com",
  "client_id": "123456789012345678901",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/<サービスアカウント名>%40<プロジェクトID>.iam.gserviceaccount.com"
}

こんな感じのJSONが表示されればOKです

AWS側の事前作業

先程作成したGCPのサービスアカウントキーをSecret Managerに登録しておきましょう。

$aws secretsmanager create-secret --name <シークレット名> --secret-string file://<サービスアカウントキーのJSONファイル名>

後ほどExtensionの初期処理でSecret Managerからサービスアカウントキーを取得して利用します

Extensionの実装

ここから実際にExtensionを実装していきます。以下のGitHubリポジトリでLambda Extensionsの各種サンプルが公開されているのですが、この中の

s3-logs-extension-demo-zip-archiveというディレクトリにExtensionからS3バケットに直接ログを送信するサンプルが含まれています。今回はこのサンプルをベースに微修正して実装します。

https://github.com/aws-samples/aws-lambda-extensions

まずextensionssrc/requirements.txtにCloud Loggingのクライアントライブラリを追加します

boto3
google-cloud-logging

これで後ほどsam buildした時にCloud Loggingのクライアントライブラリがデプロイパッケージに組み込まれるようになりました。続いてextensions/logs_api_http_extension.pyのコードを修正します。まず先頭のライブラリ部分に必要なライブラリのimportを追加します。 

import json
from google.cloud import logging
from google.oauth2 import service_account

続いてログ送信の本体部分run_foreverの修正です。まずは初期化処理部分にSecret ManagerからGCPのサービスアカウントキーを取得してクライアントライブラリを初期化する処理を追加します。

    def run_forever(self):        
        client = boto3.client('secretsmanager')
        res = client.get_secret_value(SecretId=os.environ['GCP_SECRET_NAME'])
        json_acct_info = json.loads(res['SecretString'])

        credentials = service_account.Credentials.from_service_account_info(json_acct_info)
        scoped_credentials = credentials.with_scopes(
            ['https://www.googleapis.com/auth/cloud-platform'])

        logging_client = logging.Client(credentials=scoped_credentials)
        log_name = "awslambda"
        logger = logging_client.logger(log_name)

        print(f"extension.logs_api_http_extension: Receiving Logs {self.agent_name}")

続いて実際のログ送信部分です。LambdaのLogs APIから取得したログメッセージをCloud Loggingのクライアントのlog_structを使って送信します。

        while True:
            resp = self.extensions_api_client.next(self.agent_id)
            # Process the received batches if any.
            while not self.queue.empty():
                batch = self.queue.get_nowait()
                # ...略
                try:               
                    for item in range(len(batch)):
                        response = logger.log_struct(batch[item])
                except Exception as e:
                    raise Exception(f"Error sending log to S3 {e}") from e

extensions/logs_api_http_extension.pyの全体です。

#!/bin/sh
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

''''exec python -u -- "$0" ${1+"$@"} # '''
import os
import sys
from pathlib import Path
from datetime import datetime
import json
# Add lib folder to path to import boto3 library.
# Normally with Lambda Layers, python libraries are put into the /python folder which is in the path.
# As this extension is bringing its own Python runtime, and running a separate process, the path is not available.
# Hence, having the files in a different folder and adding it to the path, makes it available. 
lib_folder = Path(__file__).parent / "lib"
sys.path.insert(0,str(lib_folder))
import boto3

from google.cloud import logging
from google.oauth2 import service_account

from logs_api_http_extension.http_listener import http_server_init, RECEIVER_PORT
from logs_api_http_extension.logs_api_client import LogsAPIClient
from logs_api_http_extension.extensions_api_client import ExtensionsAPIClient

from queue import Queue

"""Here is the sample extension code.
    - The extension runs two threads. The "main" thread, will register with the Extension API and process its invoke and
    shutdown events (see next call). The second "listener" thread listens for HTTP POST events that deliver log batches.
    - The "listener" thread places every log batch it receives in a synchronized queue; during each execution slice,
    the "main" thread will make sure to process any event in the queue before returning control by invoking next again.
    - Note that because of the asynchronous nature of the system, it is possible that logs for one invoke are
    processed during the next invoke slice. Likewise, it is possible that logs for the last invoke are processed during
    the SHUTDOWN event.

Note: 

1.  This is a simple example extension to help you understand the Lambda Logs API.
    This code is not production ready. Use it with your own discretion after testing it thoroughly.  

2.  The extension code starts with a shebang. This is to bring Python runtime to the execution environment.
    This works if the lambda function is a python3.x function, therefore it brings the python3.x runtime with itself.
    It may not work for python 2.7 or other runtimes. 
    The recommended best practice is to compile your extension into an executable binary and not rely on the runtime.
  
3.  This file needs to be executable, so make sure you add execute permission to the file 
    `chmod +x logs_api_http_extension.py`

"""

class LogsAPIHTTPExtension():
    def __init__(self, agent_name, registration_body, subscription_body):
 #       print(f"extension.logs_api_http_extension: Initializing LogsAPIExternalExtension {agent_name}")
        self.agent_name = agent_name
        self.queue = Queue()
        self.logs_api_client = LogsAPIClient()
        self.extensions_api_client = ExtensionsAPIClient()

        # Register early so Runtime could start in parallel
        self.agent_id = self.extensions_api_client.register(self.agent_name, registration_body)

        # Start listening before Logs API registration
#        print(f"extension.logs_api_http_extension: Starting HTTP Server {agent_name}")
        http_server_init(self.queue)
        self.logs_api_client.subscribe(self.agent_id, subscription_body)

    def run_forever(self):        
        client = boto3.client('secretsmanager')
        res = client.get_secret_value(SecretId=os.environ['GCP_SECRET_NAME'])
        json_acct_info = json.loads(res['SecretString'])

        credentials = service_account.Credentials.from_service_account_info(json_acct_info)
        scoped_credentials = credentials.with_scopes(
            ['https://www.googleapis.com/auth/cloud-platform'])

        logging_client = logging.Client(credentials=scoped_credentials)
        log_name = "awslambda"
        logger = logging_client.logger(log_name)

        print(f"extension.logs_api_http_extension: Receiving Logs {self.agent_name}")
        while True:
            resp = self.extensions_api_client.next(self.agent_id)
            # Process the received batches if any.
            while not self.queue.empty():
                batch = self.queue.get_nowait()
                # This following line logs the events received to CloudWatch.
                # Replace it to send logs to elsewhere.
                # If you've subscribed to extension logs, e.g. "types": ["platform", "function", "extension"],
                # you'll receive the logs of this extension back from Logs API.
                # And if you log it again with the line below, it will create a cycle since you receive it back again.
                # Use `extension` log type if you'll egress it to another endpoint,
                # or make sure you've implemented a protocol to handle this case.
                # print(f"Log Batch Received from Lambda: {batch}", flush=True)

#               There are two options illustrated:
#               1. Sending the entire log batch to S3
#               2. Parsing the batch and sending individual log lines.
#                  This could be used to parse the log lines and only selectively send logs to S3, or amend for any other destination.

#               1. The following line writes the entire batch to S3
                try:               
                    for item in range(len(batch)):
                        response = logger.log_struct(batch[item])
                except Exception as e:
                    raise Exception(f"Error sending log to GCP {e}") from e
                
# Register for the INVOKE events from the EXTENSIONS API
_REGISTRATION_BODY = {
    "events": ["INVOKE", "SHUTDOWN"],
}

# Subscribe to platform logs and receive them on ${local_ip}:4243 via HTTP protocol.

TIMEOUT_MS = 1000 # Maximum time (in milliseconds) that a batch is buffered.
MAX_BYTES = 262144 # Maximum size in bytes that the logs are buffered in memory.
MAX_ITEMS = 10000 # Maximum number of events that are buffered in memory.

_SUBSCRIPTION_BODY = {
    "destination":{
        "protocol": "HTTP",
        "URI": f"http://sandbox:{RECEIVER_PORT}",
    },
    "types": ["platform", "function"],
    "buffering": {
        "timeoutMs": TIMEOUT_MS,
        "maxBytes": MAX_BYTES,
        "maxItems": MAX_ITEMS
    }
}

def main():
#    print(f"extension.logs_api_http_extension: Starting Extension {_REGISTRATION_BODY} {_SUBSCRIPTION_BODY}")
    # Note: Agent name has to be file name to register as an external extension
    ext = LogsAPIHTTPExtension(os.path.basename(__file__), _REGISTRATION_BODY, _SUBSCRIPTION_BODY)
    ext.run_forever()

if __name__ == "__main__":
    main()

SAMテンプレートを以下のように修正します。修正点は以下の通りです

  • S3関連の記述を削除
  • Secret Managerのシークレット名を受け取るためのパラメータを追加
  • Secret Manager用の権限を追加
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Lambda Extensions S3 Logs Demo
##########################################################################
#  Parameters & Globals                                                  #
##########################################################################
Globals:
  Function:
    Tracing: Active
    Tags:
      Application: S3LogsExtensionDemo
Parameters:
  SecretName:
    Type: String
Resources:
##########################################################################
#  Lambda functions                                                      #
##########################################################################
  Function:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: logs-extension-demo-function
      Description: Logs Extension Demo Function
      CodeUri: functionsrc/
      Runtime: python3.8
      Handler: lambda_function.lambda_handler
      MemorySize: 128
      Timeout: 100
      Environment:
        Variables:
          GCP_SECRET_NAME:
            Ref: SecretName
      Layers:
        - !Ref S3LogExtensionsLayer
      Policies:
        - AWSSecretsManagerGetSecretValuePolicy:
            SecretArn: !Sub arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretName}*

##########################################################################
#  Lambda layers                                                         #
##########################################################################
  S3LogExtensionsLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      Description: Layer containing extension(s)
      ContentUri: extensionssrc/
      CompatibleRuntimes:
        - python3.8
      LicenseInfo: 'Available under the MIT-0 license.'
      RetentionPolicy: Delete
    Metadata:
      BuildMethod: makefile
##########################################################################
#  OUTPUTS                                                               #
##########################################################################
Outputs:
  ExtensionsLayer:
    Value: !Ref S3LogExtensionsLayer
    Description: Log Extension Layer ARN
  Function:
    Value: !Ref Function
    Description: Lambda Function

Diffはこんな感じです

--- a/s3-logs-extension-demo-zip-archive/template.yml
+++ b/s3-logs-extension-demo-zip-archive/template.yml
@@ -9,6 +9,9 @@ Globals:
     Tracing: Active
     Tags:
       Application: S3LogsExtensionDemo
+Parameters:
+  SecretName:
+    Type: String
 Resources:
 ##########################################################################
 #  Lambda functions                                                      #
@@ -25,13 +28,14 @@ Resources:
       Timeout: 100
       Environment:
         Variables:
-          S3_BUCKET_NAME:
-            Ref: LogExtensionsBucket
+          GCP_SECRET_NAME:
+            Ref: SecretName
       Layers:
         - !Ref S3LogExtensionsLayer
       Policies:
-        - S3WritePolicy:
-            BucketName: !Ref LogExtensionsBucket
+        - AWSSecretsManagerGetSecretValuePolicy:
+            SecretArn: !Sub arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretName}*
+
 ##########################################################################
 #  Lambda layers                                                         #
 ##########################################################################
@@ -47,17 +51,6 @@ Resources:
     Metadata:
       BuildMethod: makefile
 ##########################################################################
-#  S3 Resources                                                   #
-##########################################################################
-  LogExtensionsBucket:
-    Type: 'AWS::S3::Bucket'
-    Properties:
-      LifecycleConfiguration:
-        Rules:
-          - Id: DeleteAfterSevenDays
-            Status: "Enabled"
-            ExpirationInDays: 7
-##########################################################################
 #  OUTPUTS                                                               #
 ##########################################################################
 Outputs:
@@ -67,5 +60,3 @@ Outputs:
   Function:
     Value: !Ref Function
     Description: Lambda Function
-  LogExtensionsBucketName:
-    Value: !Ref LogExtensionsBucket

あまり本質的な部分ではないですが、せっかくなのでLambda Function本体が出力するログメッセージも修正しておきましょう

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

import json
import os

def lambda_handler(event, context):
    print(f"Function: Logging something which logging extension will send to GCP Cloud Logging")
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

デプロイ&テスト

一通り準備ができたのでデプロイ&テストしてみましょう。まずはビルドしてデプロイパッケージを準備します

$sam build

パッケージの準備ができたらデプロイします

$sam deploy --guided --parameter-overrides SecretName=<シークレットマネージャのシークレット名>

デプロイできたら適当に何度かLambdaをテスト実行してからGCPのClooud Loggingを確認してみましょう。

バッチリです!GCPからAWSのログが確認できました

まとめ

LambdaのログをGCPに送信することの意義はさておきExtensionの仕組みを理解するための良い取っ掛かりになりました。紹介したサンプルリポジトリにはPython以外にも様々なランタイムのサンプルが公開されており、実際に手を動かしならがLambda Extensionsについて学ぶための良い教材となっています。まだLambda Extensionsを試したことのない方は是非ともお試し下さい。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.