DROBEプロダクト開発ブログ

DROBEのプロダクト開発周辺の知見や考え方の共有をしていきます

LLM をサービスから使うために Lambda で API Wrapper を作る

はじめに

こんにちわ、DROBE の都筑です。

OpenAI の提供する API を始めとして LLM をサービスで活用されている、もしくはこれから活用しようとしている方は多いと思います。 一方で、OpenAI が公式に提供している Library は Python と Node.js のみなので既存のサービスに直接インテグレーションする事が難しい場合も多いのではないでしょうか。

そういったケースに対応するために、 AWS Lambda で API の Wrapper 関数を作り公開し、サービスからはその API を叩くという構成のシステムをセットアップしたのでこの記事で解説します。

作るもの

以下のような構成のものを GitHub actions と terraform で構築していきます。

全体の構成

Lambda を採用した理由は以下になります。

  • LLM を利用した機能を実装するサーバーになるが、入出力を揃えて API を叩くだけで重たい処理を行わない
  • ECS などのサービスを利用して常時 API を立てておくよりも金額的なメリットがある
  • 弊社が AWS を中心にインフラを構築しているので使いやすい

Lambda は python の container を利用します。Lambda の実行時に SSM から OpenAI の API Key を取得して API を叩く構成です。 クライアントからはカスタムドメインを貼った API Gateway 経由で Lambda にアクセスする構成としています。Log は Cloudwatch に、Metrics は Datadog に送り Lambda の実行回数などをモニタリングできるようにします。

解説

では、それぞれ解説していきます。

Lambda 関数の設計

まず Lambda 関数の設計ですが、以下のような要件を満たせるようにしたいと考えました。

  • LLM を利用したい機能は複数出てくる事が想定されるので、新機能を作るたびにインフラを作らなくて良いようにしたい
  • OpenAI 以外の API も気軽に試せるような構成にしたい

これらを踏まえて、以下のような Clean Architecture 風な File 構成で Lambda の内部を書きました。

├── main.py
└── src
    ├── domain // interface の定義
    ├── infra // interface の実装
    │   └── openai // openai に依存した API の定義 (prompt もこの下で定義)
    └── usecase
        └── interactor.py

main.py には lambda の entry point となる関数を書きます。 main.py の主な役割は DI (interactor の準備) と request の validation と response の整形です。

domain に usecase で使う処理の interface を定義します。例として与えられた入力から特定の商品情報を抜き出す処理の interface は以下のような定義にしました。

RelatedItemExtractionResponse = TypedDict(
    "RelatedItemExtractionResponse", {"items": List[str]}
)


class IFRelatedItemExtraction(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def handle(self, request_text: str) -> RelatedItemExtractionResponse:
        pass

interactor 内部で business logic を呼び出す処理を書きます。ここでは LLM の呼び出し処理を行います。もちろん、このレイヤーは infra には直接依存しないレイヤーになります。

class Interactor:
    def __init__(
        self,
        repo: IFRelatedItemExtraction,
    ):
        self.repo = repo

    def extract(self, request_text: str) -> RelatedItemExtractionResponse:
        return self.repo.handle(request_text)

実際に API を呼び出す処理や prompt の定義は infra 以下に書きます。特定の商品情報を抜き出す処理の実装はこのような形です。

MODEL_NAME = "gpt-3.5-turbo-16k" # gpt-4 なども指定できる。Application に合わせて指定する。

SYSTEM_PROMPT = "" # システムプロンプト を書く

PROMPT = """
{passage}
"""

class OpenAIRelatedItemExtraction:
    @timeout_decorator.timeout(60) # timeout させるための decoration
    def chat_complete(self, request_text: str):
        return openai.ChatCompletion.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": PROMPT.format(passage=request_text)},
            ],
            max_tokens=4098,
            temperature=0,
        )

    def handle(self, request_text: str) -> RelatedItemExtractionResponse:
        output = self.chat_complete(request_text)
        content = output["choices"][0]["message"]["content"]
        return json.loads(content)

ポイントはモデルの指定やプロンプトをここに書いている事です。プロンプトなどは実装の詳細なので、ifra 以外のレイヤーでは意識しなくて良くなり、プロンプトの修正や呼び出し方の変更 (例えば Function calling を使うように修正するなど) をしても、infra レイヤー以外の修正は行わなくて良くなります。

また、こういった構成にしておくことで、例えば OpenAI をやめて別の LLM に変更するなどをする場合でも、domain で定義された interface に沿って infra を新規で作り main.py で interactor を作る時にどちらを使うかを選べるため、 比較コードの修正量を抑えながら AB テストなどを行う事ができます。

Lamda へのリクエストは以下のような形に統一しています。

{
  "type": "function_name",
  "input": "input_text"
}

type によって interactor のどの処理を呼び出すかを決定するようにします。 この構成により、新しく Lambda を Deploy しなくても Lambda のソースコードを変更することで新しいを機能を追加できます。 もちろんコンテナのサイズが肥大化していくという懸念はありますが、API の Wrapper であることを想定しているのである程度までは機能を増やしても問題ないと考えました。

CI/CD

コンテナのビルドと Lambda の更新は GitHub Actions で行います。

CI/CD

Actions は以下のような yaml で定義しました。

name: Build

on:
  push:
    branches:
      - main

env:
  CONTAINER_NAME: xxx # make の中で定義しているコンテナ名
  REPO_NAME: yyy # ECR repo の名前
  LAMBDA_FUNCTION_NAME: zzz # terraform で定義する Lambda 関数の名前

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.ML_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.ML_AWS_SECRET_ACCESS_KEY }}
          aws-region: ap-northeast-1

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1

      - name: Build and push to ecr
        run: |
          make build # コンテナを build するためのコマンド。コンテナ名はこの内部で定義している。
          docker tag ${CONTAINER_NAME}:latest ${REPO_NAME}:latest
          docker push <repo-name>:latest
          docker tag ${CONTAINER_NAME}:latest ${REPO_NAME}:${GITHUB_SHA}
          docker push ${REPO_NAME}:${GITHUB_SHA}
  
      - name: Deploy to lambda
        run: |
          aws lambda update-function-code --function-name ${LAMBDA_FUNCTION_NAME} --image-uri ${REPO_NAME}:latest

環境の構築

Lambda の実行環境は terraform で作ります。 それぞれのリソースに合わせて file を分割して作りました。 ここでは point となる lambda function と api gateway の部分だけピックアップして説明します。

まず lambda function の定義です。

resource "aws_lambda_function" "service_llm_lambda" {
  depends_on = [
    aws_cloudwatch_log_group.service_llm_lambda, # cloudwatch の log group を定義しておきます
  ]

  function_name = "service-llm-lambda-${var.app_env}"
  package_type  = "Image"
  image_uri     = "${aws_ecr_repository.ecr_repo.repository_url}:latest" # ECR repository の名前です
  role          = aws_iam_role.lambda_role.arn # Lambda の実行時の権限です。後述しますが SSM から機密情報を読み出せる権限を渡しました

  memory_size = 128
  timeout     = 60 # OpenAI の API は通常の API よりも実行時間が長い傾向があるので長めにしておきます

  lifecycle {
    ignore_changes = [
      image_uri, last_modified
    ]
  }

  image_config {
    command = ["datadog_lambda.handler.handler"] # 後述しますが Datadog にメトリクスを送るためにコマンドを上書きしておきます。
  }

  environment {
    variables = {
      DD_LAMBDA_HANDLER = "main.lambda_handler", # コマンドを上書しているので、実際のハンドラーはここに記載します。
      DD_SITE           = "datadoghq.com",
      DD_API_KEY        = data.aws_ssm_parameter.dd_api_key.value, ## Datadog の API key を parameter store から渡します。この書き方だとコンソールから鍵が見えてしまうので実際はそうならないように工夫が必要です。
      DD_TRACE_ENABLED  = "true"
    }
  }

}

api gateway はこんな感じです。HTTP モードでの設定とし、あらかじめ Route53 に作っておいたカスタムドメインと紐付けます。

resource "aws_apigatewayv2_api" "gw_api" {
  name          = "gateway_api_service-llm-lambda-${var.app_env}"
  protocol_type = "HTTP"
  cors_configuration {
    allow_origins = ["*"] # FE から直接呼び出したいというニーズがある場合には CORS 設定をしておく。実際には `*` ではなくちゃんとドメインを指定しましょう
    allow_methods = ["POST", "GET", "OPTIONS"]
    allow_headers = ["*"]
    max_age       = 300
  }
}

