Ccmmutty logo
Commutty IT
0 pv16 min read

CloudFormationでLambdaの非同期呼び出しを含むステートマシンを定義する

https://cdn.magicode.io/media/notebox/746d5291-bee7-46fa-ae18-81478ca79b66.jpeg
あるLambda関数から別のLambda関数を非同期で呼び出して処理を行うような機構を作成する。
AWSのStepFunctionsのステートマシンをCloudFormationで定義するとても簡単な例になる。

作成するもの

機能の概要

JSON形式のリストデータを入力パラメータとしてStepFunctionを実行し、データ登録用のLambdaを非同期で呼び出しデータ登録処理を実行する。
  • JSON形式の[{ "id": 1, "name": "xxx", ... }]のようなリストデータをパラメータとしてStepFunctionsのステートマシンを実行する。
  • ステートマシンが最初のLambda関数InvokeRegisterDataを呼び出す。
  • InvokeRegisterDataがデータ登録用のLambda関数RegisterDataを非同期で呼び出す。
  • RegisterDataは呼び出し元から渡されたリストデータを元にDynamoDBへのデータ登録処理を実行する。

作成するリソース

サービス論理ID内容
Lambda::FunctionRegisterDataデータ登録処理を行うスクリプト
Lambda::FunctionInvokeRegisterDataRegisterDataを非同期で呼び出すスクリプト
StepFunctions::StateMachineLambdaChainStarter一連のワークフローを定義しておくステートマシン
IAM::RoleInvokeRegisterDataRole別のLambdaの呼び出し権限、CloudWatchへのアクセスの権限をLambdaに与える
IAM::RoleRegisterDataRoleDynamoDBへの登録権限、CloudWatchへのアクセスの権限をLambdaに与える
IAM::RoleStepFunctionExecutionRoleステートマシンにLambdaの呼び出し権限を与える
DynamoDB::TableSampleDataTableデータ登録先のDB
Lambda関数はPython3.10で書いたソースコードをzipに圧縮し、S3上のlambda-sources-test1というバケットに置いておく。

CloudFormationテンプレート

パラメータの定義

Lambdaのソースコードの格納フォルダ名を定義して他所から参照できるようにする。
Parameters:
  LambdaSourceBucket:
    Type: String
    Default: lambda-sources-test1
    Description: Lambda関数を格納するS3バケット名

DynamoDBの定義

データを登録する先のDynamoDBのテーブルを定義する。
簡単な例なのでテーブル名と必須項目の定義だけにする。
AttributeDefinitionsはテーブルのインデックスで使う属性(カラム)を定義しておくもので、今回はidを設定しておく。
AttributeTypeS: 文字列型N: Number型B: バイナリ型を指定できる。
KeySchemaはテーブルの主キーを構成する属性を指定するもので、今回はidKeyType: HASH(パーティションキー)として設定する。
KeyType: RANGE(ソートキー)のキーを加えると複合主キーにもできる。
BillingModeはDynamoDB の課金方式を指定するもので、基本はPAY_PER_REQUEST(従量課金)を設定しておけばよい。
SampleDataTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: SampleDataTable
      AttributeDefinitions:
        - AttributeName: id
          AttributeType: S
      KeySchema:
        - AttributeName: id
          KeyType: HASH # パーティションキー
      BillingMode: PAY_PER_REQUEST

Lambda関連の定義

