こんにちは、櫻井です。
本記事は、ギークフィード2023 Advent Calendarの14日目の記事です。
目次
はじめに
「lambdaを定期実行したい!」という場合、EventBridgeのスケジューラを使うことがまず候補に上がるかと思います。
しかしEventBridgeのスケジューラは、最短1分間でしか定期実行を行うことが出来ません。
そのため、10秒ごとに定期実行したい場合は要件を満たすことが出来ません。そういった場合はSQSを使うことで解決することが出来ます。
※この方法を使う場合、lambdaが再帰ループしているという判定になってしまうため、16 回の再帰呼び出しの後、Amazon SQS、AWS Lambda 間の再帰呼び出しを自動的に停止します(詳しくはこちらを確認してください)
AWSサポートに問い合わせを行うことで、再帰ループの制限を解除することも出来ますが、解除する場合は慎重に管理をしてください。
今回は制限を解除したアカウントで解説します。
構成
上記アーキテクチャは5つのリソースで出来ています。
- メッセージの遅延時間を、定期実行したい秒数に設定したSQS
- SQSから送られてきたメッセージをトリガーに、定期実行したい処理を行った後、再度SQSにメッセージを送信するlambda
- lambdaで何らかのエラーが発生し、SQSにメッセージが送られず再帰処理を行わなくなってしまったことを検知し、SNSにメッセージを送信するcloudwatch aram
- cloudwatch aramからメッセージを受け取るためのSNS
- SNSにメッセージが入ったことをトリガーに起動し、1のSQSにメッセージを送信するlambda
上記リソースはCDKを使って作成しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
import * as cdk from "aws-cdk-lib"; import { Construct } from "constructs"; import * as sqs from "aws-cdk-lib/aws-sqs"; import * as lambda from "aws-cdk-lib/aws-lambda"; import * as sns from "aws-cdk-lib/aws-sns"; import * as subs from "aws-cdk-lib/aws-sns-subscriptions"; import * as cloudwatch from "aws-cdk-lib/aws-cloudwatch"; import * as cloudwatchActions from "aws-cdk-lib/aws-cloudwatch-actions"; import * as iam from "aws-cdk-lib/aws-iam"; import { NodejsFunction } from "aws-cdk-lib/aws-lambda-nodejs"; export class CdkStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); // 再帰的に呼び出すLambda関数 const queue = new sqs.Queue(this, "GeekBlogQueue", { deliveryDelay: cdk.Duration.seconds(10), }); const recursiveFunction = new NodejsFunction(this, "GeekBlogFunction", { runtime: lambda.Runtime.NODEJS_18_X, entry: "src/recursive.js", handler: "handler", environment: { QUEUE_URL: queue.queueUrl, }, memorySize: 128, timeout: cdk.Duration.seconds(30), }); recursiveFunction.addToRolePolicy( new iam.PolicyStatement({ actions: [ "sqs:SendMessage", "sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes", ], resources: [queue.queueArn], }) ); recursiveFunction.addEventSourceMapping("GeekBlogEventSource", { eventSourceArn: queue.queueArn, batchSize: 10, enabled: true, }); const executionCountZeroTopic = new sns.Topic( this, "GeekBlogExecutionCountZeroTopic" ); const executionCountZeroFunction = new NodejsFunction( this, "GeekBlogExecutionCountZeroFunction", { runtime: lambda.Runtime.NODEJS_18_X, entry: "src/executionZero.js", handler: "handler", environment: { QUEUE_URL: queue.queueUrl, }, memorySize: 128, timeout: cdk.Duration.seconds(30), } ); executionCountZeroFunction.addToRolePolicy( new iam.PolicyStatement({ actions: [ "sqs:SendMessage", "sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes", ], resources: [queue.queueArn], }) ); executionCountZeroFunction.addPermission( "GeekBlogExecutionCountZeroFunctionPermission", { principal: new iam.ServicePrincipal("sns.amazonaws.com"), sourceArn: executionCountZeroTopic.topicArn, action: "lambda:InvokeFunction", } ); executionCountZeroTopic.addSubscription( new subs.LambdaSubscription(executionCountZeroFunction) ); const recursiveZeroInvocationAlerm = new cloudwatch.Alarm( this, "GeekBlogRecursiveZeroInvocationAlerm", { metric: recursiveFunction.metricInvocations({ period: cdk.Duration.minutes(1), statistic: "Sum", }), threshold: 0, evaluationPeriods: 1, comparisonOperator: cloudwatch.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD, treatMissingData: cloudwatch.TreatMissingData.BREACHING, } ); recursiveZeroInvocationAlerm.addAlarmAction( new cloudwatchActions.SnsAction(executionCountZeroTopic) ); } } |
SQS
CDK抜粋
1 2 3 |
const queue = new sqs.Queue(this, "GeekBlogQueue", { deliveryDelay: cdk.Duration.seconds(10), }); |
まずは、再帰処理を行うためのlambdaのトリガーに設定するSQSを作成します。
deliveryDelay というプロパティは、メッセージがキューに入ったあと、設定した秒数だけ配信を遅延させます。
上記の例だと10秒に設定しているため、トリガーに設定したlambdaはキューにメッセージが入った10秒後に発火することになります。
これにより、10秒ごとの定期実行を実現しています。
再帰処理を行うlambda
・CDK抜粋
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
const recursiveFunction = new NodejsFunction(this, "GeekBlogFunction", { runtime: lambda.Runtime.NODEJS_18_X, entry: "src/recursive.js", handler: "handler", environment: { QUEUE_URL: queue.queueUrl, }, memorySize: 128, timeout: cdk.Duration.seconds(30), }); recursiveFunction.addToRolePolicy( new iam.PolicyStatement({ actions: [ "sqs:SendMessage", "sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes", ], resources: [queue.queueArn], }) ); recursiveFunction.addEventSourceMapping("GeekBlogEventSource", { eventSourceArn: queue.queueArn, batchSize: 1, enabled: true, }); |
・再帰的に実行されるlambda
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
import { SQSClient, DeleteMessageCommand, SendMessageCommand, } from "@aws-sdk/client-sqs"; const sqsClient = new SQSClient(); const queueUrl = process.env.QUEUE_URL; export const handler = async (event) => { try { const sendMessageCommand = new SendMessageCommand({ QueueUrl: queueUrl, MessageBody: "guruguru", }); await sqsClient.send(sendMessageCommand); return await recursive(); } catch (error) { console.log(error); await deleteMessage(event.Records); throw new Error("処理停止"); } }; async function recursive() { // ここに定期実行したい処理を記述する console.log("定期実行したい処理"); return { status: "OK" }; } async function deleteMessage(records) { for (const record of records) { const receiptHandle = record.receiptHandle; const deleteMessageCommand = new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: receiptHandle, }); await sqsClient.send(deleteMessageCommand); } } |
再帰処理を行うlambdaでは、
- SQSにメッセージを送信する処理を実行する
- 定期実行したい処理を実行する
- なんらかのエラーで失敗した場合は受け取ったメッセージを削除する処理を実行する
という流れになっています。
最初にSQSにメッセージを送信している理由は、定期実行したい関数(recursive)の実行したあとにSQSにメッセージを送る場合だと、”定期実行したい関数の処理時間 + メッセージの遅延時間” が次回このlambdaが実行されるまでの待機時間になってしまいます。
上記のコードのように、定期実行したい処理がログを出すだけだった場合ではあまり誤差はないかもしれません。
しかし“定期実行処理に5秒かかってしまう”というようなケースの場合は、次にこの関数が呼び出されるのは
5(定期実行したい関数の処理時間) + 10(メッセージの遅延時間)= 15秒後
になるため、目的の10秒ごとの定期実行とは大きな誤差が生まれてしまうため、このような順序で実行しています。
例外発生時のみ受け取ったメッセージを削除している理由について、
通常lambdaは処理が正常に完了すれば、SQSからメッセージを受け取ったメッセージを自動的に削除してくれます。
しかし、何らかのエラーによって異常終了した場合は、キューからメッセージを削除せずにメッセージが残り続けてしまいます。
その場合キュー内には、”エラーにより残ってしまったメッセージ” と “当lambdaの最初に送信したメッセージ” の2件が貯まることになります。
そうなると、2件のメッセージがトリガーに2回のlambdaを呼び出してしまうため、10秒ごとに2回のlambdaが実行されてしまうことになります。
この問題を回避するために、lambdaが何らかのエラーによって実行失敗してしまったときの例外処理に受け取ったSQSのメッセージ削除を含めています。
再帰処理が行われていないことを検知し、復旧する
・CDK抜粋
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
const executionCountZeroTopic = new sns.Topic( this, "GeekBlogExecutionCountZeroTopic" ); const executionCountZeroFunction = new NodejsFunction( this, "GeekBlogExecutionCountZeroFunction", { runtime: lambda.Runtime.NODEJS_18_X, entry: "src/executionZero.js", handler: "handler", environment: { QUEUE_URL: queue.queueUrl, }, memorySize: 128, timeout: cdk.Duration.seconds(30), } ); executionCountZeroFunction.addToRolePolicy( new iam.PolicyStatement({ actions: [ "sqs:SendMessage" ], resources: [queue.queueArn], }) ); executionCountZeroFunction.addPermission( "GeekBlogExecutionCountZeroFunctionPermission", { principal: new iam.ServicePrincipal("sns.amazonaws.com"), sourceArn: executionCountZeroTopic.topicArn, action: "lambda:InvokeFunction", } ); executionCountZeroTopic.addSubscription( new subs.LambdaSubscription(executionCountZeroFunction) ); const recursiveZeroInvocationAlerm = new cloudwatch.Alarm( this, "GeekBlogRecursiveZeroInvocationAlerm", { metric: recursiveFunction.metricInvocations({ period: cdk.Duration.minutes(1), statistic: "Sum", }), threshold: 0, evaluationPeriods: 1, comparisonOperator: cloudwatch.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD, treatMissingData: cloudwatch.TreatMissingData.BREACHING, } ); recursiveZeroInvocationAlerm.addAlarmAction( new cloudwatchActions.SnsAction(executionCountZeroTopic) ); |
・アラーム検知時に実行されるlambda
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs"; const sqsClient = new SQSClient(); export const handler = async (event, context) => { console.log(event); const queueUrl = process.env.QUEUE_URL; const sendMessageCommand = new SendMessageCommand({ QueueUrl: queueUrl, MessageBody: "retry", }); const sendMessageResult = await sqsClient.send(sendMessageCommand); console.log(sendMessageResult); return sendMessageResult; }; |
CloudWatchAlarm
先程の”再帰処理を行うlambda”が何らかの原因で再帰実行されなくなってしまったことを検知し、SNSにメッセージを送ります。
監視項目をlambda実行数とし、1分間の総実行回数が0回の状態が直近の1データポイントで発生した場合、アラームが上がるように設定しています。
アラーム時のアクションは”アラーム検知時に実行されるlambda”のトリガーに設定しているSNSにメッセージを送るようにしています。
lambda
アラーム検知時に実行されるlambdaは、”再帰処理を行うlambda”のトリガーに設定しているSQSにメッセージを送信します。
こうすることで、何らかの理由で”再帰処理を行うlambda”の実行が止まってしまった場合でも、自動的に復旧することが出来ます。
動作確認
最後に、上記リソースをデプロイして、動作確認を行います。
正常系
上記コマンドでデプロイが完了した場合、以下の流れで再帰処理が行われます
- 再帰処理を行うlambdaが起動されていないので、CloudWatch アラームがアラーム状態になります。
- CloudWatchアラームのアクションでSNSにメッセージを送信します。
- SNSにメッセージが入ったことをトリガーに”アラーム検知時に実行されるlambda”が発火し、SQSにメッセージを送信します。
- SQSのメッセージをトリガーに再帰処理を実行するlambdaが発火し、再度SQSにメッセージを送信した後、定期実行したい処理を実行します。
デプロイが完了している場合、再帰処理が始まっているはずです。再帰処理を実行するlambdaのログを確認してみます。
以下の画像のタイムスタンプを確認すると、10秒ごとに実行されていることがわかります。
異常系
次に、何らかの問題が起きて、再帰処理を行うlambdaがエラーで終了してしまったときの挙動を確認します。
再帰処理を行うlambdaに少し変更を加えて(11~14行目)、1/50で処理が失敗するようにしました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
import { SQSClient, DeleteMessageCommand, SendMessageCommand, } from "@aws-sdk/client-sqs"; const sqsClient = new SQSClient(); const queueUrl = process.env.QUEUE_URL; export const handler = async (event) => { try { // 1/50の確率で処理が停止する if (Math.floor(Math.random() * 50) === 0) { throw new Error("処理停止"); } const sendMessageCommand = new SendMessageCommand({ QueueUrl: queueUrl, MessageBody: "guruguru", }); await sqsClient.send(sendMessageCommand); return await recursive(); } catch (error) { console.log(error); await deleteMessage(event.Records); return { status: "NG" }; } }; async function recursive() { // ここに定期実行したい処理を記述する console.log("定期実行したい処理"); return { status: "OK" }; } async function deleteMessage(records) { for (const record of records) { const receiptHandle = record.receiptHandle; const deleteMessageCommand = new DeleteMessageCommand({ QueueUrl: queueUrl, ReceiptHandle: receiptHandle, }); await sqsClient.send(deleteMessageCommand); } } |
以下のログを確認すると、10秒ごとに実行された後、エラーが発生していることがわかります。
エラーが発生することでキュー内からメッセージがなくなってしまうため、ループ機能が停止してしまいます。
ループ機能が停止し、1分あたりの実行数が0にっていることをCloudWatch Alarmが検知し、アラームアクションとしてSNSにメッセージを送信します。
以下の画像はメッセージをトリガーにlambdaが起動した際のログです。処理が停止してから再起動するためのlambdaが実行されるまでに6分程度のタイムラグが有ることがわかりました。
以下のログを確認すると、再起動実行後、再度10秒ごとに定期実行が行われていることがわかります。
検証が終わったらすべてのリソースを削除するか、再帰処理を行うlambdaのトリガーをDisabledにし、CloudWatch Alarmを停止させておきましょう。
削除、もしくは停止を行わない場合、再帰処理が停止せず、課金に影響します。
まとめ
今回はEventBridgeでは対応できない間隔での定期実行する方法を紹介しました。
SQS + lambdaを使った方法はリスクもあるので、今後のアップデートでEventBridgeが1分間隔以下での定期実行ができるようになることに期待したいです。
今回紹介した方法を実装する場合、細心の注意を払って利用してください。
- NAT GatawayからNAT インスタンスに乗り換えて約95%のコスト削減をしてみた - 2024-12-25
- Amazon Connectで同じ電話番号から何回かかってきたかをカウントしてみた - 2024-12-25
- 特定の時間あたりのlambda実行数をSlackに通知する - 2023-12-23
- 公衆電話からでも使える電話帳サービスをLEX + AmazonConnectで作ってみた - 2023-12-19
- SQSを使ってlambdaを10秒ごとに定期実行する - 2023-12-14