resource "aws_apigatewayv2_stage" "gw_stage" {
  api_id = aws_apigatewayv2_api.gw_api.id

  name        = "gateway_api_service-llm-lambda-${var.app_env}"
  auto_deploy = true

  access_log_settings {
    destination_arn = aws_cloudwatch_log_group.gw_lg.arn
}

resource "aws_apigatewayv2_integration" "gw_integration" {
  api_id = aws_apigatewayv2_api.gw_api.id

  integration_uri    = aws_lambda_function.service_llm_lambda.invoke_arn
  integration_type   = "AWS_PROXY"
  integration_method = "POST"
}

resource "aws_apigatewayv2_route" "gw_route" {
  api_id = aws_apigatewayv2_api.gw_api.id

  route_key = "POST /invoke"
  target    = "integrations/${aws_apigatewayv2_integration.gw_integration.id}"
}

resource "aws_cloudwatch_log_group" "gw_lg" {
  name              = "/aws/api_gw/service-llm-lambda"
  retention_in_days = 1
}

resource "aws_lambda_permission" "api_gw" {
  statement_id  = "AllowExecutionFromAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.service_llm_lambda.function_name
  principal     = "apigateway.amazonaws.com"

  source_arn = "${aws_apigatewayv2_api.gw_api.execution_arn}/*/*"
}

resource "aws_apigatewayv2_domain_name" "api_gw_domain" {
  domain_name = "${var.app_env}.xxx" # あらかじめ route53 に作っておいた domain 名

  domain_name_configuration {
    certificate_arn = var.domain_cert_arn # あらかじめ ACM で取得しておきた certificate の ARN を指定
    endpoint_type   = "REGIONAL"
    security_policy = "TLS_1_2"
  }
}

resource "aws_route53_record" "api_gw_domain_record" { # ここで Route53 の ZONE に API Gateway にリクエストを向けるための A record を追加
  name    = aws_apigatewayv2_domain_name.api_gw_domain.domain_name
  type    = "A"
  zone_id = var.zone_id

  alias {
    evaluate_target_health = true
    name                   = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].target_domain_name
    zone_id                = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].hosted_zone_id
  }
}

resource "aws_apigatewayv2_api_mapping" "api_gw_domain_mapping" {
  api_id      = aws_apigatewayv2_api.gw_api.id
  stage       = aws_apigatewayv2_stage.gw_stage.id
  domain_name = aws_apigatewayv2_domain_name.api_gw_domain.id
}

設定のポイント

Timeout について

OpenAI の API を通常の API よりも実行時間が長い傾向があります。(公式の python sdk の設定 は 2023年8月現在、default の timeout はなんと 600sec です!)

そのため Lambda の実行時間を 60sec など長めにとっておくのが良いと思います。

また、コード自体でも timeout を適切に入れておくのが望ましいです。 今回は OpenAI の SDK に依存する形ではなく timeout-decorator という library を使って時前で timeout させるようにしました。

@timeout_decorator.timeout(60)
    def chat_complete(self, request_text: str):
        return openai.ChatCompletion.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": PROMPT.format(passage=request_text)},
            ],
            max_tokens=2048,
            temperature=0,
        )

API Key をどこから取り出すか

API Key は機密情報なので Lambda の内部で ssm を経由して取り出すようにしました。 Lambda の Execution role に SSM の該当リソースを取得してデコードする権限を渡しておきます。

data "aws_iam_policy_document" "lambda_execution_policy" {

  ... 省略

  statement {
    effect = "Allow"
    actions = [
      "kms:Decrypt",
      "ssm:GetParameter"
    ]
    resources = [
      "arn:aws:ssm:*:*:parameter/xxx" # parameter 名
    ]
  }
}

python 側では Lambda 起動時に以下のような処理が走るような実装にしています。ただしこれは OpenAI のみで必要な処理なので infra レイヤー内部に実装を入れています。

import boto3
import openai

ssm = boto3.client("ssm", region_name="ap-northeast-1")
response = ssm.get_parameter(
    Name="xxx", WithDecryption=True
)
openai.api_key = response["Parameter"]["Value"]

Datadog との接続

Datadog にメトリクスを送るには、以下の 2 点を行う必要があります。

  1. コンテナに Datadog に関連するライブラリを入れておく
  2. コンテナな起動設定でエントリポイントを Datadog のライブラリに上書きし、必要な環境変数を設定する

1 に関しては Dockerfile に public.ecr.aws/datadog/lambda-extension:latest から設定をコピーする設定を書きつつ、 datadog-lambda ライブラリをインストールします。今回は python のパッケージ管理を poetry で行っているので、 pyproject.toml に設定を追記します。

FROM public.ecr.aws/lambda/python:3.10

... 省略

COPY --from=public.ecr.aws/datadog/lambda-extension:latest /opt/extensions/ /opt/extensions

... 省略

RUN poetry install --no-root
[tool.poetry.dependencies]
python = "^3.10"
openai = "^0.27.8"
boto3 = "^1.28.22"
timeout-decorator = "^0.5.0"
datadog-lambda = "^4.78.0" # ここ

詳しい設定はこちらを参考にしてください。

docs.datadoghq.com

設定が完了した上で Lambda を実行すると、コンソールの Menu で Infrastructure -> Serverless 内で図のようにメトリクスが表示されるようになります。

Datadog のコンソール

ハマりポイント

Lambda 関数の event について

api gateway で integration した場合には event の中身が変化するという事を理解できておらず、ややハマりました。

コンソールからのテストでは正常に動きましたが、postman などで api を試すと internal serveer error が返ってきてしまいました。

{"message":"Internal Server Error"}

調べていくと、公式に以下のような解説がありました。

ペイロード形式バージョンでは、API Gateway が Lambda 統合に送信するデータの形式と、API Gateway が Lambda からのレスポンスをどのように解釈するかを指定します。ペイロード形式バージョンを指定しない場合、AWS Management Console はデフォルトで最新バージョンを使用します。

docs.aws.amazon.com

Api Gateway からの呼び出しの際には event の構造が変化するので、それに対応するように event からパラメータを取り出したり、レスポンスの形式を揃える必要があります。

インフラ的に解決する道もあると思いますが、簡易的に以下の関数を main.py に定義して対応しました。

def get_request(event):
    if "type" not in event:
        event = json.loads(event["body"])

    request_type = event["type"]
    request_text = event["input"]

    return request_type, request_text

def create_response(response: dict):
    return {
        "statusCode": 200,
        "body": json.dumps(response),
    }

さいごに

OpenAI などの LLM を活用するための Lambda の構成について解説しました。 がっつり使う場合はまた別の構成があると思いますが、LLM をちょっとした機能で使う場合には出来る限りインフラを追加で作らずに機能を追加できるような構成にしておく事にはメリットがあると思います。

ECS で GPU を使った ML 系 Task の実行環境をセットアップする

この記事を書いた人

DROBE の都筑 (@tsuzukit2) です

簡単なプロフィールはこちらをご覧ください

はじめに

機械学習系の機能を開発していると、GPU を利用してトレーニングを行いたいケースが多々あると思います。

この記事では、ECS で GPU を使った ML 系 Task の実行環境のセットアップについて記載します。

作りたいもの

作りたいものの概要はこのようなものです。

ECS で構築する GPU を利用した Task の実行環境

GPU は高価なので、常時起動しているインスタンスは 0 としておきつつ、Task が作られたらインスタンスを起動、Task を実行、Task の実行が終わったらインスタンスを落とし 0 に戻す、という環境をセットアップします。

ECS の capacity provider と autoscaling group を紐付け、ECS Task が起動 / 終了のタイミングで必要なインスタンスが変更される構成です。

ECS Task は goofys を利用して S3 をマウントする事とします。S3 をマウントするのは、画像系機械学習モデルのトレーニングに大量の画像が必要であり、それを S3 にマウントする事でコードで S3 の API などを意識せずに使えるようにするためです。

goofys については、こちらの repo を参考にしてください。

ECS で実行するタスクの定義と実行

ECS で実行するタスク定義は事前に Terraform で作っておく事としました。Task 定義の中で実際に training を行うコンテナは latest tag のものを実行するように指定しておきます。 Task を実行する際には、GitHub Actions にてコンテナをビルドし ECR に push (ここで latest tag のイメージが更新されます) し、AWS CLI を使って Task を起動するという流れです。

GitHub Actions は以下のようなものになります。

name: invoke ml training

on:
  workflow_dispatch:

jobs:
  build:
    name: build container image
    runs-on: ubuntu-latest

    steps:
    - name: source checkout
      uses: actions/checkout@v3

    - name: Configure AWS credentials
      uses: aws-actions/configure-aws-credentials@v2
      with:
        aws-access-key-id: ${{ AWS_ACCESS_KEY_ID }}
        aws-secret-access-key: ${{ AWS_SECRET_ACCESS_KEY }}
        aws-region: ap-northeast-1

    - name: Login to Amazon ECR
      id: login-ecr
      uses: aws-actions/amazon-ecr-login@v1

    - name: Build and push to ecr
      run: |
        make build # 自作のコンテナを build するためのコマンド
        docker tag trainer:latest xxxx.dkr.ecr.ap-northeast-1.amazonaws.com/trainer:latest
        docker push xxxx.dkr.ecr.ap-northeast-1.amazonaws.com/trainer:latest:latest
        docker tag trainer:latest xxxx.dkr.ecr.ap-northeast-1.amazonaws.com/trainer:${GITHUB_SHA}
        docker push xxxx.dkr.ecr.ap-northeast-1.amazonaws.com/trainer:${GITHUB_SHA}

  run:
    name: run ecs task
    runs-on: ubuntu-latest
    needs: [build]

    steps:

    - name: Configure AWS credentials
      uses: aws-actions/configure-aws-credentials@v2
      with:
        aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
        aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        aws-region: ap-northeast-1

    - name: run-task
      env:
        CLUSTER_NAME: ml-train # terraform で定義した cluster 名に合わせる
        FAMILY_NAME: trainer # terraform で定義した task 定義に合わせる
      run: |
        TASK_DEF_ARN=$(aws ecs list-task-definitions --family-prefix "${FAMILY_NAME}" --query "reverse(taskDefinitionArns)[0]" --output text)
        echo "${TASK_DEF_ARN}"
        TASK_ARN=$(aws ecs run-task --cluster ${CLUSTER_NAME} --task-definition ${TASK_DEF_ARN} --query tasks[0].taskArn --output text)
        TASK_ID=$(echo "${TASK_ARN}" | grep -oE "[^/]+$")