今回はデータ登録処理を行うRegisterDataとそれを非同期で呼び出すInvokeRegisterDataという2つのLambdaを作る想定なので、まずそれぞれに付けるロールを定義する。
RegisterDataはDynamoDBへのデータ登録を行うのでdynamodb:PutItemを許可する。
対象のリソースにはテーブルのARNを設定するが、最後の${SampleDataTable}のところはテンプレート内で定義したDynamoDBの論理IDSampleDataTableを参照してそのテーブル名"SampleDataTable"が返される。(!Ref SampleDataTableと同じ結果)
また、実行時のログをCloudWatchで確認できるようにログ出力の権限も与えている。
RegisterDataRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: RegisterDataRole
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: RegisterDataPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:PutItem
                Resource: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${SampleDataTable}
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/RegisterData:*
InvokeRegisterDataは他のLambdaの呼び出しを行うのでlambda:InvokeFunctionを許可し、対象リソースには後ほど定義するRegisterDataのARNを設定する。
こちらにもCloudWatchへのログ出力の権限を与えている。
InvokeRegisterDataRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: InvokeRegisterDataRole
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: InvokeRegisterDataPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                Resource: !GetAtt RegisterData.Arn
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/InvokeRegisterData:*
次に、RegisterDataのLambda関数を定義する。
Roleには前述のRegisterDataRoleのARNを指定する。
LambdaのソースはLambdaSourceBucketで参照されるS3バケットの中にzip形式で保存されている。
Timeoutの設定時間(秒)は最大900秒まで設定可能で、このLambdaはデータ登録処理を行うことを考慮して少し長めに取っている。(今回のサンプルデータ程度ではそんなにかからないが)
RegisterData:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: RegisterData
      Handler: register_data.lambda_handler
      Runtime: python3.10
      Role: !GetAtt RegisterDataRole.Arn
      Code:
        S3Bucket: !Ref LambdaSourceBucket
        S3Key: register_data.zip
      Timeout: 300
InvokeRegisterDataのLambda関数定義も同様に、まずRoleには前述のInvokeRegisterDataRoleのARNを指定する。
このLambdaは呼び出し対象のLambdaの名前とデータ登録先のテーブル名を必要とする(ソース内容は後述)ので、TARGET_FUNCTION_NAMETABLE_NAMEという環境変数としてテンプレートから渡すようにすればソースコード内で直に指定しなくてよくなる。
InvokeRegisterData:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: InvokeRegisterData
      Handler: invoke_register_data.lambda_handler
      Runtime: python3.10
      Role: !GetAtt InvokeRegisterDataRole.Arn
      Code:
        S3Bucket: !Ref LambdaSourceBucket
        S3Key: invoke_register_data.zip
      Timeout: 30
      Environment:
        Variables:
          TARGET_FUNCTION_NAME: !Ref RegisterData
          TABLE_NAME: !Ref SampleDataTable

StepFunctions関連の定義

まずStepFunctionsの用語として「ステートマシン」と「ステート」というものが出てくるが
ステートマシン:実行したい処理全体をまとめたワークフロー
ステート:ワークフロー内の1つ1つの処理
とイメージすると分かりやすい。
ステートマシンを実行する際にも専用のロールを付与する必要がある。
今回作るステートマシンではInvokeRegisterDataを実行することになるのでlambda:InvokeFunctionを許可する。
StepFunctionExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: StepFunctionExecutionRole
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: states.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: StepFunctionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: lambda:InvokeFunction
                Resource: !GetAtt InvokeRegisterData.Arn
次にステートマシン本体の定義になるが、ステートマシンで実行する処理フローはDefinitionStringに「ステート言語(Amazon States Language)」に基づいてJSON形式で記述することになる。
まず、このワークフローがどのステートから開始されるのかをStartAtに指定しなければならない。
この例ではStartAtExecuteLambdaが指定されており、処理はこの1つのステートだけで構成されている。
ExecuteLambdaステートはLambda関数の呼び出しを行う Taskタイプのステートであり、Resourceによって呼び出しタイプがlambda:invokeであることを指定している。
Parametersでは、実行するLambda関数のARNと入力値として渡すペイロードの構造が定義されている。
ここで "Input.$": "$"は、ステートマシン実行時に最初に渡される入力値全体をLambda関数に渡すことを意味する。
処理はこのステートしかないので、"End": trueによりこのステートで処理が完了することを示している。
StateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: LambdaChainStarter
      RoleArn: !GetAtt StepFunctionExecutionRole.Arn
      DefinitionString:
        Fn::Sub: |
          {
            "StartAt": "ExecuteLambda",
            "States": {
              "ExecuteLambda": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "Parameters": {
                  "FunctionName": "${InvokeRegisterData.Arn}",
                  "Payload": {
                    "Input.$": "$"
                  }
                },
                "End": true
              }
            }
          }

Lambdaスクリプトのソース

