はじめに
CTOの都筑(@tsuzukit2)です。
DROBE では様々な外部サービスを利用していますが、事前に設定しておく事で外部サービス側で特定のイベントが発生した際に DROBE 側に HTTP のリクエストを送ってくれる仕組みを多く利用しています。
Webhook のわかりやすい例としては、例えば SendGrid のような外部サービスでメールの配信を行っている場合に、実際にメールの配信結果がどうだったかをアプリケーションで確実に検知したいといった場合に、SendGrid での配信完了イベントをトリガーとした Webhook を設定します。
こういった仕組みを使う事で、アプリケーション側ではメールの送信をリクエストした事だけではなく、メールがしっかりと配信された事、もしくはエラーとなってしまった事などを検知し DB に保存したり CS に役立てたりなどという事が出来ます。
この記事では Webhook を使う事によって短期間に大量のアクセスが返ってきてしまうような場合に DROBE がどのように対応しているかを解説します。
課題
そもそも Webhook で大量のアクセスが返ってきてしまう場合とはどういった場合でしょうか?先ほどの SendGrid の例で言うと、アプリケーションの機能として任意のタイミングでユーザー全員にメールマガジンを配信したい場合などが考えられます。
例えばユーザーが 100 万人のサービスで一斉に SendGrid のようなメール送信を行う外部サービスに送信リクエストを送ったらどうなるでしょうか? もちろんサービスの規約や内部実装に依存すると思いますが、多くの大規模なサービスではある程度大量のリクエストが来ても特に問題なく捌き素直に webhook を返すという挙動になると思います。その際にアプリケーション側では大量の HTTP リクエストが一気に来る事になるので (この場合では 100 万人分の Webhook が)、自身が作り出した DDOS 攻撃にアプリケーションがされされるというような状況になります。
Lambda と SQS でリクエストをスロットリングする
DROBE ではこの問題に対処するために Lambda と SQS を用いてリクエストをスロットリングする proxy を作って対処しています。
挙動はいたってシンプルで、外部サービスからのリクエストが来たら AWS Lambda によってリクエストを受けて Header と Body をそのまま SQS に入れていきます。同時に別の Lambda を SQS からのリクエストをデキューし、Header と Body を使って HTTP リクエストをアプリケーションに投げなおします。
ここで API Gateway からのリクエストを SQS に入れていくラムダ (api2sqs と呼んでいます) は並列で動作するように設定しておきつつ、SQS からメッセージをデキューしてアプリケーションに HTTP リクエストを投げ直すラムダ (sqs2upstream と呼んでいます) の並列実行数を制限しておきます。
こうする事で、アプリケーションサーバーに一切の変更を加える事なく、Webhook など大量の外部サービスからのアクセスによるアクセス増を一定のレートに制限する事が可能になります。(もちろん SQS の詰まり具合に応じて Webhook がアプリケーションサーバーに届くまでに一定のタイムラグがあるのでリアルタイム性が重要な機能には使えないという欠点があります。)
Terraform によるセットアップ
ここからは上記の webhook proxy の terraform によるセットアップを解説していきます。
前提として、API Gateway で利用するドメインはすでに準備が出来ていて zone までは手動で切ってあり、ドメインに来たアクセスは zone にくるようにセットアップが出来ているものとします。また SSL のための Certificate も ACM を使って事前に準備しているものとしています。
Lambda のソースコード
まずは API Gateway からリクエストを受け取って SQS に入れる所のラムダのソースコードを書きます。ここでは js で書いていますがどんな言語でも問題ありません。
const https = require('https'); var AWS = require('aws-sdk'); exports.handler = async (event) => { const sqsUrl = process.env.SQS_URL # ここでは terraform 側から変数として渡している const messageBody = { 'resource': event.resource, 'headers': JSON.parse(event.headers), 'body': JSON.parse(event.body) } var params = { MessageBody: JSON.stringify(messageBody), QueueUrl: sqsUrl, }; const sqs = new AWS.SQS(); const queueResponse = await sqs.sendMessage(params).promise() return { "statusCode": 200 }; };
次に SQS からデキューしてアプリケーション側に HTTP リクエストを投げ直す処理を書きます。
const https = require('https'); exports.handler = event => { const upstream = process.env.UPSTREAM # ここも terraform 側から変数として渡している const path = process.env.PATH event.Records.forEach(record => { const { body } = record; const parsedBody = JSON.parse(body) const data = JSON.stringify(parsedBody.body) const options = { host: upstream, path: path, port: 443, headers: parsedBody.headers method: 'POST', } const req = https.request(options, res => { console.log(`statusCode: ${res.statusCode}`) }).on('error', error => { console.error(error) }) req.write(data) req.end() }); };
Lambda
次に Terraform で Lambda を作ります。
locals { function_name_sqs2upstream = "lambda_proxy_sqs2upstream-${var.app_env}" } locals { function_name_api2sqs = "lambda_proxy_api2sqs-${var.app_env}" } data "aws_iam_policy_document" "lambda_assume_role" { statement { actions = ["sts:AssumeRole"] principals { type = "Service" identifiers = [ "lambda.amazonaws.com", ] } } } resource "aws_iam_role" "lambda_proxy_sqs2upstream" { name = "lambda_proxy-sqs2upstream-${var.app_env}" assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json } resource "aws_iam_role" "lambda_proxy_api2sqs" { name = "lambda_proxy-api2sqs-${var.app_env}" assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json } data "aws_iam_policy" "lambda_execution_policy" { arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" } # SQS からメッセージを受けたり、メッセージを削除する権限を付与する data "aws_iam_policy_document" "sqs2upstream_policy_document" { source_json = data.aws_iam_policy.lambda_execution_policy.policy statement { effect = "Allow" actions = [ "sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes" ] resources = [aws_sqs_queue.sqs_proxy.arn] } } resource "aws_iam_policy" "sqs2upstream_policy" { name = "sqs2upstream_policy-${var.app_env}" policy = data.aws_iam_policy_document.sqs2upstream_policy_document.json } resource "aws_iam_role_policy_attachment" "lambda_proxy_sqs2upstream" { role = aws_iam_role.lambda_proxy_sqs2upstream.name policy_arn = aws_iam_policy.sqs2upstream_policy.arn } resource "aws_iam_role_policy_attachment" "lambda_proxy_sqs2upstream_vpc_execution_role" { role = aws_iam_role.lambda_proxy_sqs2upstream.name policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole" } # SQS にメッセージを送ったり削除したりする権限を付与する data "aws_iam_policy_document" "api2sqs_policy_document" { source_json = data.aws_iam_policy.lambda_execution_policy.policy statement { effect = "Allow" actions = [ "sqs:SendMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes" ] resources = [aws_sqs_queue.sqs_proxy.arn] } } resource "aws_iam_policy" "api2sqs_policy" { name = "api2sqs_policy-${var.app_env}" policy = data.aws_iam_policy_document.api2sqs_policy_document.json } resource "aws_iam_role_policy_attachment" "lambda_proxy_api2sqs" { role = aws_iam_role.lambda_proxy_api2sqs.name policy_arn = aws_iam_policy.api2sqs_policy.arn } data "archive_file" "lambda_proxy_sqs2upstream" { type = "zip" source_file = "lambda-src/sqs2upstream/index.js" output_path = "lambda-src/sqs2upstream/index.zip" } resource "aws_lambda_function" "lambda_proxy_sqs2upstream" { filename = data.archive_file.lambda_proxy_sqs2upstream.output_path function_name = local.function_name_sqs2upstream role = aws_iam_role.lambda_proxy_sqs2upstream.arn handler = "index.handler" source_code_hash = data.archive_file.lambda_proxy_sqs2upstream.output_base64sha256 runtime = "nodejs12.x" publish = true reserved_concurrent_executions = var.reserved_concurrent_executions # 並列度は変数で渡す memory_size = 128 timeout = 3 environment { variables = { UPSTREAM = var.upstream } } } resource "aws_cloudwatch_log_group" "lambda_proxy_sqs2upstream" { name = "/aws/lambda/${local.function_name_sqs2upstream}" retention_in_days = 1 } data "archive_file" "lambda_proxy_api2sqs" { type = "zip" source_file = "lambda-src/api2sqs/index.js" output_path = "lambda-src/api2sqs/index.zip" } resource "aws_lambda_function" "lambda_proxy_api2sqs" { filename = data.archive_file.lambda_proxy_api2sqs.output_path function_name = local.function_name_api2sqs role = aws_iam_role.lambda_proxy_api2sqs.arn handler = "index.handler" source_code_hash = data.archive_file.lambda_proxy_api2sqs.output_base64sha256 runtime = "nodejs12.x" publish = true memory_size = 128 timeout = 3 environment { variables = { SQS_URL = aws_sqs_queue.sqs_proxy.id } } } resource "aws_cloudwatch_log_group" "lambda_proxy_api2sqs" { name = "/aws/lambda/${local.function_name_api2sqs}" retention_in_days = 1 }
SQS
SQS を作ります。設定は非常にシンプルです。
resource "aws_sqs_queue" "sqs_proxy" { name = "proxy-queue-${var.app_env}" tags = { Environment = var.app_env } } # この設定により SQS にイベントが入ってきたら sqs2upstream 関数が呼び出される resource "aws_lambda_event_source_mapping" "sqs_proxy" { batch_size = 1 # ここでは 1 にしているがデキューの並列度をあげたい場合はもっと大きな値にする event_source_arn = aws_sqs_queue.sqs_proxy.arn function_name = aws_lambda_function.lambda_proxy_sqs2upstream.arn }
API Gateway
最後に API Gateway を作ります。
resource "aws_apigatewayv2_api" "gw_api_proxy" { name = "gateway_api_proxy-${var.app_env}" protocol_type = "HTTP" } resource "aws_apigatewayv2_stage" "gw_state_proxy" { api_id = aws_apigatewayv2_api.gw_api_proxy.id name = "gateway_state_proxy-${var.app_env}" auto_deploy = true access_log_settings { destination_arn = aws_cloudwatch_log_group.gw_lg_proxy.arn format = jsonencode({ requestId = "$context.requestId" sourceIp = "$context.identity.sourceIp" requestTime = "$context.requestTime" protocol = "$context.protocol" httpMethod = "$context.httpMethod" resourcePath = "$context.resourcePath" routeKey = "$context.routeKey" status = "$context.status" responseLength = "$context.responseLength" integrationErrorMessage = "$context.integrationErrorMessage" } ) } } resource "aws_apigatewayv2_integration" "gw_integration_proxy" { api_id = aws_apigatewayv2_api.gw_api_proxy.id integration_uri = aws_lambda_function.lambda_proxy_api2sqs.invoke_arn integration_type = "AWS_PROXY" integration_method = "POST" } resource "aws_apigatewayv2_route" "gw_route_proxy" { api_id = aws_apigatewayv2_api.gw_api_proxy.id route_key = "POST /" target = "integrations/${aws_apigatewayv2_integration.gw_integration_proxy.id}" } resource "aws_cloudwatch_log_group" "gw_lg_proxy" { name = "/aws/api_gw/${aws_apigatewayv2_api.gw_api_proxy.name}" retention_in_days = 1 } resource "aws_lambda_permission" "api_gw" { statement_id = "AllowExecutionFromAPIGateway" action = "lambda:InvokeFunction" function_name = aws_lambda_function.lambda_proxy_api2sqs.function_name principal = "apigateway.amazonaws.com" source_arn = "${aws_apigatewayv2_api.gw_api_proxy.execution_arn}/*/*" } resource "aws_apigatewayv2_domain_name" "api_gw_domain" { domain_name = "${var.domain_name}" domain_name_configuration { certificate_arn = var.domain_cert_arn endpoint_type = "REGIONAL" security_policy = "TLS_1_2" } } resource "aws_route53_record" "api_gw_domain_record" { name = aws_apigatewayv2_domain_name.api_gw_domain.domain_name type = "A" zone_id = var.zone_id alias { evaluate_target_health = true name = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].target_domain_name zone_id = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].hosted_zone_id } } resource "aws_apigatewayv2_api_mapping" "api_gw_domain_mapping" { api_id = aws_apigatewayv2_api.gw_api_proxy.id stage = aws_apigatewayv2_stage.gw_state_proxy.id domain_name = aws_apigatewayv2_domain_name.api_gw_domain.id }
変数の設定
最後に変数の定義を以下のように書いておしまいです。
app_env = "prod" upstream = "example.com" # 向き先のドメイン reserved_concurrent_executions = 1 # ここで並列度の定義をする zone_id = "zone_id" # 事前に作成した ZONE の id domain_cert_arn = "arn_of_domain_cert" # ACM で作った Certificate の ARN
まとめ
この記事では AWS Lambda と SQS を用いて、外部サービスによる大量アクセスに対応する方法について解説しました。高いリアルタイム性が要求されるようなアプリケーションには向かない方法ですが、既存のアプリケーションに一切手を加える事なく実現出来るので、状況によっては使いやすい手法なのではと考えています。