Terraform

全体の構成が決まったので Terraform の設定を書いていきます。 (注 公開するために命名などを修正しており動作未検証なのでコピペでの使用は避けてください)

まずは VPC などネットワーク周りの設定を書きます。

# VPC
resource "aws_vpc" "vpc_name" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support   = true

  tags = {
    Name = "vpc_name"
  }
}

# Public subnet
resource "aws_subnet" "subnet" {
  vpc_id                  = aws_vpc.vpc_name.id
  availability_zone       = "ap-northeast-1a"
  cidr_block              = "10.0.1.0/24"
  map_public_ip_on_launch = true
}

resource "aws_internet_gateway" "ig" {
  vpc_id = aws_vpc.vpc_name.id
}

resource "aws_route_table" "rt" {
  vpc_id = aws_vpc.vpc_name.id
}

resource "aws_route" "route" {
  route_table_id         = aws_route_table.rt.id
  destination_cidr_block = "0.0.0.0/0"
  gateway_id             = aws_internet_gateway.ig.id
}

resource "aws_route_table_association" "rta" {
  subnet_id      = aws_subnet.subnet.id
  route_table_id = aws_route_table.rt.id
}

resource "aws_security_group" "sg" {
  name   = "sg"
  vpc_id = aws_vpc.vpc_name.id
  depends_on = [
    aws_vpc.vpc_name
  ]

  ingress {
    from_port   = "0"
    to_port     = "0"
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  egress {
    from_port   = "0"
    to_port     = "0"
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

次に ECR などを書いていきます

# ECR
resource "aws_ecr_repository" "ecr_repo" {
  name     = "trainer"
}

# ECR Lifecycle policy 
resource "aws_ecr_lifecycle_policy" "lsp" {
  repository = each.value.ecr_repo

  policy = <<EOF
{
    "rules": [
        {
            "rulePriority": 1,
            "description": "Keep last 2 images",
            "selection": {
                "tagStatus": "tagged",
                "tagPrefixList": ["v"],
                "countType": "imageCountMoreThan",
                "countNumber": 2
            },
            "action": {
                "type": "expire"
            }
        }
    ]
}
EOF
}

続いて Task 定義を作ります

ここで goofys や GPU を使うための設定を行います

# Task 定義
resource "aws_ecs_task_definition" "task" {

  family                   = "trainer"
  requires_compatibilities = ["EC2"]
  network_mode             = "bridge"
  cpu                      = 2048
  memory                   = 8192

  task_role_arn      = aws_iam_role.iam.arn
  execution_role_arn = aws_iam_role.iam.arn

  container_definitions = jsonencode([
    {
      image     = "xxxx.dkr.ecr.ap-northeast-1.amazonaws.com/${aws_ecr_repository.ecr_repo.name}:latest" # ここで latest を指定する
      essential = true,
      name      = "trainer"
      cpu       = 2048,
      memory    = 8192,
      logConfiguration = {
        logDriver = "awslogs",
        options = {
          awslogs-group         = aws_cloudwatch_log_group.ml_image_recognition_train.name,
          awslogs-region        = "ap-northeast-1",
          awslogs-stream-prefix = "ml_image_recognition"
        }
      },
      linuxParameters = { # goofys を使うための設定
        capabilities = {
          add = [
            "MKNOD",
            "SYS_ADMIN"
          ]
        },
        "devices" : [
          {
            "hostPath" : "/dev/fuse",
            "containerPath" : "/dev/fuse",
            "permissions" : [
              "read",
              "write",
              "mknod"
            ]
          }
        ]
      },
      environment = [
        {
          name  = "NVIDIA_DRIVER_CAPABILITIES",
          value = "all"
        }
      ],
      resourceRequirements = [ # GPU を使うための設定
        {
          type  = "GPU",
          value = "1"
        }
      ]
    }
  ])
}

resource "aws_iam_role" "iam" {
  name = "ecs-iam"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "ecs-tasks.amazonaws.com"
      },
      "Effect": "Allow"
    }
  ]
}
EOF
}

resource "aws_iam_role_policy" "iam_policy" {
  role = aws_iam_role.iam.id

  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogGroups",
        "logs:DescribeLogStreams"
      ],
      "Effect": "Allow",
      "Resource": "${aws_cloudwatch_log_group.lg.arn}"
    },
    {
      "Action": [
        "s3:ListBucket",
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Effect": "Allow",
      "Resource": [
        "*" # goofys でマウントしたいバケットを指定する
      ]
    }
  ]
}
EOF
}

resource "aws_cloudwatch_log_group" "trainer" {
  name              = "/ecs/logs/prod/trainer"
  retention_in_days = 14
}

最後に cluster の定義や Autoscaling Group の設定を書いていきます。aws_laumch_template で使いたいインスタンスタイプを指定します。ここでは g4dn.xlarge を指定しています。

# ecs cluster
resource "aws_ecs_cluster" "cluster" {
  name = "ml-train"
}

resource "aws_ecs_cluster_capacity_providers" "trainer" {
  cluster_name = aws_ecs_cluster.cluster.name

  capacity_providers = [aws_ecs_capacity_provider.trainer.name]

  default_capacity_provider_strategy {
    base              = 0
    weight            = 1
    capacity_provider = aws_ecs_capacity_provider.trainer.name
  }

}

resource "aws_ecs_capacity_provider" "trainer" {
  name = "trainer"

  auto_scaling_group_provider {
    auto_scaling_group_arn         = aws_autoscaling_group.trainer.arn
    managed_termination_protection = "ENABLED"
    managed_scaling {
      maximum_scaling_step_size = 10
      minimum_scaling_step_size = 1
      status                    = "ENABLED"
      target_capacity           = 100
    }
  }
}

resource "aws_autoscaling_group" "trainer" {
  name                      = "trainer"
  max_size                  = 1
  min_size                  = 0
  health_check_grace_period = 0
  health_check_type         = "EC2"
  desired_capacity          = 0
  vpc_zone_identifier       = [aws_subnet.trainer.id]

  launch_template {
    id      = aws_launch_template.trainer.id
    version = "$Latest"
  }

  tag {
    # ECSにスケーリングをお願いするために必要なタグ
    # https://docs.aws.amazon.com/ja_jp/AmazonECS/latest/developerguide/cluster-auto-scaling.html#update-ecs-resources-cas
    key                 = "AmazonECSManaged"
    value               = ""
    propagate_at_launch = true
  }

  lifecycle {
    ignore_changes = [
      desired_capacity,
    ]
  }

}

locals {
  node_group_user_data = <<-EOF
  #!/bin/bash
  set -o xtrace
  echo ECS_CLUSTER=${aws_ecs_cluster.ml_image_recognition_train.name} >> /etc/ecs/ecs.config;
  EOF
}

resource "aws_launch_template" "trainer" {
  name = "trainer"

  # https://docs.aws.amazon.com/ja_jp/AmazonECS/latest/developerguide/retrieve-ecs-optimized_AMI.html
  image_id      = "ami-087da40e7559e6193"
  instance_type = "g4dn.xlarge"
  key_name      = "xxxxx"

  vpc_security_group_ids = [aws_security_group.trainer.id]

  block_device_mappings {
    device_name = "/dev/xvda"

    ebs {
      volume_size = 200
    }
  }

  instance_market_options {
    market_type = "spot" # spot の方が金額は安いが長時間の学習では spot だと止まってしまう事があった
  }

  iam_instance_profile {
    arn = aws_iam_instance_profile.trainer.arn
  }

  user_data = base64encode(format(local.node_group_user_data))
}

data "aws_iam_policy" "AmazonEC2ContainerServiceforEC2Role" {
  arn = "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role"
}

resource "aws_iam_role_policy_attachment" "trainer" {
  for_each = toset([
    "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy",
    "arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly",
  ])
  role       = aws_iam_role.trainer.name
  policy_arn = each.value
}

data "aws_iam_policy" "AmazonEC2ContainerServiceforEC2Role" {
  arn = "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role"
}

