あるLambda関数から別のLambda関数を非同期で呼び出して処理を行うような機構を作成する。
AWSのStepFunctionsのステートマシンをCloudFormationで定義するとても簡単な例になる。
作成するもの
機能の概要
JSON形式のリストデータを入力パラメータとしてStepFunctionを実行し、データ登録用のLambdaを非同期で呼び出しデータ登録処理を実行する。
- JSON形式の
[{ "id": 1, "name": "xxx", ... }]
のようなリストデータをパラメータとしてStepFunctionsのステートマシンを実行する。
- ステートマシンが最初のLambda関数
InvokeRegisterData
を呼び出す。
InvokeRegisterData
がデータ登録用のLambda関数RegisterData
を非同期で呼び出す。
RegisterData
は呼び出し元から渡されたリストデータを元にDynamoDBへのデータ登録処理を実行する。
作成するリソース
サービス | 論理ID | 内容 |
---|
Lambda::Function | RegisterData | データ登録処理を行うスクリプト |
Lambda::Function | InvokeRegisterData | RegisterData を非同期で呼び出すスクリプト |
StepFunctions::StateMachine | LambdaChainStarter | 一連のワークフローを定義しておくステートマシン |
IAM::Role | InvokeRegisterDataRole | 別のLambdaの呼び出し権限、CloudWatchへのアクセスの権限をLambdaに与える |
IAM::Role | RegisterDataRole | DynamoDBへの登録権限、CloudWatchへのアクセスの権限をLambdaに与える |
IAM::Role | StepFunctionExecutionRole | ステートマシンにLambdaの呼び出し権限を与える |
DynamoDB::Table | SampleDataTable | データ登録先のDB |
Lambda関数はPython3.10で書いたソースコードをzipに圧縮し、S3上のlambda-sources-test1
というバケットに置いておく。
パラメータの定義
Lambdaのソースコードの格納フォルダ名を定義して他所から参照できるようにする。
Parameters:
LambdaSourceBucket:
Type: String
Default: lambda-sources-test1
Description: Lambda関数を格納するS3バケット名
DynamoDBの定義
データを登録する先のDynamoDBのテーブルを定義する。
簡単な例なのでテーブル名と必須項目の定義だけにする。
AttributeDefinitions
はテーブルのインデックスで使う属性(カラム)を定義しておくもので、今回はid
を設定しておく。
AttributeType
はS: 文字列型
、N: Number型
、B: バイナリ型
を指定できる。
KeySchema
はテーブルの主キーを構成する属性を指定するもので、今回はid
をKeyType: HASH
(パーティションキー)として設定する。
KeyType: RANGE
(ソートキー)のキーを加えると複合主キーにもできる。
BillingMode
はDynamoDB の課金方式を指定するもので、基本はPAY_PER_REQUEST
(従量課金)を設定しておけばよい。
SampleDataTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: SampleDataTable
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH # パーティションキー
BillingMode: PAY_PER_REQUEST
Lambda関連の定義
今回はデータ登録処理を行うRegisterData
とそれを非同期で呼び出すInvokeRegisterData
という2つのLambdaを作る想定なので、まずそれぞれに付けるロールを定義する。
RegisterData
はDynamoDBへのデータ登録を行うのでdynamodb:PutItem
を許可する。
対象のリソースにはテーブルのARNを設定するが、最後の${SampleDataTable}
のところはテンプレート内で定義したDynamoDBの論理IDSampleDataTable
を参照してそのテーブル名"SampleDataTable"
が返される。(!Ref SampleDataTable
と同じ結果)
また、実行時のログをCloudWatchで確認できるようにログ出力の権限も与えている。
RegisterDataRole:
Type: AWS::IAM::Role
Properties:
RoleName: RegisterDataRole
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: RegisterDataPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- dynamodb:PutItem
Resource: !Sub arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${SampleDataTable}
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/RegisterData:*
InvokeRegisterData
は他のLambdaの呼び出しを行うのでlambda:InvokeFunction
を許可し、対象リソースには後ほど定義するRegisterData
のARNを設定する。
こちらにもCloudWatchへのログ出力の権限を与えている。
InvokeRegisterDataRole:
Type: AWS::IAM::Role
Properties:
RoleName: InvokeRegisterDataRole
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: InvokeRegisterDataPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- lambda:InvokeFunction
Resource: !GetAtt RegisterData.Arn
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/InvokeRegisterData:*
次に、RegisterData
のLambda関数を定義する。
Role
には前述のRegisterDataRole
のARNを指定する。
LambdaのソースはLambdaSourceBucket
で参照されるS3バケットの中にzip形式で保存されている。
Timeout
の設定時間(秒)は最大900秒まで設定可能で、このLambdaはデータ登録処理を行うことを考慮して少し長めに取っている。(今回のサンプルデータ程度ではそんなにかからないが)
RegisterData:
Type: AWS::Lambda::Function
Properties:
FunctionName: RegisterData
Handler: register_data.lambda_handler
Runtime: python3.10
Role: !GetAtt RegisterDataRole.Arn
Code:
S3Bucket: !Ref LambdaSourceBucket
S3Key: register_data.zip
Timeout: 300
InvokeRegisterData
のLambda関数定義も同様に、まずRole
には前述のInvokeRegisterDataRole
のARNを指定する。
このLambdaは呼び出し対象のLambdaの名前とデータ登録先のテーブル名を必要とする(ソース内容は後述)ので、TARGET_FUNCTION_NAME
とTABLE_NAME
という環境変数としてテンプレートから渡すようにすればソースコード内で直に指定しなくてよくなる。
InvokeRegisterData:
Type: AWS::Lambda::Function
Properties:
FunctionName: InvokeRegisterData
Handler: invoke_register_data.lambda_handler
Runtime: python3.10
Role: !GetAtt InvokeRegisterDataRole.Arn
Code:
S3Bucket: !Ref LambdaSourceBucket
S3Key: invoke_register_data.zip
Timeout: 30
Environment:
Variables:
TARGET_FUNCTION_NAME: !Ref RegisterData
TABLE_NAME: !Ref SampleDataTable
StepFunctions関連の定義
まずStepFunctionsの用語として「ステートマシン」と「ステート」というものが出てくるが
ステートマシン
:実行したい処理全体をまとめたワークフロー
ステート
:ワークフロー内の1つ1つの処理
とイメージすると分かりやすい。
ステートマシンを実行する際にも専用のロールを付与する必要がある。
今回作るステートマシンではInvokeRegisterData
を実行することになるのでlambda:InvokeFunction
を許可する。
StepFunctionExecutionRole:
Type: AWS::IAM::Role
Properties:
RoleName: StepFunctionExecutionRole
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: states.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: StepFunctionPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action: lambda:InvokeFunction
Resource: !GetAtt InvokeRegisterData.Arn
次にステートマシン本体の定義になるが、ステートマシンで実行する処理フローはDefinitionString
に「ステート言語(Amazon States Language)」に基づいてJSON形式で記述することになる。
まず、このワークフローがどのステートから開始されるのかをStartAt
に指定しなければならない。
この例ではStartAt
にExecuteLambda
が指定されており、処理はこの1つのステートだけで構成されている。
ExecuteLambda
ステートはLambda関数の呼び出しを行う Task
タイプのステートであり、Resource
によって呼び出しタイプがlambda:invoke
であることを指定している。
Parameters
では、実行するLambda関数のARNと入力値として渡すペイロードの構造が定義されている。
ここで "Input.$": "$"
は、ステートマシン実行時に最初に渡される入力値全体をLambda関数に渡すことを意味する。
処理はこのステートしかないので、"End": true
によりこのステートで処理が完了することを示している。
StateMachine:
Type: AWS::StepFunctions::StateMachine
Properties:
StateMachineName: LambdaChainStarter
RoleArn: !GetAtt StepFunctionExecutionRole.Arn
DefinitionString:
Fn::Sub: |
{
"StartAt": "ExecuteLambda",
"States": {
"ExecuteLambda": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "${InvokeRegisterData.Arn}",
"Payload": {
"Input.$": "$"
}
},
"End": true
}
}
}
Lambdaスクリプトのソース
Lambda関数のソースコードはPython3.10で記述している。
まずはステートマシンの方から直接実行されるInvokeRegisterData
関数のソースになる。
Lambda関数の内部から別のLambda関数を呼び出すためにはboto3 Lambdaクライアントのinvoke()
を使う。
invoke()
の引数には、呼び出し先の関数名FunctionName
、呼び出しタイプInvocationType
、入力値Payload
を指定する。
FunctionName
にはCloudFormationテンプレートで定義した環境変数TARGET_FUNCTION_NAME
を指定する。
別のLambdaを非同期で呼び出すのでInvocationType
はEvent
を指定する。
なお、同期呼び出しの場合はInvocationType
はRequestResponse
(デフォルト)である。
Payload
には、ステートマシンから渡された入力値event['Input']
と登録先テーブル名(環境変数TABLE_NAME
)を設定している。
# invoke_register_data.py
import boto3
import os
import json
def lambda_handler(event, context):
try:
target_function_name = os.environ.get('TARGET_FUNCTION_NAME')
table_name = os.environ.get('TABLE_NAME')
payload = {
'records': event['Input'], # 登録データのリスト
'table_name': table_name # 登録先のテーブル名
}
client = boto3.client('lambda')
res = client.invoke(
FunctionName=target_function_name,
InvocationType='Event', # 非同期呼び出し
Payload=json.dumps(payload)
)
return {'status': 'Target lambda invoked'}
except Exception as e:
print(f'Error occurred: {e}')
raise
次に、InvokeRegisterData
から非同期で呼ばれるRegisterData
関数のソースコード。
呼び出し元から受け取った登録データrecords
と登録先テーブル名table_name
を元にDynamoDBへ順次登録を行う。
この例だとそれ程でもないが、負荷が大きく処理時間が長くなることが予想されるようなLambda関数はこのように非同期で実行するのがよい。
# register_data.py
import boto3
import time
dynamodb = boto3.resource('dynamodb')
def lambda_handler(event, context):
try:
data_list = event.get('records', [])
table_name = event.get('table_name')
if not data_list or not table_name:
raise ValueError(
'Invalid input: "records" and "table_name" are required.')
table = dynamodb.Table(table_name)
for item in data_list:
table.put_item(Item=item)
print(f'Data registration completed. - Total: {len(data_list)}')
return {'status': 'done', 'count': len(data_list)}
except Exception as e:
print(f'Error occurred: {e}')
raise
デプロイ用のスクリプト
./deploy_cf.sh lambda-async-invoke
ステートマシンの実行・確認
デプロイがうまくいっていればAWSコンソールの「StepFunctions/ステートマシン」の一覧にLambdaChainStarter
というステートマシンが作成されている。
このステートマシンの詳細を開き、「実行を開始」ボタンをクリックすると入力値を指定するフォームが出てくるのでテスト用のサンプルデータをJSON形式で入力する。
[
{ "id": "1", "name": "ballpoint pen", "price": 150 },
{ "id": "2", "name": "eraser", "price": 80 },
{ "id": "3", "name": "clear file", "price": 200 }
]
これで実行を開始するとグラフビューによってステートマシンの実行の様子を目視したり、入力値や実行のレスポンス(状態の出力)を確認したりすることができる。
ステートが1個しかない簡単なものなので処理は一瞬で終わり、今回の場合はInvokeRegisterData
の処理がSat, 05 Jul 2025 08:46:00 GMT
に完了している。
RegisterData
の実行ログはCloudWatchで確認できる。
呼び出し元の処理完了時刻よりも遅い2025-07-05T08:46:11.250Z
にRegisterData
の処理が完了しており、RegisterData
がちゃんと非同期で呼び出せていることが分かる。
参考
CloudFormationの公式リファレンス
boto3公式リファレンス