DROBEプロダクト開発ブログ

DROBEのプロダクト開発周辺の知見や考え方の共有をしていきます

LambdaとSQSを使ってWebhookによるスパイクに対応する

はじめに

CTOの都筑(@tsuzukit2)です。
DROBE では様々な外部サービスを利用していますが、事前に設定しておく事で外部サービス側で特定のイベントが発生した際に DROBE 側に HTTP のリクエストを送ってくれる仕組みを多く利用しています。

Webhook のわかりやすい例としては、例えば SendGrid のような外部サービスでメールの配信を行っている場合に、実際にメールの配信結果がどうだったかをアプリケーションで確実に検知したいといった場合に、SendGrid での配信完了イベントをトリガーとした Webhook を設定します。

SendGrid の Webhook を使ってメールの配信完了を Application で検知する例

こういった仕組みを使う事で、アプリケーション側ではメールの送信をリクエストした事だけではなく、メールがしっかりと配信された事、もしくはエラーとなってしまった事などを検知し DB に保存したり CS に役立てたりなどという事が出来ます。

この記事では Webhook を使う事によって短期間に大量のアクセスが返ってきてしまうような場合に DROBE がどのように対応しているかを解説します。

課題

そもそも Webhook で大量のアクセスが返ってきてしまう場合とはどういった場合でしょうか?先ほどの SendGrid の例で言うと、アプリケーションの機能として任意のタイミングでユーザー全員にメールマガジンを配信したい場合などが考えられます。

例えばユーザーが 100 万人のサービスで一斉に SendGrid のようなメール送信を行う外部サービスに送信リクエストを送ったらどうなるでしょうか? もちろんサービスの規約や内部実装に依存すると思いますが、多くの大規模なサービスではある程度大量のリクエストが来ても特に問題なく捌き素直に webhook を返すという挙動になると思います。その際にアプリケーション側では大量の HTTP リクエストが一気に来る事になるので (この場合では 100 万人分の Webhook が)、自身が作り出した DDOS 攻撃にアプリケーションがされされるというような状況になります。

Lambda と SQS でリクエストをスロットリングする

DROBE ではこの問題に対処するために Lambda と SQS を用いてリクエストをスロットリングする proxy を作って対処しています。

Webhook proxy の概要図

挙動はいたってシンプルで、外部サービスからのリクエストが来たら AWS Lambda によってリクエストを受けて Header と Body をそのまま SQS に入れていきます。同時に別の Lambda を SQS からのリクエストをデキューし、Header と Body を使って HTTP リクエストをアプリケーションに投げなおします。

ここで API Gateway からのリクエストを SQS に入れていくラムダ (api2sqs と呼んでいます) は並列で動作するように設定しておきつつ、SQS からメッセージをデキューしてアプリケーションに HTTP リクエストを投げ直すラムダ (sqs2upstream と呼んでいます) の並列実行数を制限しておきます。

こうする事で、アプリケーションサーバーに一切の変更を加える事なく、Webhook など大量の外部サービスからのアクセスによるアクセス増を一定のレートに制限する事が可能になります。(もちろん SQS の詰まり具合に応じて Webhook がアプリケーションサーバーに届くまでに一定のタイムラグがあるのでリアルタイム性が重要な機能には使えないという欠点があります。)

Terraform によるセットアップ

ここからは上記の webhook proxy の terraform によるセットアップを解説していきます。

前提として、API Gateway で利用するドメインはすでに準備が出来ていて zone までは手動で切ってあり、ドメインに来たアクセスは zone にくるようにセットアップが出来ているものとします。また SSL のための Certificate も ACM を使って事前に準備しているものとしています。

Lambda のソースコード

まずは API Gateway からリクエストを受け取って SQS に入れる所のラムダのソースコードを書きます。ここでは js で書いていますがどんな言語でも問題ありません。

const https = require('https');
var AWS = require('aws-sdk');

exports.handler = async (event) => {
  const sqsUrl = process.env.SQS_URL # ここでは terraform 側から変数として渡している

  const messageBody = {
    'resource': event.resource,
    'headers': JSON.parse(event.headers),
    'body': JSON.parse(event.body)
  }
  
  var params = {
    MessageBody: JSON.stringify(messageBody),
    QueueUrl: sqsUrl,
   };
  const sqs = new AWS.SQS();
  const queueResponse = await sqs.sendMessage(params).promise()
  
  return {
      "statusCode": 200
  };
};

次に SQS からデキューしてアプリケーション側に HTTP リクエストを投げ直す処理を書きます。

const https = require('https');

exports.handler = event => {
  const upstream = process.env.UPSTREAM # ここも terraform 側から変数として渡している
  const path = process.env.PATH

  event.Records.forEach(record => {
    
    const { body } = record;
    const parsedBody = JSON.parse(body)

    const data = JSON.stringify(parsedBody.body)
    const options = {
        host: upstream,
        path: path,
        port: 443,
        headers: parsedBody.headers
        method: 'POST',
    }

    const req = https.request(options, res => {
      console.log(`statusCode: ${res.statusCode}`)
    }).on('error', error => {
      console.error(error)
    })

    req.write(data)
    req.end()
  });

};