resource "aws_iam_role" "trainer" {
  name               = "trainer"
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "ec2.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

resource "aws_iam_role_policy_attachment" "trainer" {
  policy_arn = data.aws_iam_policy.AmazonEC2ContainerServiceforEC2Role.arn
  role       = aws_iam_role.trainer.name
}

resource "aws_iam_instance_profile" "trainer" {
  role = aws_iam_role.trainer.name
}

CapacityProviderReservation について

ECS のインスタンス数を増減させる仕組みとして CapacityProvider を使います。それらの説明は公式のブログに詳しいので一読ください。

Amazon ECS クラスターの Auto Scaling を深く探る | Amazon Web Services

CapacityProvider を利用した Autoscaling においては、クラスターが必要とするインスタンス数と実際に稼働しているインスタンスの数を比率で表した CapacityProviderReservation という値をあらかじめ設定しておき、その値になるようにインスタンス数を ASG が自動で調節します。若干解り伝いのですが、公式の説明を読むとイメージが掴めます。

CAS の中心的な責任は、ASG に割り当てられたタスクの必要を満たす上で「適切な」数のインスタンスが ASG で実行されるようにすることです。これには、既に実行されているタスクと、既存のインスタンスには収まらない、顧客が実行しようとしているタスクの両方が含まれます。その数を M としましょう。また、既に実行されている ASG 内の現在のインスタンス数を N とします。この記事の残りの部分では M と N が繰り返し出てくるので、これらをどう考えたらよいかを十分明確に理解しておくことが重要です。  今のところ、M をいくらにしたらよいかを知る方法は説明していませんが、議論の目的上、M は必要数であるとだけ仮定します。この仮定の下で、もし N = M であるとするなら、スケールアウトは必要ではなく、スケールインは不可能です。一方、N < M なら、十分なインスタンスがないことになるので、スケールアウトが必要です。  最後に N > M なら、スケールインが可能です (ただし必要だとは限りません)。自分の ECS タスクすべてを実行するのに必要な数よりも多くのインスタンスが存在しているからです。また、後ほど見るように、N と M に基づく新しい CloudWatch メトリクスを定義し、それをCapacityProviderReservation と呼ぶことにします。N と M が与えられたときのこのメトリクスの定義は非常にシンプルです。

簡単に説明するなら、このメトリクスは、ASG の実際の大きさと必要な大きさとの比を、パーセント単位で表したものです。

terraform ではこの CapacityProviderReservation を設定する形になります

resource "aws_ecs_capacity_provider" "trainer" {
  name = "trainer"

  auto_scaling_group_provider {
    auto_scaling_group_arn         = aws_autoscaling_group.trainer.arn
    managed_termination_protection = "ENABLED"
    managed_scaling {
      maximum_scaling_step_size = 10
      minimum_scaling_step_size = 1
      status                    = "ENABLED"
      target_capacity           = 100 # <- ここ
    }
  }
}

ここで 100 を指定するという事は、「タスクを実行するのにちょうど必要な数のインスタンスを準備してください」ということになります。

例えばここで 200 を指定しておくと、「タスクを実行するのに必要なインスタンスの数の 2 倍のインスタンスを準備してください」ということです。

CapacityProviderReservation を 100 にしておくことで、タスクのリクエストが無い時はインスタンスの数が 0 になるように出来ます。(GPU インスタンスは非常に高いので使っていない時に 0 に出来るという事はメリットが大きいはずです)

Trouble Shooting

ここからは、環境を構築するにあたって実際にハマってしまったポイントを 2 つ紹介します。

インスタンスが起動はするが ECS に参加しない

ECS クラスターが作られ EC2 インスタンスも建っているのに、training が始まらない場合は EC2 が ECS クラスターに参加できているかを確認してみてください。

EC2 インスタンスを ECS クラスターに参加させるためには、EC2 の /etc/ecs/ecs.configECS_CLUSTER 環境変数をセットする必要がありました。

terraform では以下の辺りになります。

locals {
  node_group_user_data = <<-EOF
  #!/bin/bash
  set -o xtrace
  echo ECS_CLUSTER=${aws_ecs_cluster.trainer.name} >> /etc/ecs/ecs.config;
  EOF
}

resource "aws_launch_template" "trainer" {
  name = "trainer"

  ...省略
  
  user_data = base64encode(format(local.node_group_user_data))
}

Training が 20 時間程度で終わってしまう

環境を構築してから何回か training を実行しましたが、どうしても 20 時間程度で勝手に training が終了してしまうという現象に遭遇しました。

最初はメモリーなどを疑いましたが試しに無限に sleep をし続ける training task を作って実行しても同様の結果になってしまいました。

調査した結果、どうやらスポットインスタンスを使っている事が原因だったようです。

スポットインスタンスの設定を削除した結果数日に及ぶようなトレーニングでも実行を行える事が確認できました。

terraform ではこの辺りになります。

resource "aws_launch_template" "trainer" {
  
  ... 省略

  instance_market_options {
    market_type = "spot" # spot の方が金額は安いが長時間の学習では spot だと止まってしまう事があった
  }

}

おわりに

ML をやっていると GPU を使いたくなる事は多々あり、かつ本番環境への Deploy などを考えると自動のパイプラインとして GPU によるトレーニングを組みたくなる事は多いと思います。

一方で GPU インスタンスは非常に高価であるため、使っていない時は落としておきたいと思うのが人情だと思います。

この記事では ECS で GPU を使った ML 系 Task の実行環境のセットアップについて解説しました。特に必要のない時にはインスタンス数を 0 にしておける設定なので、特にコストにシビアなスタートアップの方などに少しでも参考になれば嬉しいです。

PHPカンファレンス2023にゴールドスポンサーとして協賛します

EMの加川(@shinpr_p)です。
DROBE は PHP Conference Japan 2023 にゴールドスポンサーとして協賛します。

PHPカンファレンスは、2000年より年に一度開催されている日本最大のPHPのイベントです。WEBサーバにインストールされているシェア8割を超える人気言語のイベントとして、初心者から上級者まで幅広い層のWEB系エンジニアが参加します。

今回はオフラインでの開催となる予定ですので、DROBEからも数名のメンバーが参加します。ブース出展も行いますので、ご参加される皆さまぜひ交流させてください。

PHP Conference Japan 2023 の概要

phpcon.php.gr.jp

開催日

October 8 - 8, 2023.

開催地

大田区産業プラザPiO.

さいごに

DROBE は「すべての人がポジティブに意思決定し、自分を楽しめる世界」というビジョンを目指し、パーソナルスタイリングサービス「DROBE」を提供しています。

技術コミュニティが創出した成果を活用することで、DROBEというプロダクトを素早くデリバリーすることが可能となっています。技術コミュニティの恩恵をただ享受するだけでなく、業界の発展のために微力ながら貢献していきたいと考え、活動の一環として技術イベントへのスポンサードを行っています。
今後も当社の活用する技術領域にまつわるスポンサードや情報発信を継続的に行っていきます!

当社の紹介

DROBEのプロダクト開発組織に関する情報をまとめておりますので、少しでも興味を持っていただけたらぜひ参照ください。 info.drobe.co.jp

LambdaとSQSを使ってWebhookによるスパイクに対応する

はじめに

CTOの都筑(@tsuzukit2)です。
DROBE では様々な外部サービスを利用していますが、事前に設定しておく事で外部サービス側で特定のイベントが発生した際に DROBE 側に HTTP のリクエストを送ってくれる仕組みを多く利用しています。

Webhook のわかりやすい例としては、例えば SendGrid のような外部サービスでメールの配信を行っている場合に、実際にメールの配信結果がどうだったかをアプリケーションで確実に検知したいといった場合に、SendGrid での配信完了イベントをトリガーとした Webhook を設定します。

SendGrid の Webhook を使ってメールの配信完了を Application で検知する例

こういった仕組みを使う事で、アプリケーション側ではメールの送信をリクエストした事だけではなく、メールがしっかりと配信された事、もしくはエラーとなってしまった事などを検知し DB に保存したり CS に役立てたりなどという事が出来ます。

この記事では Webhook を使う事によって短期間に大量のアクセスが返ってきてしまうような場合に DROBE がどのように対応しているかを解説します。

課題

そもそも Webhook で大量のアクセスが返ってきてしまう場合とはどういった場合でしょうか?先ほどの SendGrid の例で言うと、アプリケーションの機能として任意のタイミングでユーザー全員にメールマガジンを配信したい場合などが考えられます。

例えばユーザーが 100 万人のサービスで一斉に SendGrid のようなメール送信を行う外部サービスに送信リクエストを送ったらどうなるでしょうか? もちろんサービスの規約や内部実装に依存すると思いますが、多くの大規模なサービスではある程度大量のリクエストが来ても特に問題なく捌き素直に webhook を返すという挙動になると思います。その際にアプリケーション側では大量の HTTP リクエストが一気に来る事になるので (この場合では 100 万人分の Webhook が)、自身が作り出した DDOS 攻撃にアプリケーションがされされるというような状況になります。

Lambda と SQS でリクエストをスロットリングする

DROBE ではこの問題に対処するために Lambda と SQS を用いてリクエストをスロットリングする proxy を作って対処しています。

Webhook proxy の概要図

挙動はいたってシンプルで、外部サービスからのリクエストが来たら AWS Lambda によってリクエストを受けて Header と Body をそのまま SQS に入れていきます。同時に別の Lambda を SQS からのリクエストをデキューし、Header と Body を使って HTTP リクエストをアプリケーションに投げなおします。

ここで API Gateway からのリクエストを SQS に入れていくラムダ (api2sqs と呼んでいます) は並列で動作するように設定しておきつつ、SQS からメッセージをデキューしてアプリケーションに HTTP リクエストを投げ直すラムダ (sqs2upstream と呼んでいます) の並列実行数を制限しておきます。

こうする事で、アプリケーションサーバーに一切の変更を加える事なく、Webhook など大量の外部サービスからのアクセスによるアクセス増を一定のレートに制限する事が可能になります。(もちろん SQS の詰まり具合に応じて Webhook がアプリケーションサーバーに届くまでに一定のタイムラグがあるのでリアルタイム性が重要な機能には使えないという欠点があります。)

Terraform によるセットアップ

ここからは上記の webhook proxy の terraform によるセットアップを解説していきます。

前提として、API Gateway で利用するドメインはすでに準備が出来ていて zone までは手動で切ってあり、ドメインに来たアクセスは zone にくるようにセットアップが出来ているものとします。また SSL のための Certificate も ACM を使って事前に準備しているものとしています。

Lambda のソースコード

まずは API Gateway からリクエストを受け取って SQS に入れる所のラムダのソースコードを書きます。ここでは js で書いていますがどんな言語でも問題ありません。

const https = require('https');
var AWS = require('aws-sdk');

exports.handler = async (event) => {
  const sqsUrl = process.env.SQS_URL # ここでは terraform 側から変数として渡している

  const messageBody = {
    'resource': event.resource,
    'headers': JSON.parse(event.headers),
    'body': JSON.parse(event.body)
  }
  
  var params = {
    MessageBody: JSON.stringify(messageBody),
    QueueUrl: sqsUrl,
   };
  const sqs = new AWS.SQS();
  const queueResponse = await sqs.sendMessage(params).promise()
  
  return {
      "statusCode": 200
  };
};

次に SQS からデキューしてアプリケーション側に HTTP リクエストを投げ直す処理を書きます。

const https = require('https');

exports.handler = event => {
  const upstream = process.env.UPSTREAM # ここも terraform 側から変数として渡している
  const path = process.env.PATH

  event.Records.forEach(record => {
    
    const { body } = record;
    const parsedBody = JSON.parse(body)

    const data = JSON.stringify(parsedBody.body)
    const options = {
        host: upstream,
        path: path,
        port: 443,
        headers: parsedBody.headers
        method: 'POST',
    }

    const req = https.request(options, res => {
      console.log(`statusCode: ${res.statusCode}`)
    }).on('error', error => {
      console.error(error)
    })

    req.write(data)
    req.end()
  });

};

Lambda

次に Terraform で Lambda を作ります。

locals {
  function_name_sqs2upstream = "lambda_proxy_sqs2upstream-${var.app_env}"
}

locals {
  function_name_api2sqs = "lambda_proxy_api2sqs-${var.app_env}"
}

data "aws_iam_policy_document" "lambda_assume_role" {
  statement {
    actions = ["sts:AssumeRole"]

    principals {
      type = "Service"
      identifiers = [
        "lambda.amazonaws.com",
      ]
    }
  }
}

resource "aws_iam_role" "lambda_proxy_sqs2upstream" {
  name               = "lambda_proxy-sqs2upstream-${var.app_env}"
  assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json
}

resource "aws_iam_role" "lambda_proxy_api2sqs" {
  name               = "lambda_proxy-api2sqs-${var.app_env}"
  assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json
}

data "aws_iam_policy" "lambda_execution_policy" {
  arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

# SQS からメッセージを受けたり、メッセージを削除する権限を付与する
data "aws_iam_policy_document" "sqs2upstream_policy_document" {
  source_json = data.aws_iam_policy.lambda_execution_policy.policy

  statement {
    effect = "Allow"

    actions = [
      "sqs:ReceiveMessage",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes"
    ]

    resources = [aws_sqs_queue.sqs_proxy.arn]
  }
}

resource "aws_iam_policy" "sqs2upstream_policy" {
  name   = "sqs2upstream_policy-${var.app_env}"
  policy = data.aws_iam_policy_document.sqs2upstream_policy_document.json
}

resource "aws_iam_role_policy_attachment" "lambda_proxy_sqs2upstream" {
  role       = aws_iam_role.lambda_proxy_sqs2upstream.name
  policy_arn = aws_iam_policy.sqs2upstream_policy.arn
}

resource "aws_iam_role_policy_attachment" "lambda_proxy_sqs2upstream_vpc_execution_role" {
  role       = aws_iam_role.lambda_proxy_sqs2upstream.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
}

# SQS にメッセージを送ったり削除したりする権限を付与する
data "aws_iam_policy_document" "api2sqs_policy_document" {
  source_json = data.aws_iam_policy.lambda_execution_policy.policy

  statement {
    effect = "Allow"

    actions = [
      "sqs:SendMessage",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes"
    ]

    resources = [aws_sqs_queue.sqs_proxy.arn]
  }
}

resource "aws_iam_policy" "api2sqs_policy" {
  name   = "api2sqs_policy-${var.app_env}"
  policy = data.aws_iam_policy_document.api2sqs_policy_document.json
}

resource "aws_iam_role_policy_attachment" "lambda_proxy_api2sqs" {
  role       = aws_iam_role.lambda_proxy_api2sqs.name
  policy_arn = aws_iam_policy.api2sqs_policy.arn
}

data "archive_file" "lambda_proxy_sqs2upstream" {
  type        = "zip"
  source_file = "lambda-src/sqs2upstream/index.js"
  output_path = "lambda-src/sqs2upstream/index.zip"
}

resource "aws_lambda_function" "lambda_proxy_sqs2upstream" {
  filename         = data.archive_file.lambda_proxy_sqs2upstream.output_path
  function_name    = local.function_name_sqs2upstream
  role             = aws_iam_role.lambda_proxy_sqs2upstream.arn
  handler          = "index.handler"
  source_code_hash = data.archive_file.lambda_proxy_sqs2upstream.output_base64sha256
  runtime          = "nodejs12.x"

  publish                        = true
  reserved_concurrent_executions = var.reserved_concurrent_executions # 並列度は変数で渡す

  memory_size = 128
  timeout     = 3
  environment {
    variables = {
      UPSTREAM = var.upstream
    }
  }
}

resource "aws_cloudwatch_log_group" "lambda_proxy_sqs2upstream" {
  name              = "/aws/lambda/${local.function_name_sqs2upstream}"
  retention_in_days = 1
}

data "archive_file" "lambda_proxy_api2sqs" {
  type        = "zip"
  source_file = "lambda-src/api2sqs/index.js"
  output_path = "lambda-src/api2sqs/index.zip"
}

resource "aws_lambda_function" "lambda_proxy_api2sqs" {
  filename         = data.archive_file.lambda_proxy_api2sqs.output_path
  function_name    = local.function_name_api2sqs
  role             = aws_iam_role.lambda_proxy_api2sqs.arn
  handler          = "index.handler"
  source_code_hash = data.archive_file.lambda_proxy_api2sqs.output_base64sha256
  runtime          = "nodejs12.x"

  publish = true

  memory_size = 128
  timeout     = 3
  environment {
    variables = {
      SQS_URL = aws_sqs_queue.sqs_proxy.id
    }
  }
}

resource "aws_cloudwatch_log_group" "lambda_proxy_api2sqs" {
  name              = "/aws/lambda/${local.function_name_api2sqs}"
  retention_in_days = 1
}

SQS

SQS を作ります。設定は非常にシンプルです。

resource "aws_sqs_queue" "sqs_proxy" {
  name = "proxy-queue-${var.app_env}"
  tags = {
    Environment = var.app_env
  }
}

# この設定により SQS にイベントが入ってきたら sqs2upstream 関数が呼び出される
resource "aws_lambda_event_source_mapping" "sqs_proxy" {
  batch_size       = 1 # ここでは 1 にしているがデキューの並列度をあげたい場合はもっと大きな値にする
  event_source_arn = aws_sqs_queue.sqs_proxy.arn
  function_name    = aws_lambda_function.lambda_proxy_sqs2upstream.arn
}

API Gateway

最後に API Gateway を作ります。

resource "aws_apigatewayv2_api" "gw_api_proxy" {
  name          = "gateway_api_proxy-${var.app_env}"
  protocol_type = "HTTP"
}

resource "aws_apigatewayv2_stage" "gw_state_proxy" {
  api_id = aws_apigatewayv2_api.gw_api_proxy.id

  name        = "gateway_state_proxy-${var.app_env}"
  auto_deploy = true

  access_log_settings {
    destination_arn = aws_cloudwatch_log_group.gw_lg_proxy.arn

    format = jsonencode({
      requestId               = "$context.requestId"
      sourceIp                = "$context.identity.sourceIp"
      requestTime             = "$context.requestTime"
      protocol                = "$context.protocol"
      httpMethod              = "$context.httpMethod"
      resourcePath            = "$context.resourcePath"
      routeKey                = "$context.routeKey"
      status                  = "$context.status"
      responseLength          = "$context.responseLength"
      integrationErrorMessage = "$context.integrationErrorMessage"
      }
    )
  }
}

resource "aws_apigatewayv2_integration" "gw_integration_proxy" {
  api_id = aws_apigatewayv2_api.gw_api_proxy.id

  integration_uri    = aws_lambda_function.lambda_proxy_api2sqs.invoke_arn
  integration_type   = "AWS_PROXY"
  integration_method = "POST"
}

resource "aws_apigatewayv2_route" "gw_route_proxy" {
  api_id = aws_apigatewayv2_api.gw_api_proxy.id

  route_key = "POST /"
  target    = "integrations/${aws_apigatewayv2_integration.gw_integration_proxy.id}"
}

resource "aws_cloudwatch_log_group" "gw_lg_proxy" {
  name              = "/aws/api_gw/${aws_apigatewayv2_api.gw_api_proxy.name}"
  retention_in_days = 1
}

resource "aws_lambda_permission" "api_gw" {
  statement_id  = "AllowExecutionFromAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.lambda_proxy_api2sqs.function_name
  principal     = "apigateway.amazonaws.com"

  source_arn = "${aws_apigatewayv2_api.gw_api_proxy.execution_arn}/*/*"
}

resource "aws_apigatewayv2_domain_name" "api_gw_domain" {
  domain_name = "${var.domain_name}"

  domain_name_configuration {
    certificate_arn = var.domain_cert_arn
    endpoint_type   = "REGIONAL"
    security_policy = "TLS_1_2"
  }
}

resource "aws_route53_record" "api_gw_domain_record" {
  name    = aws_apigatewayv2_domain_name.api_gw_domain.domain_name
  type    = "A"
  zone_id = var.zone_id

  alias {
    evaluate_target_health = true
    name                   = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].target_domain_name
    zone_id                = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].hosted_zone_id
  }
}