Lambda関数のソースコードはPython3.10で記述している。
まずはステートマシンの方から直接実行されるInvokeRegisterData関数のソースになる。
Lambda関数の内部から別のLambda関数を呼び出すためにはboto3 Lambdaクライアントのinvoke()を使う。
invoke()の引数には、呼び出し先の関数名FunctionName、呼び出しタイプInvocationType、入力値Payloadを指定する。
FunctionNameにはCloudFormationテンプレートで定義した環境変数TARGET_FUNCTION_NAMEを指定する。
別のLambdaを非同期で呼び出すのでInvocationTypeEventを指定する。
なお、同期呼び出しの場合はInvocationTypeRequestResponse(デフォルト)である。 Payloadには、ステートマシンから渡された入力値event['Input']と登録先テーブル名(環境変数TABLE_NAME)を設定している。
# invoke_register_data.py
import boto3
import os
import json


def lambda_handler(event, context):
    try:
        target_function_name = os.environ.get('TARGET_FUNCTION_NAME')
        table_name = os.environ.get('TABLE_NAME')
        payload = {
            'records': event['Input'],  # 登録データのリスト
            'table_name': table_name  # 登録先のテーブル名
        }
        client = boto3.client('lambda')
        res = client.invoke(
            FunctionName=target_function_name,
            InvocationType='Event',  # 非同期呼び出し
            Payload=json.dumps(payload)
        )
        return {'status': 'Target lambda invoked'}
    except Exception as e:
        print(f'Error occurred: {e}')
        raise
次に、InvokeRegisterDataから非同期で呼ばれるRegisterData関数のソースコード。
呼び出し元から受け取った登録データrecordsと登録先テーブル名table_nameを元にDynamoDBへ順次登録を行う。
この例だとそれ程でもないが、負荷が大きく処理時間が長くなることが予想されるようなLambda関数はこのように非同期で実行するのがよい。
# register_data.py
import boto3
import time

dynamodb = boto3.resource('dynamodb')


def lambda_handler(event, context):
    try:
        data_list = event.get('records', [])
        table_name = event.get('table_name')
        if not data_list or not table_name:
            raise ValueError(
                'Invalid input: "records" and "table_name" are required.')
        table = dynamodb.Table(table_name)
        for item in data_list:
            table.put_item(Item=item)
        print(f'Data registration completed. - Total: {len(data_list)}')
        return {'status': 'done', 'count': len(data_list)}
    except Exception as e:
        print(f'Error occurred: {e}')
        raise

デプロイ用のスクリプト

S3バケット間のファイル自動コピー機構をCloudFormationで作るに記載のdeploy_cf.shを使ってデプロイを行う。
作成するスタック名をlambda-async-invokeとし、スタック名を指定してスクリプトを実行する。
./deploy_cf.sh lambda-async-invoke

ステートマシンの実行・確認

デプロイがうまくいっていればAWSコンソールの「StepFunctions/ステートマシン」の一覧にLambdaChainStarterというステートマシンが作成されている。
このステートマシンの詳細を開き、「実行を開始」ボタンをクリックすると入力値を指定するフォームが出てくるのでテスト用のサンプルデータをJSON形式で入力する。
[
  { "id": "1", "name": "ballpoint pen", "price": 150 },
  { "id": "2", "name": "eraser", "price": 80 },
  { "id": "3", "name": "clear file", "price": 200 }
]
これで実行を開始するとグラフビューによってステートマシンの実行の様子を目視したり、入力値や実行のレスポンス(状態の出力)を確認したりすることができる。
ステートが1個しかない簡単なものなので処理は一瞬で終わり、今回の場合はInvokeRegisterDataの処理がSat, 05 Jul 2025 08:46:00 GMTに完了している。
RegisterDataの実行ログはCloudWatchで確認できる。
呼び出し元の処理完了時刻よりも遅い2025-07-05T08:46:11.250ZRegisterDataの処理が完了しており、RegisterDataがちゃんと非同期で呼び出せていることが分かる。

参考

CloudFormationの公式リファレンス
boto3公式リファレンス

Discussion

コメントにはログインが必要です。