Lambda

次に Terraform で Lambda を作ります。

locals {
  function_name_sqs2upstream = "lambda_proxy_sqs2upstream-${var.app_env}"
}

locals {
  function_name_api2sqs = "lambda_proxy_api2sqs-${var.app_env}"
}

data "aws_iam_policy_document" "lambda_assume_role" {
  statement {
    actions = ["sts:AssumeRole"]

    principals {
      type = "Service"
      identifiers = [
        "lambda.amazonaws.com",
      ]
    }
  }
}

resource "aws_iam_role" "lambda_proxy_sqs2upstream" {
  name               = "lambda_proxy-sqs2upstream-${var.app_env}"
  assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json
}

resource "aws_iam_role" "lambda_proxy_api2sqs" {
  name               = "lambda_proxy-api2sqs-${var.app_env}"
  assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json
}

data "aws_iam_policy" "lambda_execution_policy" {
  arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

# SQS からメッセージを受けたり、メッセージを削除する権限を付与する
data "aws_iam_policy_document" "sqs2upstream_policy_document" {
  source_json = data.aws_iam_policy.lambda_execution_policy.policy

  statement {
    effect = "Allow"

    actions = [
      "sqs:ReceiveMessage",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes"
    ]

    resources = [aws_sqs_queue.sqs_proxy.arn]
  }
}

resource "aws_iam_policy" "sqs2upstream_policy" {
  name   = "sqs2upstream_policy-${var.app_env}"
  policy = data.aws_iam_policy_document.sqs2upstream_policy_document.json
}

resource "aws_iam_role_policy_attachment" "lambda_proxy_sqs2upstream" {
  role       = aws_iam_role.lambda_proxy_sqs2upstream.name
  policy_arn = aws_iam_policy.sqs2upstream_policy.arn
}

resource "aws_iam_role_policy_attachment" "lambda_proxy_sqs2upstream_vpc_execution_role" {
  role       = aws_iam_role.lambda_proxy_sqs2upstream.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
}

# SQS にメッセージを送ったり削除したりする権限を付与する
data "aws_iam_policy_document" "api2sqs_policy_document" {
  source_json = data.aws_iam_policy.lambda_execution_policy.policy

  statement {
    effect = "Allow"

    actions = [
      "sqs:SendMessage",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes"
    ]

    resources = [aws_sqs_queue.sqs_proxy.arn]
  }
}

resource "aws_iam_policy" "api2sqs_policy" {
  name   = "api2sqs_policy-${var.app_env}"
  policy = data.aws_iam_policy_document.api2sqs_policy_document.json
}

resource "aws_iam_role_policy_attachment" "lambda_proxy_api2sqs" {
  role       = aws_iam_role.lambda_proxy_api2sqs.name
  policy_arn = aws_iam_policy.api2sqs_policy.arn
}

data "archive_file" "lambda_proxy_sqs2upstream" {
  type        = "zip"
  source_file = "lambda-src/sqs2upstream/index.js"
  output_path = "lambda-src/sqs2upstream/index.zip"
}

resource "aws_lambda_function" "lambda_proxy_sqs2upstream" {
  filename         = data.archive_file.lambda_proxy_sqs2upstream.output_path
  function_name    = local.function_name_sqs2upstream
  role             = aws_iam_role.lambda_proxy_sqs2upstream.arn
  handler          = "index.handler"
  source_code_hash = data.archive_file.lambda_proxy_sqs2upstream.output_base64sha256
  runtime          = "nodejs12.x"

  publish                        = true
  reserved_concurrent_executions = var.reserved_concurrent_executions # 並列度は変数で渡す

  memory_size = 128
  timeout     = 3
  environment {
    variables = {
      UPSTREAM = var.upstream
    }
  }
}

resource "aws_cloudwatch_log_group" "lambda_proxy_sqs2upstream" {
  name              = "/aws/lambda/${local.function_name_sqs2upstream}"
  retention_in_days = 1
}

data "archive_file" "lambda_proxy_api2sqs" {
  type        = "zip"
  source_file = "lambda-src/api2sqs/index.js"
  output_path = "lambda-src/api2sqs/index.zip"
}

resource "aws_lambda_function" "lambda_proxy_api2sqs" {
  filename         = data.archive_file.lambda_proxy_api2sqs.output_path
  function_name    = local.function_name_api2sqs
  role             = aws_iam_role.lambda_proxy_api2sqs.arn
  handler          = "index.handler"
  source_code_hash = data.archive_file.lambda_proxy_api2sqs.output_base64sha256
  runtime          = "nodejs12.x"

  publish = true

  memory_size = 128
  timeout     = 3
  environment {
    variables = {
      SQS_URL = aws_sqs_queue.sqs_proxy.id
    }
  }
}

resource "aws_cloudwatch_log_group" "lambda_proxy_api2sqs" {
  name              = "/aws/lambda/${local.function_name_api2sqs}"
  retention_in_days = 1
}

SQS

SQS を作ります。設定は非常にシンプルです。

resource "aws_sqs_queue" "sqs_proxy" {
  name = "proxy-queue-${var.app_env}"
  tags = {
    Environment = var.app_env
  }
}

# この設定により SQS にイベントが入ってきたら sqs2upstream 関数が呼び出される
resource "aws_lambda_event_source_mapping" "sqs_proxy" {
  batch_size       = 1 # ここでは 1 にしているがデキューの並列度をあげたい場合はもっと大きな値にする
  event_source_arn = aws_sqs_queue.sqs_proxy.arn
  function_name    = aws_lambda_function.lambda_proxy_sqs2upstream.arn
}

API Gateway

最後に API Gateway を作ります。

resource "aws_apigatewayv2_api" "gw_api_proxy" {
  name          = "gateway_api_proxy-${var.app_env}"
  protocol_type = "HTTP"
}

resource "aws_apigatewayv2_stage" "gw_state_proxy" {
  api_id = aws_apigatewayv2_api.gw_api_proxy.id

  name        = "gateway_state_proxy-${var.app_env}"
  auto_deploy = true

  access_log_settings {
    destination_arn = aws_cloudwatch_log_group.gw_lg_proxy.arn

    format = jsonencode({
      requestId               = "$context.requestId"
      sourceIp                = "$context.identity.sourceIp"
      requestTime             = "$context.requestTime"
      protocol                = "$context.protocol"
      httpMethod              = "$context.httpMethod"
      resourcePath            = "$context.resourcePath"
      routeKey                = "$context.routeKey"
      status                  = "$context.status"
      responseLength          = "$context.responseLength"
      integrationErrorMessage = "$context.integrationErrorMessage"
      }
    )
  }
}

resource "aws_apigatewayv2_integration" "gw_integration_proxy" {
  api_id = aws_apigatewayv2_api.gw_api_proxy.id

  integration_uri    = aws_lambda_function.lambda_proxy_api2sqs.invoke_arn
  integration_type   = "AWS_PROXY"
  integration_method = "POST"
}

resource "aws_apigatewayv2_route" "gw_route_proxy" {
  api_id = aws_apigatewayv2_api.gw_api_proxy.id

  route_key = "POST /"
  target    = "integrations/${aws_apigatewayv2_integration.gw_integration_proxy.id}"
}

resource "aws_cloudwatch_log_group" "gw_lg_proxy" {
  name              = "/aws/api_gw/${aws_apigatewayv2_api.gw_api_proxy.name}"
  retention_in_days = 1
}

resource "aws_lambda_permission" "api_gw" {
  statement_id  = "AllowExecutionFromAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.lambda_proxy_api2sqs.function_name
  principal     = "apigateway.amazonaws.com"

  source_arn = "${aws_apigatewayv2_api.gw_api_proxy.execution_arn}/*/*"
}

resource "aws_apigatewayv2_domain_name" "api_gw_domain" {
  domain_name = "${var.domain_name}"

  domain_name_configuration {
    certificate_arn = var.domain_cert_arn
    endpoint_type   = "REGIONAL"
    security_policy = "TLS_1_2"
  }
}

resource "aws_route53_record" "api_gw_domain_record" {
  name    = aws_apigatewayv2_domain_name.api_gw_domain.domain_name
  type    = "A"
  zone_id = var.zone_id

  alias {
    evaluate_target_health = true
    name                   = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].target_domain_name
    zone_id                = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].hosted_zone_id
  }
}

resource "aws_apigatewayv2_api_mapping" "api_gw_domain_mapping" {
  api_id      = aws_apigatewayv2_api.gw_api_proxy.id
  stage       = aws_apigatewayv2_stage.gw_state_proxy.id
  domain_name = aws_apigatewayv2_domain_name.api_gw_domain.id
}

変数の設定

最後に変数の定義を以下のように書いておしまいです。

app_env                        = "prod"
upstream                       = "example.com" # 向き先のドメイン
reserved_concurrent_executions = 1 # ここで並列度の定義をする
zone_id                        = "zone_id" # 事前に作成した ZONEid
domain_cert_arn                = "arn_of_domain_cert" # ACM で作った CertificateARN

まとめ

この記事では AWS Lambda と SQS を用いて、外部サービスによる大量アクセスに対応する方法について解説しました。高いリアルタイム性が要求されるようなアプリケーションには向かない方法ですが、既存のアプリケーションに一切手を加える事なく実現出来るので、状況によっては使いやすい手法なのではと考えています。


DROBE開発組織の紹介
組織情報ポータル