resource "aws_apigatewayv2_api_mapping" "api_gw_domain_mapping" {
  api_id      = aws_apigatewayv2_api.gw_api_proxy.id
  stage       = aws_apigatewayv2_stage.gw_state_proxy.id
  domain_name = aws_apigatewayv2_domain_name.api_gw_domain.id
}

変数の設定

最後に変数の定義を以下のように書いておしまいです。

app_env                        = "prod"
upstream                       = "example.com" # 向き先のドメイン
reserved_concurrent_executions = 1 # ここで並列度の定義をする
zone_id                        = "zone_id" # 事前に作成した ZONEid
domain_cert_arn                = "arn_of_domain_cert" # ACM で作った CertificateARN

まとめ

この記事では AWS Lambda と SQS を用いて、外部サービスによる大量アクセスに対応する方法について解説しました。高いリアルタイム性が要求されるようなアプリケーションには向かない方法ですが、既存のアプリケーションに一切手を加える事なく実現出来るので、状況によっては使いやすい手法なのではと考えています。

開発プロセス版ADRで振り返る開発チームの改善活動

こんにちは。EMの加川(@shinpr_p)です。
本記事は、開発プロセスの変更を残したADRを眺めながら、スクラムの適用など開発チームの改善と向き合ってきた2-3ヶ月を振り返ります。

はじめに

前提知識としてADRについて説明します。
ADRは Architecture Decision Records の略で、アーキテクチャの決定を記録したドキュメント群です。
「意思決定の過程を残すこと」が特徴で、「検討したが導入しなかったもの」や「運用を進めることで適切ではなくなったため廃止したもの」、「技術選定の過程で選ばれなかった選択肢」など、よく歴史的経緯と言われがちな過程の情報が残ることが好きで、私はここ数年よく利用しています。

cloud.google.com

実際のADR

実際のADRがこちらです。
意思決定の関係者はby nameが含まれるので、公開用に加工しています。
以下、それぞれの意思決定について振り返っていきます。

実際のADR

イベント(会議体)の運用

これまでは、よくある「スクラム風な開発プロセス」でした。
この提案から、「継続的に価値を届けられるよう成果やプロセスを点検できるチームになる」ことをテーマに、開発プロセスの見直しを行なっています。
この時点では明確に「スクラムのフレームワークを導入する」とは言っておらず、「スクラムのエッセンスを盛り込む」という表現をしていました。

変更したこと

  • タイムボックスを1週間から2週間にする
  • プランニングMTGの進行を見直す
  • 成果報告、進捗共有、各種振り返りと散らばっていたMTGを整理し、Sprint Review / Retrospective に集約

タイムボックスの変更

個人的には1週間のままで良かったのですが、スクラムイベントをしっかりやろうとしたときに「MTGの時間が多くなりすぎる」と言われがちなので、細かくやっていたMTGの時間と同等にできる2週間をリズムとすることにしました。
最近はステークホルダーとのコミュニケーションやデモを見ることの重要性をチームが感じ始めてきているため、いずれ1週間に変更ということもあり得るかもしれません。

プランニングMTGの進行を見直す

これまでは、やることの共有とタスクアサインの決定、定期リリースのためのリリース計画の確認を行なっていました。
それを、Sprint PlanningとしてSprint Goalの決定とSprint Backlogの作成、完了条件(Acceptance Criteria: AC)の認識合わせを行うように変更しました。
ステークホルダーの召集もアジェンダには入れていましたが、当初はあまり召集されていませんでした。直近のスプリントで一度招集し意義が感じられたことで、最近のスプリントでは積極的に召集が検討されるようになってきています。

Sprint Review / Retrospective

「週次のやったこと共有」「週次or隔週の障害振り返り」「月次の数値共有」「月次のKPT」と、レビュー・振り返りの要素が分散した状態で運営されていました。そこで、「リズムを作り、学びを次に活かしやすくする」ために、Sprint Review / Retrospectiveとして統廃合を行いました。
Reviewでは新たに「ゴールの共有」「できなかったことの共有」「デモ・ステークホルダーの召集」が盛り込まれ、振り返りはKPTからYWTに形を変えつつ「ネクストアクションを必ず出す」ことに重きをおくようにしました。

www.jmac.co.jp

印象に残っていること

「スクラムのエッセンスを盛り込む」と表現しているように、比較的ゆるめにフレームワークを適用しています。
そんな中、粒度が大きいPBIに取り組むときに「折角やるのだから守破離の守でMVPを意識した分割をしよう」というムーブメントが発生したことが印象に残っています。

AWS のコスト最適化について

チーム運営とは関係ないため本記事では割愛します

QAの振る舞いの変更

これまでは、コードレビューが終わったものをQAが品質保証し、保証されたものがリリースされるというフローでした。
これを、以下のように変更しようとしています。

  • プロダクトにとって重要な機能を定義し、重要な機能はQAが品質保証をし、リリースする
  • 重要な機能以外は、基本的にはエンジニアが品質保証をし、リリースする
  • QAの基本的な振る舞いを次のように定義する
    • 完了条件に関与し、品質保証の観点が盛り込まれている状態にする
    • コードレビューに関与し、品質保証の観点でフィードバックをする
  • ゆくゆくは、PdM/エンジニアが品質の知識を身に付け、QAではなくチームが品質を保証できるようにする(QA Enabling)

現状はまだまだ道半ばですが、少しずつでも前進していきます。

DROBEのリリースフローの変更について

手動で作業が必要な箇所を一部自動化し、リリースにかかる時間を短縮しました。
また、直近Four Keysの計測が始まったこともあり、これまでは定期リリースだったものが、ある程度まとめてリリースになり、現在ではリリースできる状態になったら順次リリースが行われるようになっています。

デプロイ頻度の推移

今後は「変更のリードタイム」を改善していく予定です。Sprint Backlogでのタスク分解にトライしつつ、放置気味だったPRのCloseや古くからあり対応に苦慮していたタスクのリリースに向けた作業に取り組んでいます。

MLが関係する開発のプロセス

MLの領域は専門性が高く、透明性が担保しづらい(実験や実装が特定個人に閉じてしまい、他のメンバーが内容を把握できていない状態)という課題がありました。
また、MLと機能開発のチームがゆるく分離しており、活動の母体が2つあるような状態となっていました。これにより、何もしないと足並みを揃えることが難しいという課題もありました。

これらの課題を緩和すべく、以下の変更を加えました。

  • MLの活動リズムも2週間にし、タイムボックスの終わりにReview(MLエンジニアの活動状況や実験結果の共有)とRetrospective(MLにまつわるプロセス・システムを眺め、改善することがないか探る)を実施することにした
  • Discoveryという概念を作り、機能開発との連携に活用するようにした
    • 実験の状況(実験したい項目がいくつあり、どれに取り組んでいて、どのPBIと関係しているのか)が分かりやすくなる
    • 実験で明らかにする観点が明確になる

ReviewにはPdMも招待しており、MLによってできることの理解やPdMが困っていることの把握といった相互理解を育むことも期待しています。
私が入社する以前にはML開発チームがステークホルダーとコミュニケーションを取りながらMLで何をやろうかというディスカッションをしていたらしいですが、最近では実施されなくなってしまっていました。
MLとビジネス/ユーザー価値を繋げていかないと価値の種が枯渇してしまうため、本活動をブラッシュアップさせていくことが急務だと感じています。

ビルドトラップを回避するため、Story Issueのテンプレートとフローを変更する

MLの活動をきっかけに、Discoveryの概念を機能開発チーム(以下スクラムチーム)全体に適用することにしました。
端的に書くと、「以下の観点を満たさないものはProduct Backlogに積まないので、Discoveryという過程を経てそれらを明らかにしてね」ということを強制するようになりました。

  • ビジネス的な価値(複利の効くInvestの削減 or Investを上回るReturn)があるか
  • ユーザーにとって価値があるか
  • ユーザーが使い方がわかるか
  • 現実的な工数での実現が可能か

Issue(当社ではGitHub Issueを用いている)のテンプレートに4つの観点をチェックボックスとして追加し、POは4つの観点が満たされていると判断したらPBIに優先順位をつけてProduct Backlogに積むようにしています。
まだ試行錯誤中のため少々Product Backlogが心許なくなっていますが、協力し乗り越えていきたいと考えています。

Sprint Goalに関する変更

Sprint Goalを廃止する

直近のSprint Planningの中で、「このPBIをやることがSprint Goalと同義であるため、あえて抽象的なゴールを作ると混乱するのではないか?」という課題提起があり、それに対する明確な対案が見出せなかったことから、Sprint Goalを廃止したスプリントに取り組んでみることにしました。
PlanningのMTGの中で「Sprint Goalを合意する」アジェンダに要する時間がそれなりの比重となっていたことも、意思決定を後押しした要素となっています。

Sprint Goalを導入する

Sprint Goalの無いスプリントに取り組む中で、「Sprint Goalがないことの課題 / あることのメリット」の言語化が進みました。その説明をし、結果的にSprint Goalは設けようという結論になり、最近のスプリントからはゴールを考えるようになっています。

  • 優先順位としてはROIが高いものが上がりやすく、1つの施策(タスク)で大きい効果が見込まれるもの(Returnが高く、Investが低いもの)から順に着手することになりがちである
    • Sprint Goalというテーマがあることで、相対的にROIが高くない(Return > Investであるのは必須)タスク群に取り組むことも可能になるのではないか?
  • ゴールが無いことで「ゴールを達成するためにこれをやろう」ではなく「このタスク群を終わらせることがゴール」になってしまい、創意工夫の余地がなくなってしまう
    • 変化に弱くなってしまう
    • 不確実性を取り込むことが困難である

最近好きなスライドがあって、ここに書かれている考え方も参考にしました。

www.docswell.com

今後は「Sprint Goalを合意するために全員が集まる時間をどう短縮するか」と「Sprint Goalを達成するためのタスク群を如何に早く作り込むか」という課題に向き合っていきます。

さいごに

ADRを眺めながら、この2-3ヶ月を振り返りました。
チームはまだまだ改善の真っ只中ですが、変化が起きていることや試行錯誤できていることをポジティブに感じています。
今後も自己管理されたチームを目指して改善を続けていきます。

PyTorchのEmbeddingの挙動についてまとめてみた

はじめに

CTOの都筑(@tsuzukit2)です

この記事では PyTorch の Embedding の挙動について記載します

Embedding とは何か

公式の仕様書はこちらになります

Embedding - PyTorch 1.9.0 documentation

公式の説明は以下となっており、非常に的を得ていると思います

A simple lookup table that stores embeddings of a fixed dictionary and size.

意訳すると、 固定長の辞書埋め込みを保存するシンプルなルックアップテーブル になるんじゃないかなと思います。Embedding は、何だか難しそうにも思えてしまうのですが、ここに記載されている通り非常にシンプルなテーブルでしかないという事です

モジュールの解説としては以下のように記載があります

This module is often used to store word embeddings and retrieve them using indices. The input to the module is a list of indices, and the output is the corresponding word embeddings.

こちらも意訳すると、 このモジュールはワードエンベディングを保存するために使われる事が多く、インデックスによって取得されます。このモジュールへの入力はインデックスのリスト、出力は対応するワードエンベディングになります というような感じかなと思います。

こちらは割と理解しずらいと思うので、以下細かく挙動を見ていきます

Embedding の挙動の確認

挙動を確認するために実際に動かして内部の動作を確認していきます

まず、Embedding を初期化します

>>> import torch
>>> from torch import nn

>>> torch.manual_seed(42) // 再現性のために seed を固定します
>>> emb = nn.Embedding(2, 5) // 2 x 5 次元の embedding を作ります

この時点で emb.weight の中身を確認すると以下のようになります

>>> print(emb.weight)
Parameter containing:
tensor([[ 0.3367,  0.1288,  0.2345,  0.2303, -1.1229],
        [-0.1863,  2.2082, -0.6380,  0.4617,  0.2674]], requires_grad=True)

2 x 5 次元のベクトルがランダムに初期化されている事がわかります

ここで [ 0.3367, 0.1288, 0.2345, 0.2303, -1.1229] が index 0 に、 [-0.1863, 2.2082, -0.6380, 0.4617, 0.2674] が index 1 に対応する事が想像されます

実際に Embedding を通してアクセスしてみると、入力した index に対応する Embedding が取得できるのが確認できます

>>> print(emb(torch.tensor([0]))) // index 0 を入力としたいが、tensor にする必要がある
tensor([[ 0.3367,  0.1288,  0.2345,  0.2303, -1.1229]],
       grad_fn=<EmbeddingBackward>)

>>> print(emb(torch.tensor([1])))
tensor([[-0.1863,  2.2082, -0.6380,  0.4617,  0.2674]],
       grad_fn=<EmbeddingBackward>)

>>> print(emb(torch.tensor([0, 1])))
tensor([[ 0.3367,  0.1288,  0.2345,  0.2303, -1.1229],
        [-0.1863,  2.2082, -0.6380,  0.4617,  0.2674]],
       grad_fn=<EmbeddingBackward>)

公式の説明にある 固定長の辞書埋め込みを保存するシンプルなルックアップテーブル という事が理解できます

Embedding の学習

Embedding がシンプルなルックアップテーブルだという事は理解できましたが、ランダムに作られたベクトルというだけでは何の役にも立ちません。入力に対して学習をしてこのベクトルに意味を持たせる事が大事です。ここでは Embedding がどのように学習していくのかを見ていきます

まず、Embedding の状態を確認したいので、weight の grad という値を確認します

>>> print(emb.weight.grad)
None

学習を何もしていない状態では grad は特に何も無い事がわかります

では学習を進めるために Optimizer を作ります

>>> optimizer = torch.optim.SGD(emb.parameters(), lr=0.1, momentum=0.9)

torch.optim.SGD は embedding の parameter を受け取ります。これによって optimizer object を通して embedding の parameter を確認したり更新したりする事ができるようになります。また lr は learning rate になります

optimizer を作った上で Embedding の loss を計算します ここでは適当に Embedding の index 0 と index 1 のユークリッド距離が最小になるような学習をしたいという事にします

>>> loss = torch.linalg.norm(emb(torch.tensor([0])) - emb(torch.tensor([1])))
>>> print(loss)
tensor(2.7101, grad_fn=<CopyBackwards>)

これは embedding[0] と embedding[1] のユークリッド距離を単純に計算しているだけです

確認のため NumPy で計算しても同じ結果になります

>>> import numpy as np
>>> a = np.array((0.3367,  0.1288,  0.2345,  0.2303, -1.1229))
>>> b = np.array((-0.1863,  2.2082, -0.6380,  0.4617,  0.2674))
>>> dist = np.linalg.norm(a-b)
>>> print(dist)
2.7101973470579592

この時点では単純に loss を計算しているだけなので、grad はまだ特に更新されていません

>>> print(emb.weight.grad)
None

loss.backward 実行するとはじめて emb.weight.grad に値がはいります

>>> loss.backward()
>>> print(emb.weight.grad)
tensor([[ 0.1930, -0.7673,  0.3219, -0.0854, -0.5130],
        [-0.1930,  0.7673, -0.3219,  0.0854,  0.5130]])

ただし、ここでもまだ weight 自体は更新されていません

>>> print(emb.weight)
Parameter containing:
tensor([[ 0.3367,  0.1288,  0.2345,  0.2303, -1.1229],
        [-0.1863,  2.2082, -0.6380,  0.4617,  0.2674]], requires_grad=True)

最後に optimizer.step() を実行する事で emb.weight.grad から計算された値を使って weight が更新されます

>>> optimizer.step()
>>> print(emb.weight)
Parameter containing:
tensor([[ 0.3174,  0.2055,  0.2023,  0.2389, -1.0716],
        [-0.1670,  2.1315, -0.6058,  0.4531,  0.2161]], requires_grad=True)

>>> print(emb.weight.grad)
tensor([[ 0.1930, -0.7673,  0.3219, -0.0854, -0.5130],
        [-0.1930,  0.7673, -0.3219,  0.0854,  0.5130]])

この計算には optimizer の初期化時に指定した learning rate が使われています

検算するとわかりますが、新しい weight は weight - (grad x learning_rate) の式で計算されます

例えば embedding[0][0] の値はもともと 0.3367 でしたが 0.3367 - (0.1930 * 0.1) されて 0.3174 に更新されています

これを 1 step として学習を繰り返していく事で、loss が最小になるように Embedding が更新されていきます

for i in range(10):
  optimizer.zero_grad()
  loss = torch.linalg.norm(emb(torch.tensor([0])) - emb(torch.tensor([1])))
  loss.backward()
  optimizer.step()

まとめ

PyTorch の Embedding の挙動について記載しました

挙動を追ってみる事で公式の説明の通り、非常にシンプルなルックアップテーブルである事がわかりました

DROBE開発組織のキャリアパスや評価方針について

こんにちは、EMの加川(@shinpr_p)です。
今回は、DROBE開発組織におけるエンジニア職のキャリアパス定義、評価の方針やプロセスについてお伝えします。

個人が目指したいキャリアと会社の方針のマッチ度が高ければ、効率的なキャリアアップを実現できる可能性が高まります。
会社ごとに評価方針は異なります。そこで、DROBEの開発組織としての考え方を明らかにし、ご自身のイメージするキャリアとフィットするか判断可能な状態を作りたいと考えました。

今後アップデートされる可能性はありますが、現時点での定義をお伝えしていきます。
ご自身のキャリアイメージと照らし合わせ本記事を参照いただけると幸いです。

評価方針とエンジニアとしての「能力」の定義

エンジニアの評価について、基本的には「能力」を評価するようにしています。フェーズが進みRACIでいう説明責任(Accountability)1を果たすことが期待されることで「成果」での評価に軸足が移るという設計にしています。

能力の定義

現時点で組織規模も大きくなく評価者(EMおよびCTO)も少数であるため、厳密な定義をすることによる運用コストはリターンに見合わないと考えています。
そのため、「xxxという技術要素を活用し、xxxが出来る」というような定義はしておらず、「自身の職能の専門性をもって制御可能な影響範囲の広さ」を能力として定義しています。

フェーズの定義

フェーズごとに期待する振る舞いと、能力を発揮してほしい影響範囲を定義しています。
フェーズが変わるごとに影響範囲が広がりつつ、説明責任が求められていきます。
表で示すと以下のようになります。

フェーズ 期待する振る舞い 能力を発揮してほしい影響範囲
初期 自身の職能の範囲で、与えられたタスクを自己完結できる 自身、1Sprint
チーム・プロダクト・システムの改善を通してユーザに価値を届けることができる
自身の職能の範囲でRACIでいう実行責任(Responsibility)を持ち、特定テーマの開発ができる
チーム、1ヶ月
中期 プロダクト・システムの改善を通してユーザに価値を届けることに実行責任を持って開発できる
自身の職能の範囲で継続的な技術品質の改善ができる
チーム、四半期
プロダクト・システムの改善を通してユーザに継続的に価値を届けるチームを主導できる
自身の職能の範囲で継続的な技術品質の改善を計画し、遂行の説明責任を果たせる
事業全体、1年
後期 チーム・プロダクト・システムが継続的に改善されることに説明責任を果たせる
自身の職能の範囲でこれまでにない価値を創出する計画を立て、遂行の説明責任を果たせる
会社・業界、3年2

※ マネジメントはフェーズごとに期待される振る舞いを実現できる組織をつくることが責務になる

キャリアパスの定義

SWE(Software Engineer)に関して、基本となるキャリアパスを下記のとおり定義しています。
基本形はこの定義としつつ、分岐するフェーズの前後においては「自身のキャリアイメージと擦り合うか」を話し合います。
「定義以外のキャリアパスを一切許容しない」のではなく、「定義と一致するか、一致しない場合どういった乖離があるかが可視化され、議論できる」ことを大切にします。
もちろん基本形から大幅な乖離がある場合に希望を完全に叶えることはできません。そういった場合でも基準となる定義があることで議論の空中戦を避けることができると考えています。

SWEのキャリアパス

各キャリアパスの期待値

マネジメント

  • 組織の構造にアプローチし、組織の開発生産性を最大化させることに責任を持つ
  • 組織図上のマネージャの責務を担い、ピープルマネジメント(含む人事評価)の責任が生じる

スペシャリスト

プロダクト・システムの改善を通して、ユーザに継続的に価値を届けることに責任を持つ

エキスパート

新たな技術価値の創出を通して、会社の成長や業界の発展に貢献することに責任を持つ

キャリアパス上で定義している「ラベル」への期待値の定義

「ラベル」とは、「定義された責務を担うことができる能力を有している個人に付与される役割」です。能力を満たしていると評価された時点から、個人に付与されます。
現時点で言語化済みのラベルについて、下記にまとめました。

👩‍💻LeadEngineer

ビジネスドメインの技術リーダーとして、チームがユーザに継続的に価値を届けることに実行責任を負う

must

  • PdMと協力し、プロダクト品質を最大化できる開発チームを作ることに実行責任を負う
  • チームが取り扱うシステムの技術品質が維持されることに説明責任を負う

nice to have

  • EMと協力し、チームに所属する開発者の技術スキルを継続的に向上させる

👩‍💻TechLead

特定技術領域のリーダーとして、技術品質が継続的に向上することに説明責任を負う

must

  • 特定技術領域の技術品質が継続的に向上することに説明責任を負う
    • 技術品質の基準を定め、基準を下回らないような仕組み・文化をつくる
    • 技術品質を向上させるための計画立案と施策の遂行
  • LeadEngineerと協力し、開発チームの開発者が特定技術領域の技術を活用できるようにすることに実行責任を負う
  • 必要に応じて、技術的難易度の高いタスクの実行責任を負う

nice to have

  • 会社の技術プレゼンス向上を支援する

👩‍💻Engineering Manager

サーバントリーダーとして、開発組織の生産性を最大化させることに実行責任を負う

must

  • 自身の責任範囲の中で、開発組織の生産性が継続的に向上することに説明責任を負う
  • 自身の責任範囲の中で、開発者が成長することに説明責任を負う
  • 事業計画を実現するために必要な技術のケイパビリティを組織が備えることに実行責任を負う
  • 開発組織が継続的に学び、成長する文化を醸成させることに実行責任を負う

補足

Engineering Manager(EM)に至るステップとしてScrum Master(SM)、Associate EMを定義し、それぞれEMが持つ責任や範囲を限定することで地続きなキャリアパスを実現する

Scrum Masterが持つ責任

  • 自身の責任範囲の中で、スクラムチームの生産性が継続的に向上することに実行責任を負う
  • 開発組織が継続的に学び、成長する文化を醸成させることに実行責任を負う

Associate EMが持つ責任

  • 自身の責任範囲の中で、開発組織の生産性が継続的に向上することに実行責任を負う
  • 事業計画を実現するために必要な技術のケイパビリティを組織が備えることに実行責任を負う
  • 開発組織が継続的に学び、成長する文化を醸成させることに実行責任を負う

評価プロセスについて

目標設定、評価は半期のリズムで実施しています。 - 期初に組織図上のマネージャと目標設定を行う - 少なくとも四半期に一度中間振り返り面談を実施する - 期末に振り返り面談を実施、経営陣+HRでのキャリブレーションを経て評価が確定する

個人目標に加え、3つのバリュー(Ownership、Open、Leverage)に対するアクションプランを設定します。

個人目標とチーム目標の繋がり

個人目標の方向性を把握している組織図上のマネージャが、PO3が四半期ごとに定義するProduct Goalの中で個人目標に関する活動が実現可能か議論します。
議論の結果、実施できない個人目標に関する活動があった場合は、マネージャが説明責任を果たし、メンバーとの目標設定に臨みます。

さいごに

以上が、DROBE開発組織におけるエンジニア職のキャリアパス定義、評価の方針やプロセスです。
定義を言語化し運用を始めたフェーズのため今後改良されていく可能性はありますが、考え方はお伝えできたのではないかと考えています。

もし文章ではわからないことなどあれば直接ご質問も受けられますので、その際は下記フォームからご連絡ください。

youtrust.jp


  1. 説明責任(Accountability)、実行責任(Responsibility)の定義はこちらを参照ください
  2. Product Visionが3年後を見据えた定義になっているため、現時点でのスコープ上限は3年で設定されています
  3. 現在開発チームは1つしかないため、チームのビジネス成果の説明責任者はPOとなっています

DROBE開発組織の紹介
組織情報ポータル