DROBEプロダクト開発ブログ

DROBEのプロダクト開発周辺の知見や考え方の共有をしていきます

golang で Headless Browser によって動的に画像を生成する

はじめに

こんにちは、DROBE の都筑です。 この記事では Go 言語によって動的に画像を生成する Micro service の開発について解説します。

モチベーション

Web サービスを運用していると、メディアサイトなどで SNS の共有のための OG Image の生成などを行うために、動的に画像を生成したいというニーズが出てくるがあると思います。

DROBE でも通知などに使うために動的な画像生成のニーズがあります。

画像の生成方法

画像を動的に生成するには技術的にはいくつかの選択肢がありますが、画像処理系のライブラリを利用して画像生成を行うというのがまず思いつくと思います。

php であれば GD や ImageMagic などを使う形になります。

www.php.net

Go であれば、image package を使うなどが考えられます。

pkg.go.dev

画像生成の課題

画像生成の時にはデザイン性のある画像を作りたいというニーズがありますが、画像処理系のライブラリを使う場合には x, y 座標を考えて調整していく必要があります。

そのため、ワークフローとして

  1. デザイナーがデザイン
  2. エンジニアが実装 (ここが中々重いのと、デザインの再現度はエンジニアの力量に依存する)
  3. デザイナーが実装結果を確認
  4. 必要があれば 2 と 3 を繰り返す

Headless Browser による画像生成サービスの実装

モチベーション

画像処理系のライブラリを使う以外の方法として Headless Browser に CSS 描画を行い、スクリーンショットを撮り画像として保存する、とう手法があります。

この手法の場合は Headless Browser を準備し操作するという面倒さはありますが、デザインは CSS で行えるため、デザイナーによるデザインの実装をイテレーション無しで高い再現度で実現できます。

DROBE ではデザイナーとの作業効率やデザインの再現性の高さを重視して Haedless Browser による手法を採用しました。

構成

Headless Browser をメインのアプリケーションコンテナ内部で準備する事はコンテナサイズの肥大化や管理コストの増大などを招くため、画像生成のサービスをマイクロサービスとする事にしました。コンテナの内部で chromium が動く環境を準備し、それを制御する go のプログラムを走らせる形です。

全体の構成は以下のようになります。

Headless browser を利用した画像生成サーバー

ここで golang による chromium の制御は chromedp というライブラリに依存しています。

github.com

中身の解説

golang の中で 2 つの net.http サーバーを動かしています。

func initialize() {

    // create a WaitGroup
    wg := new(sync.WaitGroup)

    // add two goroutines to `wg` WaitGroup
    wg.Add(2)

    // chronium がアクセスする用のサーバー
    staticServerRouter := chi.NewRouter()
    staticImageHandler := controller.NewStaticImageHandler()
    staticServerRouter.Handle("/", staticImageHandler)
    go func() {
        log.Println("start static image server on :8080")
        http.ListenAndServe(":8080", staticServerRouter)
        wg.Done()
    }()

    // 外部が叩く用のサーバー
    apiServerRouter := chi.NewRouter()
    interactor := interactor.NewCreateImageInteractor(
        imaging.NewChromedpClient(),
        storage.NewS3Client(),
    )
    apiHandler := controller.NewApiHandler(
        interactor,
    )
    apiServerRouter.Handle("/", apiHandler)
    go func() {
        log.Println("start api server on :8081")
        http.ListenAndServe(":8081", apiServerRouter)
        wg.Done()
    }()

    // wait until WaitGroup is done
    wg.Wait()
}

http:8080 サーバーの方は内部で chromium がアクセスするサーバーです。

デザインをしていた html と css を使ってWebページを表示しスクショを取る構造になっています。

外部が叩く api server は一般的な clean architecture のイメージで書いています。

処理の流れは以下のようになります。

  • 外部からリクエストが来る
  • 8081 で動いている api server で処理を受け付ける
  • api server は chromedp を操作し、8080 にWebページを表示、スクショを取る
  • 取ったスクショを S3 にあげる (S3 バケットは公開設定にしておく)
  • S3 の url を api response として返す

chromedp を制御する部分は infra レイヤーに書いています。

package imaging

import (
    "context"
    "fmt"
    "io/ioutil"
    "log"
    "time"

    "github.com/chromedp/chromedp"
)

type ChromedpClient struct {
}

func NewChromedpClient() *ChromedpClient {
    return &ChromedpClient{}
}

func (c *ChromedpClient) TakeScreenshot(url string) (string, error) {
    opts := append(chromedp.DefaultExecAllocatorOptions[:],
        chromedp.DisableGPU,
        chromedp.WindowSize(1500, 1500),
    )
    allocCtx, cancel1 := chromedp.NewExecAllocator(context.Background(), opts...)
    ctx, cancel2 := chromedp.NewContext(allocCtx, chromedp.WithLogf(log.Printf))
    ctx, cancel3 := context.WithTimeout(ctx, 5*time.Second) // set timeout
    for _, cancel := range []context.CancelFunc{cancel1, cancel2, cancel3} {
        defer cancel()
    }

    var buf []byte
    task := chromedp.Tasks{
        chromedp.Navigate(url),
        chromedp.WaitVisible("#target", chromedp.ByID),
        chromedp.Screenshot("#target", &buf, chromedp.NodeVisible),
    }

    if err := chromedp.Run(ctx, task); err != nil {
        return "", err
    }

    fileName := fmt.Sprintf("%d.png", time.Now().UnixNano())
    if err := ioutil.WriteFile(fileName, buf, 0644); err != nil {
        return "", err
    }

    return fileName, nil
}

コンテナとして動かす際の注意点

この構成でサーバーを Deploy しましたが、一つ大きくハマってしまったポイントがあるのでご紹介します。

具体的には memory 使用量がどんどん増え続けていくという挙動が観測されました。

メモリーリークが観測された

典型的なメモリーリークの挙動に見えたので pprof などを利用して golang のサーバーでの memory leak などを疑いましたが特に leak は観測されませんでした。

pkg.go.dev

go 側で leak していないとすると、コンテナ内部で他の process が悪さをしていることが想定されます。

実際に top で process を確認してみると zombie process が発生している事を確認できました。

Mem: 3810340K used, 197548K free, 4352K shrd, 135288K buff, 2115840K cached
CPU:   5% usr   3% sys   0% nic  90% idle   0% io   0% irq   0% sirq
Load average: 0.44 1.55 0.87 2/570 150
  PID  PPID USER     STAT   VSZ %VSZ CPU %CPU COMMAND
   64     1 root     S    4802m 122%   1   0% dlv exec --listen=:2345 --headless=tr
   69    64 root     S     716m  18%   0   0% /go/src/build/main
    1     0 root     S     698m  18%   1   0% air -c .air.toml
   31     0 root     S     1672   0%   0   0% ash
  150    31 root     R     1600   0%   0   0% top
  109     1 root     Z        0   0%   0   0% [chromium]
  100     1 root     Z        0   0%   1   0% [chromium]
   82     1 root     Z        0   0%   0   0% [chromium]
   83     1 root     Z        0   0%   1   0% [chromium]
  136     1 root     Z        0   0%   0   0% [chromium]

zombi process の chromium はサーバーを使えば使うほど増えていくことも確認できました。

Zombie process への対策

この blog を参考に PID1 が zombie を kill してくれるように —init flag をつけて docker を起動したところ screen shot を取った後に zombie process が生まれない事が確認できました。

blog.phusion.nl

終わりに

go を用いた画像を動的に生成するマイクロサービスをご紹介しました。

特にメモリー周りはあまり知見もなかったので同じように chromedp をコンテナで動かそうと考えている方の参考になれば幸いです。

LLM で行う独自シソーラスに対応した校正機能

はじめに

こんにちわ、DROBE の都筑です。 この記事では LLM で行う独自シソーラスに対応した校正機能について解説します。

LLM で校正してもらう

文章校正とは文章内の誤字・脱字・誤植や文法ミスを修正して正しく書き直すことを指します。 一般的には文章の「てにをは」やタイポなどを修正する機能をイメージされる方が多いと思いますが、実際に業務の現場では独自の言い回しや単語などがありライティングのガイドラインがある場合も多いと思います。

LLM による校正で非常に強力なのは、そういった独自ルールやガイドラインを踏まえた上で校正をしてくれる機能をプロンプトを作成するだけで簡単に実装できる点にあると思います。

DROBE でのユースケース

DROBE ではお客さまに商品を発送する際に スタイリングカルテ と呼んでいるスタイリストからの提案をまとめた印刷物を同梱しています。 カルテに記載する文言は全てスタイリストが自分で書いていますが、サービスに特徴的な単語の言い回しなどに一定のルールがあります。

今回はこのスタイリングカルテの文言の校正機能を LLM を用いて開発してみました。

機能の概要

実装としては非常にシンプルで、OpenAI の API を叩く Lambda を先に用意しておき、それを React から直接叩きます。

機能の概要図

UI 的には、校正ボタンを準備しておき、ボタンをクリックしたら API を叩いて結果が返ってきたら修正差分を表示する、という簡易なものを作りました。

実際にサービスに組み込んでみた

Lamda の構成については別で記事にしているので興味があればご参照ください。

tech.drobe.co.jp

プロンプト

Lambda 側のプロンプトを記載します。ここは色々な工夫のしようがあると思います。

あなたは文書を校正するアシスタントです。
意味を変えないように、与えられた文章を以下のポイントに気をつけて訂正して出力してください。
- タイポ
- 文法間違い

以下の点に関しては訂正しないでください。
1. 敬称としての使う「様」には平仮名の「さま」を使ってください
... 省略

表記を間違えやすい単語や表現があるので気をつけてください
以下に例を示します。

===============
正しい表記: DROBE
間違った表記: Drobe

正しい表記: お客さま
間違った表記: お客様、ユーザー

...省略
===============

これらを踏まえて以下の文章を訂正してください。
===============
{passage}
===============

model は gpt-3.5-turbogpt-4 を試しましたが、この場合は圧倒的に gpt-4 の方が自然な校正結果が返ってきました。

パフォーマンスに関して

パフォーマンスに関しては Datadog で処理時間を計測しています。 コンソールを見ると大体 15sec 程度ではレスポンスを返せていそうです。

Datadog のコンソール

一般的な API だと考えるとあり得ないくらい遅いですが、校正という機能を考えると及第点かなと思います。

検証方法

検証を行うために ChatGPT に間違った表記で文章をいくつか作ってもらい、それを校正 API に流して検証しました。

ChatGPT による検証問題の作成

上手くシステム化すれば、検証や評価もある程度自動化できるのではと思いました。

さいごに

OpenAI の LLM である GPT-4 を利用した文章校正ツールについて解説しました。 一般的な校正だけではなく、独自ルールやガイドラインを加味してくれるツールをサッと作れるのが非常に強力だと思います。

参考にさせていただいた資料

以下を非常に参考にさせていただきました。ありがとうございます。

LLM をサービスから使うために Lambda で API Wrapper を作る

はじめに

こんにちわ、DROBE の都筑です。

OpenAI の提供する API を始めとして LLM をサービスで活用されている、もしくはこれから活用しようとしている方は多いと思います。 一方で、OpenAI が公式に提供している Library は Python と Node.js のみなので既存のサービスに直接インテグレーションする事が難しい場合も多いのではないでしょうか。

そういったケースに対応するために、 AWS Lambda で API の Wrapper 関数を作り公開し、サービスからはその API を叩くという構成のシステムをセットアップしたのでこの記事で解説します。

作るもの

以下のような構成のものを GitHub actions と terraform で構築していきます。

全体の構成

Lambda を採用した理由は以下になります。

  • LLM を利用した機能を実装するサーバーになるが、入出力を揃えて API を叩くだけで重たい処理を行わない
  • ECS などのサービスを利用して常時 API を立てておくよりも金額的なメリットがある
  • 弊社が AWS を中心にインフラを構築しているので使いやすい

Lambda は python の container を利用します。Lambda の実行時に SSM から OpenAI の API Key を取得して API を叩く構成です。 クライアントからはカスタムドメインを貼った API Gateway 経由で Lambda にアクセスする構成としています。Log は Cloudwatch に、Metrics は Datadog に送り Lambda の実行回数などをモニタリングできるようにします。

解説

では、それぞれ解説していきます。

Lambda 関数の設計

まず Lambda 関数の設計ですが、以下のような要件を満たせるようにしたいと考えました。

  • LLM を利用したい機能は複数出てくる事が想定されるので、新機能を作るたびにインフラを作らなくて良いようにしたい
  • OpenAI 以外の API も気軽に試せるような構成にしたい

これらを踏まえて、以下のような Clean Architecture 風な File 構成で Lambda の内部を書きました。

├── main.py
└── src
    ├── domain // interface の定義
    ├── infra // interface の実装
    │   └── openai // openai に依存した API の定義 (prompt もこの下で定義)
    └── usecase
        └── interactor.py

main.py には lambda の entry point となる関数を書きます。 main.py の主な役割は DI (interactor の準備) と request の validation と response の整形です。

domain に usecase で使う処理の interface を定義します。例として与えられた入力から特定の商品情報を抜き出す処理の interface は以下のような定義にしました。

RelatedItemExtractionResponse = TypedDict(
    "RelatedItemExtractionResponse", {"items": List[str]}
)


class IFRelatedItemExtraction(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def handle(self, request_text: str) -> RelatedItemExtractionResponse:
        pass

interactor 内部で business logic を呼び出す処理を書きます。ここでは LLM の呼び出し処理を行います。もちろん、このレイヤーは infra には直接依存しないレイヤーになります。

class Interactor:
    def __init__(
        self,
        repo: IFRelatedItemExtraction,
    ):
        self.repo = repo

    def extract(self, request_text: str) -> RelatedItemExtractionResponse:
        return self.repo.handle(request_text)

実際に API を呼び出す処理や prompt の定義は infra 以下に書きます。特定の商品情報を抜き出す処理の実装はこのような形です。

MODEL_NAME = "gpt-3.5-turbo-16k" # gpt-4 なども指定できる。Application に合わせて指定する。

SYSTEM_PROMPT = "" # システムプロンプト を書く

PROMPT = """
{passage}
"""

class OpenAIRelatedItemExtraction:
    @timeout_decorator.timeout(60) # timeout させるための decoration
    def chat_complete(self, request_text: str):
        return openai.ChatCompletion.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": PROMPT.format(passage=request_text)},
            ],
            max_tokens=4098,
            temperature=0,
        )

    def handle(self, request_text: str) -> RelatedItemExtractionResponse:
        output = self.chat_complete(request_text)
        content = output["choices"][0]["message"]["content"]
        return json.loads(content)

ポイントはモデルの指定やプロンプトをここに書いている事です。プロンプトなどは実装の詳細なので、ifra 以外のレイヤーでは意識しなくて良くなり、プロンプトの修正や呼び出し方の変更 (例えば Function calling を使うように修正するなど) をしても、infra レイヤー以外の修正は行わなくて良くなります。

また、こういった構成にしておくことで、例えば OpenAI をやめて別の LLM に変更するなどをする場合でも、domain で定義された interface に沿って infra を新規で作り main.py で interactor を作る時にどちらを使うかを選べるため、 比較コードの修正量を抑えながら AB テストなどを行う事ができます。

Lamda へのリクエストは以下のような形に統一しています。

{
  "type": "function_name",
  "input": "input_text"
}

type によって interactor のどの処理を呼び出すかを決定するようにします。 この構成により、新しく Lambda を Deploy しなくても Lambda のソースコードを変更することで新しいを機能を追加できます。 もちろんコンテナのサイズが肥大化していくという懸念はありますが、API の Wrapper であることを想定しているのである程度までは機能を増やしても問題ないと考えました。

CI/CD

コンテナのビルドと Lambda の更新は GitHub Actions で行います。

CI/CD

Actions は以下のような yaml で定義しました。

name: Build

on:
  push:
    branches:
      - main

env:
  CONTAINER_NAME: xxx # make の中で定義しているコンテナ名
  REPO_NAME: yyy # ECR repo の名前
  LAMBDA_FUNCTION_NAME: zzz # terraform で定義する Lambda 関数の名前

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.ML_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.ML_AWS_SECRET_ACCESS_KEY }}
          aws-region: ap-northeast-1

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1

      - name: Build and push to ecr
        run: |
          make build # コンテナを build するためのコマンド。コンテナ名はこの内部で定義している。
          docker tag ${CONTAINER_NAME}:latest ${REPO_NAME}:latest
          docker push <repo-name>:latest
          docker tag ${CONTAINER_NAME}:latest ${REPO_NAME}:${GITHUB_SHA}
          docker push ${REPO_NAME}:${GITHUB_SHA}
  
      - name: Deploy to lambda
        run: |
          aws lambda update-function-code --function-name ${LAMBDA_FUNCTION_NAME} --image-uri ${REPO_NAME}:latest

環境の構築

Lambda の実行環境は terraform で作ります。 それぞれのリソースに合わせて file を分割して作りました。 ここでは point となる lambda function と api gateway の部分だけピックアップして説明します。

まず lambda function の定義です。

resource "aws_lambda_function" "service_llm_lambda" {
  depends_on = [
    aws_cloudwatch_log_group.service_llm_lambda, # cloudwatch の log group を定義しておきます
  ]

  function_name = "service-llm-lambda-${var.app_env}"
  package_type  = "Image"
  image_uri     = "${aws_ecr_repository.ecr_repo.repository_url}:latest" # ECR repository の名前です
  role          = aws_iam_role.lambda_role.arn # Lambda の実行時の権限です。後述しますが SSM から機密情報を読み出せる権限を渡しました

  memory_size = 128
  timeout     = 60 # OpenAI の API は通常の API よりも実行時間が長い傾向があるので長めにしておきます

  lifecycle {
    ignore_changes = [
      image_uri, last_modified
    ]
  }

  image_config {
    command = ["datadog_lambda.handler.handler"] # 後述しますが Datadog にメトリクスを送るためにコマンドを上書きしておきます。
  }

  environment {
    variables = {
      DD_LAMBDA_HANDLER = "main.lambda_handler", # コマンドを上書しているので、実際のハンドラーはここに記載します。
      DD_SITE           = "datadoghq.com",
      DD_API_KEY        = data.aws_ssm_parameter.dd_api_key.value, ## Datadog の API key を parameter store から渡します。この書き方だとコンソールから鍵が見えてしまうので実際はそうならないように工夫が必要です。
      DD_TRACE_ENABLED  = "true"
    }
  }

}

api gateway はこんな感じです。HTTP モードでの設定とし、あらかじめ Route53 に作っておいたカスタムドメインと紐付けます。

resource "aws_apigatewayv2_api" "gw_api" {
  name          = "gateway_api_service-llm-lambda-${var.app_env}"
  protocol_type = "HTTP"
  cors_configuration {
    allow_origins = ["*"] # FE から直接呼び出したいというニーズがある場合には CORS 設定をしておく。実際には `*` ではなくちゃんとドメインを指定しましょう
    allow_methods = ["POST", "GET", "OPTIONS"]
    allow_headers = ["*"]
    max_age       = 300
  }
}

resource "aws_apigatewayv2_stage" "gw_stage" {
  api_id = aws_apigatewayv2_api.gw_api.id

  name        = "gateway_api_service-llm-lambda-${var.app_env}"
  auto_deploy = true

  access_log_settings {
    destination_arn = aws_cloudwatch_log_group.gw_lg.arn
}

resource "aws_apigatewayv2_integration" "gw_integration" {
  api_id = aws_apigatewayv2_api.gw_api.id

  integration_uri    = aws_lambda_function.service_llm_lambda.invoke_arn
  integration_type   = "AWS_PROXY"
  integration_method = "POST"
}

resource "aws_apigatewayv2_route" "gw_route" {
  api_id = aws_apigatewayv2_api.gw_api.id

  route_key = "POST /invoke"
  target    = "integrations/${aws_apigatewayv2_integration.gw_integration.id}"
}

resource "aws_cloudwatch_log_group" "gw_lg" {
  name              = "/aws/api_gw/service-llm-lambda"
  retention_in_days = 1
}

resource "aws_lambda_permission" "api_gw" {
  statement_id  = "AllowExecutionFromAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.service_llm_lambda.function_name
  principal     = "apigateway.amazonaws.com"

  source_arn = "${aws_apigatewayv2_api.gw_api.execution_arn}/*/*"
}

resource "aws_apigatewayv2_domain_name" "api_gw_domain" {
  domain_name = "${var.app_env}.xxx" # あらかじめ route53 に作っておいた domain 名

  domain_name_configuration {
    certificate_arn = var.domain_cert_arn # あらかじめ ACM で取得しておきた certificate の ARN を指定
    endpoint_type   = "REGIONAL"
    security_policy = "TLS_1_2"
  }
}

resource "aws_route53_record" "api_gw_domain_record" { # ここで Route53 の ZONE に API Gateway にリクエストを向けるための A record を追加
  name    = aws_apigatewayv2_domain_name.api_gw_domain.domain_name
  type    = "A"
  zone_id = var.zone_id

  alias {
    evaluate_target_health = true
    name                   = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].target_domain_name
    zone_id                = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].hosted_zone_id
  }
}

resource "aws_apigatewayv2_api_mapping" "api_gw_domain_mapping" {
  api_id      = aws_apigatewayv2_api.gw_api.id
  stage       = aws_apigatewayv2_stage.gw_stage.id
  domain_name = aws_apigatewayv2_domain_name.api_gw_domain.id
}

設定のポイント

Timeout について

OpenAI の API を通常の API よりも実行時間が長い傾向があります。(公式の python sdk の設定 は 2023年8月現在、default の timeout はなんと 600sec です!)

そのため Lambda の実行時間を 60sec など長めにとっておくのが良いと思います。

また、コード自体でも timeout を適切に入れておくのが望ましいです。 今回は OpenAI の SDK に依存する形ではなく timeout-decorator という library を使って時前で timeout させるようにしました。

@timeout_decorator.timeout(60)
    def chat_complete(self, request_text: str):
        return openai.ChatCompletion.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": PROMPT.format(passage=request_text)},
            ],
            max_tokens=2048,
            temperature=0,
        )

API Key をどこから取り出すか

API Key は機密情報なので Lambda の内部で ssm を経由して取り出すようにしました。 Lambda の Execution role に SSM の該当リソースを取得してデコードする権限を渡しておきます。

data "aws_iam_policy_document" "lambda_execution_policy" {

  ... 省略

  statement {
    effect = "Allow"
    actions = [
      "kms:Decrypt",
      "ssm:GetParameter"
    ]
    resources = [
      "arn:aws:ssm:*:*:parameter/xxx" # parameter 名
    ]
  }
}

python 側では Lambda 起動時に以下のような処理が走るような実装にしています。ただしこれは OpenAI のみで必要な処理なので infra レイヤー内部に実装を入れています。

import boto3
import openai

ssm = boto3.client("ssm", region_name="ap-northeast-1")
response = ssm.get_parameter(
    Name="xxx", WithDecryption=True
)
openai.api_key = response["Parameter"]["Value"]

Datadog との接続

Datadog にメトリクスを送るには、以下の 2 点を行う必要があります。

  1. コンテナに Datadog に関連するライブラリを入れておく
  2. コンテナな起動設定でエントリポイントを Datadog のライブラリに上書きし、必要な環境変数を設定する

1 に関しては Dockerfile に public.ecr.aws/datadog/lambda-extension:latest から設定をコピーする設定を書きつつ、 datadog-lambda ライブラリをインストールします。今回は python のパッケージ管理を poetry で行っているので、 pyproject.toml に設定を追記します。

FROM public.ecr.aws/lambda/python:3.10

... 省略

COPY --from=public.ecr.aws/datadog/lambda-extension:latest /opt/extensions/ /opt/extensions

... 省略

RUN poetry install --no-root
[tool.poetry.dependencies]
python = "^3.10"
openai = "^0.27.8"
boto3 = "^1.28.22"
timeout-decorator = "^0.5.0"
datadog-lambda = "^4.78.0" # ここ

詳しい設定はこちらを参考にしてください。

docs.datadoghq.com

設定が完了した上で Lambda を実行すると、コンソールの Menu で Infrastructure -> Serverless 内で図のようにメトリクスが表示されるようになります。

Datadog のコンソール

ハマりポイント

Lambda 関数の event について

api gateway で integration した場合には event の中身が変化するという事を理解できておらず、ややハマりました。

コンソールからのテストでは正常に動きましたが、postman などで api を試すと internal serveer error が返ってきてしまいました。

{"message":"Internal Server Error"}

調べていくと、公式に以下のような解説がありました。

ペイロード形式バージョンでは、API Gateway が Lambda 統合に送信するデータの形式と、API Gateway が Lambda からのレスポンスをどのように解釈するかを指定します。ペイロード形式バージョンを指定しない場合、AWS Management Console はデフォルトで最新バージョンを使用します。

docs.aws.amazon.com

Api Gateway からの呼び出しの際には event の構造が変化するので、それに対応するように event からパラメータを取り出したり、レスポンスの形式を揃える必要があります。

インフラ的に解決する道もあると思いますが、簡易的に以下の関数を main.py に定義して対応しました。

def get_request(event):
    if "type" not in event:
        event = json.loads(event["body"])

    request_type = event["type"]
    request_text = event["input"]

    return request_type, request_text

def create_response(response: dict):
    return {
        "statusCode": 200,
        "body": json.dumps(response),
    }

さいごに

OpenAI などの LLM を活用するための Lambda の構成について解説しました。 がっつり使う場合はまた別の構成があると思いますが、LLM をちょっとした機能で使う場合には出来る限りインフラを追加で作らずに機能を追加できるような構成にしておく事にはメリットがあると思います。

ECS で GPU を使った ML 系 Task の実行環境をセットアップする

この記事を書いた人

DROBE の都筑 (@tsuzukit2) です

簡単なプロフィールはこちらをご覧ください

はじめに

機械学習系の機能を開発していると、GPU を利用してトレーニングを行いたいケースが多々あると思います。

この記事では、ECS で GPU を使った ML 系 Task の実行環境のセットアップについて記載します。

作りたいもの

作りたいものの概要はこのようなものです。

ECS で構築する GPU を利用した Task の実行環境

GPU は高価なので、常時起動しているインスタンスは 0 としておきつつ、Task が作られたらインスタンスを起動、Task を実行、Task の実行が終わったらインスタンスを落とし 0 に戻す、という環境をセットアップします。

ECS の capacity provider と autoscaling group を紐付け、ECS Task が起動 / 終了のタイミングで必要なインスタンスが変更される構成です。

ECS Task は goofys を利用して S3 をマウントする事とします。S3 をマウントするのは、画像系機械学習モデルのトレーニングに大量の画像が必要であり、それを S3 にマウントする事でコードで S3 の API などを意識せずに使えるようにするためです。

goofys については、こちらの repo を参考にしてください。

ECS で実行するタスクの定義と実行

ECS で実行するタスク定義は事前に Terraform で作っておく事としました。Task 定義の中で実際に training を行うコンテナは latest tag のものを実行するように指定しておきます。 Task を実行する際には、GitHub Actions にてコンテナをビルドし ECR に push (ここで latest tag のイメージが更新されます) し、AWS CLI を使って Task を起動するという流れです。

GitHub Actions は以下のようなものになります。

name: invoke ml training

on:
  workflow_dispatch:

jobs:
  build:
    name: build container image
    runs-on: ubuntu-latest

    steps:
    - name: source checkout
      uses: actions/checkout@v3

    - name: Configure AWS credentials
      uses: aws-actions/configure-aws-credentials@v2
      with:
        aws-access-key-id: ${{ AWS_ACCESS_KEY_ID }}
        aws-secret-access-key: ${{ AWS_SECRET_ACCESS_KEY }}
        aws-region: ap-northeast-1

    - name: Login to Amazon ECR
      id: login-ecr
      uses: aws-actions/amazon-ecr-login@v1

    - name: Build and push to ecr
      run: |
        make build # 自作のコンテナを build するためのコマンド
        docker tag trainer:latest xxxx.dkr.ecr.ap-northeast-1.amazonaws.com/trainer:latest
        docker push xxxx.dkr.ecr.ap-northeast-1.amazonaws.com/trainer:latest:latest
        docker tag trainer:latest xxxx.dkr.ecr.ap-northeast-1.amazonaws.com/trainer:${GITHUB_SHA}
        docker push xxxx.dkr.ecr.ap-northeast-1.amazonaws.com/trainer:${GITHUB_SHA}

  run:
    name: run ecs task
    runs-on: ubuntu-latest
    needs: [build]

    steps:

    - name: Configure AWS credentials
      uses: aws-actions/configure-aws-credentials@v2
      with:
        aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
        aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        aws-region: ap-northeast-1

    - name: run-task
      env:
        CLUSTER_NAME: ml-train # terraform で定義した cluster 名に合わせる
        FAMILY_NAME: trainer # terraform で定義した task 定義に合わせる
      run: |
        TASK_DEF_ARN=$(aws ecs list-task-definitions --family-prefix "${FAMILY_NAME}" --query "reverse(taskDefinitionArns)[0]" --output text)
        echo "${TASK_DEF_ARN}"
        TASK_ARN=$(aws ecs run-task --cluster ${CLUSTER_NAME} --task-definition ${TASK_DEF_ARN} --query tasks[0].taskArn --output text)
        TASK_ID=$(echo "${TASK_ARN}" | grep -oE "[^/]+$")

Terraform

全体の構成が決まったので Terraform の設定を書いていきます。 (注 公開するために命名などを修正しており動作未検証なのでコピペでの使用は避けてください)

まずは VPC などネットワーク周りの設定を書きます。

# VPC
resource "aws_vpc" "vpc_name" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support   = true

  tags = {
    Name = "vpc_name"
  }
}

# Public subnet
resource "aws_subnet" "subnet" {
  vpc_id                  = aws_vpc.vpc_name.id
  availability_zone       = "ap-northeast-1a"
  cidr_block              = "10.0.1.0/24"
  map_public_ip_on_launch = true
}

resource "aws_internet_gateway" "ig" {
  vpc_id = aws_vpc.vpc_name.id
}

resource "aws_route_table" "rt" {
  vpc_id = aws_vpc.vpc_name.id
}

resource "aws_route" "route" {
  route_table_id         = aws_route_table.rt.id
  destination_cidr_block = "0.0.0.0/0"
  gateway_id             = aws_internet_gateway.ig.id
}

resource "aws_route_table_association" "rta" {
  subnet_id      = aws_subnet.subnet.id
  route_table_id = aws_route_table.rt.id
}

resource "aws_security_group" "sg" {
  name   = "sg"
  vpc_id = aws_vpc.vpc_name.id
  depends_on = [
    aws_vpc.vpc_name
  ]

  ingress {
    from_port   = "0"
    to_port     = "0"
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  egress {
    from_port   = "0"
    to_port     = "0"
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

次に ECR などを書いていきます

# ECR
resource "aws_ecr_repository" "ecr_repo" {
  name     = "trainer"
}

# ECR Lifecycle policy 
resource "aws_ecr_lifecycle_policy" "lsp" {
  repository = each.value.ecr_repo

  policy = <<EOF
{
    "rules": [
        {
            "rulePriority": 1,
            "description": "Keep last 2 images",
            "selection": {
                "tagStatus": "tagged",
                "tagPrefixList": ["v"],
                "countType": "imageCountMoreThan",
                "countNumber": 2
            },
            "action": {
                "type": "expire"
            }
        }
    ]
}
EOF
}

続いて Task 定義を作ります

ここで goofys や GPU を使うための設定を行います

# Task 定義
resource "aws_ecs_task_definition" "task" {

  family                   = "trainer"
  requires_compatibilities = ["EC2"]
  network_mode             = "bridge"
  cpu                      = 2048
  memory                   = 8192

  task_role_arn      = aws_iam_role.iam.arn
  execution_role_arn = aws_iam_role.iam.arn

  container_definitions = jsonencode([
    {
      image     = "xxxx.dkr.ecr.ap-northeast-1.amazonaws.com/${aws_ecr_repository.ecr_repo.name}:latest" # ここで latest を指定する
      essential = true,
      name      = "trainer"
      cpu       = 2048,
      memory    = 8192,
      logConfiguration = {
        logDriver = "awslogs",
        options = {
          awslogs-group         = aws_cloudwatch_log_group.ml_image_recognition_train.name,
          awslogs-region        = "ap-northeast-1",
          awslogs-stream-prefix = "ml_image_recognition"
        }
      },
      linuxParameters = { # goofys を使うための設定
        capabilities = {
          add = [
            "MKNOD",
            "SYS_ADMIN"
          ]
        },
        "devices" : [
          {
            "hostPath" : "/dev/fuse",
            "containerPath" : "/dev/fuse",
            "permissions" : [
              "read",
              "write",
              "mknod"
            ]
          }
        ]
      },
      environment = [
        {
          name  = "NVIDIA_DRIVER_CAPABILITIES",
          value = "all"
        }
      ],
      resourceRequirements = [ # GPU を使うための設定
        {
          type  = "GPU",
          value = "1"
        }
      ]
    }
  ])
}

resource "aws_iam_role" "iam" {
  name = "ecs-iam"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "ecs-tasks.amazonaws.com"
      },
      "Effect": "Allow"
    }
  ]
}
EOF
}

resource "aws_iam_role_policy" "iam_policy" {
  role = aws_iam_role.iam.id

  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogGroups",
        "logs:DescribeLogStreams"
      ],
      "Effect": "Allow",
      "Resource": "${aws_cloudwatch_log_group.lg.arn}"
    },
    {
      "Action": [
        "s3:ListBucket",
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Effect": "Allow",
      "Resource": [
        "*" # goofys でマウントしたいバケットを指定する
      ]
    }
  ]
}
EOF
}

resource "aws_cloudwatch_log_group" "trainer" {
  name              = "/ecs/logs/prod/trainer"
  retention_in_days = 14
}

最後に cluster の定義や Autoscaling Group の設定を書いていきます。aws_laumch_template で使いたいインスタンスタイプを指定します。ここでは g4dn.xlarge を指定しています。

# ecs cluster
resource "aws_ecs_cluster" "cluster" {
  name = "ml-train"
}

resource "aws_ecs_cluster_capacity_providers" "trainer" {
  cluster_name = aws_ecs_cluster.cluster.name

  capacity_providers = [aws_ecs_capacity_provider.trainer.name]

  default_capacity_provider_strategy {
    base              = 0
    weight            = 1
    capacity_provider = aws_ecs_capacity_provider.trainer.name
  }

}

resource "aws_ecs_capacity_provider" "trainer" {
  name = "trainer"

  auto_scaling_group_provider {
    auto_scaling_group_arn         = aws_autoscaling_group.trainer.arn
    managed_termination_protection = "ENABLED"
    managed_scaling {
      maximum_scaling_step_size = 10
      minimum_scaling_step_size = 1
      status                    = "ENABLED"
      target_capacity           = 100
    }
  }
}

resource "aws_autoscaling_group" "trainer" {
  name                      = "trainer"
  max_size                  = 1
  min_size                  = 0
  health_check_grace_period = 0
  health_check_type         = "EC2"
  desired_capacity          = 0
  vpc_zone_identifier       = [aws_subnet.trainer.id]

  launch_template {
    id      = aws_launch_template.trainer.id
    version = "$Latest"
  }

  tag {
    # ECSにスケーリングをお願いするために必要なタグ
    # https://docs.aws.amazon.com/ja_jp/AmazonECS/latest/developerguide/cluster-auto-scaling.html#update-ecs-resources-cas
    key                 = "AmazonECSManaged"
    value               = ""
    propagate_at_launch = true
  }

  lifecycle {
    ignore_changes = [
      desired_capacity,
    ]
  }

}

locals {
  node_group_user_data = <<-EOF
  #!/bin/bash
  set -o xtrace
  echo ECS_CLUSTER=${aws_ecs_cluster.ml_image_recognition_train.name} >> /etc/ecs/ecs.config;
  EOF
}

resource "aws_launch_template" "trainer" {
  name = "trainer"

  # https://docs.aws.amazon.com/ja_jp/AmazonECS/latest/developerguide/retrieve-ecs-optimized_AMI.html
  image_id      = "ami-087da40e7559e6193"
  instance_type = "g4dn.xlarge"
  key_name      = "xxxxx"

  vpc_security_group_ids = [aws_security_group.trainer.id]

  block_device_mappings {
    device_name = "/dev/xvda"

    ebs {
      volume_size = 200
    }
  }

  instance_market_options {
    market_type = "spot" # spot の方が金額は安いが長時間の学習では spot だと止まってしまう事があった
  }

  iam_instance_profile {
    arn = aws_iam_instance_profile.trainer.arn
  }

  user_data = base64encode(format(local.node_group_user_data))
}

data "aws_iam_policy" "AmazonEC2ContainerServiceforEC2Role" {
  arn = "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role"
}

resource "aws_iam_role_policy_attachment" "trainer" {
  for_each = toset([
    "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy",
    "arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly",
  ])
  role       = aws_iam_role.trainer.name
  policy_arn = each.value
}

data "aws_iam_policy" "AmazonEC2ContainerServiceforEC2Role" {
  arn = "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role"
}

resource "aws_iam_role" "trainer" {
  name               = "trainer"
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "ec2.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
EOF
}

resource "aws_iam_role_policy_attachment" "trainer" {
  policy_arn = data.aws_iam_policy.AmazonEC2ContainerServiceforEC2Role.arn
  role       = aws_iam_role.trainer.name
}

resource "aws_iam_instance_profile" "trainer" {
  role = aws_iam_role.trainer.name
}

CapacityProviderReservation について

ECS のインスタンス数を増減させる仕組みとして CapacityProvider を使います。それらの説明は公式のブログに詳しいので一読ください。

Amazon ECS クラスターの Auto Scaling を深く探る | Amazon Web Services

CapacityProvider を利用した Autoscaling においては、クラスターが必要とするインスタンス数と実際に稼働しているインスタンスの数を比率で表した CapacityProviderReservation という値をあらかじめ設定しておき、その値になるようにインスタンス数を ASG が自動で調節します。若干解り伝いのですが、公式の説明を読むとイメージが掴めます。

CAS の中心的な責任は、ASG に割り当てられたタスクの必要を満たす上で「適切な」数のインスタンスが ASG で実行されるようにすることです。これには、既に実行されているタスクと、既存のインスタンスには収まらない、顧客が実行しようとしているタスクの両方が含まれます。その数を M としましょう。また、既に実行されている ASG 内の現在のインスタンス数を N とします。この記事の残りの部分では M と N が繰り返し出てくるので、これらをどう考えたらよいかを十分明確に理解しておくことが重要です。  今のところ、M をいくらにしたらよいかを知る方法は説明していませんが、議論の目的上、M は必要数であるとだけ仮定します。この仮定の下で、もし N = M であるとするなら、スケールアウトは必要ではなく、スケールインは不可能です。一方、N < M なら、十分なインスタンスがないことになるので、スケールアウトが必要です。  最後に N > M なら、スケールインが可能です (ただし必要だとは限りません)。自分の ECS タスクすべてを実行するのに必要な数よりも多くのインスタンスが存在しているからです。また、後ほど見るように、N と M に基づく新しい CloudWatch メトリクスを定義し、それをCapacityProviderReservation と呼ぶことにします。N と M が与えられたときのこのメトリクスの定義は非常にシンプルです。

簡単に説明するなら、このメトリクスは、ASG の実際の大きさと必要な大きさとの比を、パーセント単位で表したものです。

terraform ではこの CapacityProviderReservation を設定する形になります

resource "aws_ecs_capacity_provider" "trainer" {
  name = "trainer"

  auto_scaling_group_provider {
    auto_scaling_group_arn         = aws_autoscaling_group.trainer.arn
    managed_termination_protection = "ENABLED"
    managed_scaling {
      maximum_scaling_step_size = 10
      minimum_scaling_step_size = 1
      status                    = "ENABLED"
      target_capacity           = 100 # <- ここ
    }
  }
}

ここで 100 を指定するという事は、「タスクを実行するのにちょうど必要な数のインスタンスを準備してください」ということになります。

例えばここで 200 を指定しておくと、「タスクを実行するのに必要なインスタンスの数の 2 倍のインスタンスを準備してください」ということです。

CapacityProviderReservation を 100 にしておくことで、タスクのリクエストが無い時はインスタンスの数が 0 になるように出来ます。(GPU インスタンスは非常に高いので使っていない時に 0 に出来るという事はメリットが大きいはずです)

Trouble Shooting

ここからは、環境を構築するにあたって実際にハマってしまったポイントを 2 つ紹介します。

インスタンスが起動はするが ECS に参加しない

ECS クラスターが作られ EC2 インスタンスも建っているのに、training が始まらない場合は EC2 が ECS クラスターに参加できているかを確認してみてください。

EC2 インスタンスを ECS クラスターに参加させるためには、EC2 の /etc/ecs/ecs.configECS_CLUSTER 環境変数をセットする必要がありました。

terraform では以下の辺りになります。

locals {
  node_group_user_data = <<-EOF
  #!/bin/bash
  set -o xtrace
  echo ECS_CLUSTER=${aws_ecs_cluster.trainer.name} >> /etc/ecs/ecs.config;
  EOF
}

resource "aws_launch_template" "trainer" {
  name = "trainer"

  ...省略
  
  user_data = base64encode(format(local.node_group_user_data))
}

Training が 20 時間程度で終わってしまう

環境を構築してから何回か training を実行しましたが、どうしても 20 時間程度で勝手に training が終了してしまうという現象に遭遇しました。

最初はメモリーなどを疑いましたが試しに無限に sleep をし続ける training task を作って実行しても同様の結果になってしまいました。

調査した結果、どうやらスポットインスタンスを使っている事が原因だったようです。

スポットインスタンスの設定を削除した結果数日に及ぶようなトレーニングでも実行を行える事が確認できました。

terraform ではこの辺りになります。

resource "aws_launch_template" "trainer" {
  
  ... 省略

  instance_market_options {
    market_type = "spot" # spot の方が金額は安いが長時間の学習では spot だと止まってしまう事があった
  }

}

おわりに

ML をやっていると GPU を使いたくなる事は多々あり、かつ本番環境への Deploy などを考えると自動のパイプラインとして GPU によるトレーニングを組みたくなる事は多いと思います。

一方で GPU インスタンスは非常に高価であるため、使っていない時は落としておきたいと思うのが人情だと思います。

この記事では ECS で GPU を使った ML 系 Task の実行環境のセットアップについて解説しました。特に必要のない時にはインスタンス数を 0 にしておける設定なので、特にコストにシビアなスタートアップの方などに少しでも参考になれば嬉しいです。

PHPカンファレンス2023にゴールドスポンサーとして協賛します

EMの加川(@shinpr_p)です。
DROBE は PHP Conference Japan 2023 にゴールドスポンサーとして協賛します。

PHPカンファレンスは、2000年より年に一度開催されている日本最大のPHPのイベントです。WEBサーバにインストールされているシェア8割を超える人気言語のイベントとして、初心者から上級者まで幅広い層のWEB系エンジニアが参加します。

今回はオフラインでの開催となる予定ですので、DROBEからも数名のメンバーが参加します。ブース出展も行いますので、ご参加される皆さまぜひ交流させてください。

PHP Conference Japan 2023 の概要

phpcon.php.gr.jp

開催日

October 8 - 8, 2023.

開催地

大田区産業プラザPiO.

さいごに

DROBE は「すべての人がポジティブに意思決定し、自分を楽しめる世界」というビジョンを目指し、パーソナルスタイリングサービス「DROBE」を提供しています。

技術コミュニティが創出した成果を活用することで、DROBEというプロダクトを素早くデリバリーすることが可能となっています。技術コミュニティの恩恵をただ享受するだけでなく、業界の発展のために微力ながら貢献していきたいと考え、活動の一環として技術イベントへのスポンサードを行っています。
今後も当社の活用する技術領域にまつわるスポンサードや情報発信を継続的に行っていきます!

当社の紹介

DROBEのプロダクト開発組織に関する情報をまとめておりますので、少しでも興味を持っていただけたらぜひ参照ください。 info.drobe.co.jp

LambdaとSQSを使ってWebhookによるスパイクに対応する

はじめに

CTOの都筑(@tsuzukit2)です。
DROBE では様々な外部サービスを利用していますが、事前に設定しておく事で外部サービス側で特定のイベントが発生した際に DROBE 側に HTTP のリクエストを送ってくれる仕組みを多く利用しています。

Webhook のわかりやすい例としては、例えば SendGrid のような外部サービスでメールの配信を行っている場合に、実際にメールの配信結果がどうだったかをアプリケーションで確実に検知したいといった場合に、SendGrid での配信完了イベントをトリガーとした Webhook を設定します。

SendGrid の Webhook を使ってメールの配信完了を Application で検知する例

こういった仕組みを使う事で、アプリケーション側ではメールの送信をリクエストした事だけではなく、メールがしっかりと配信された事、もしくはエラーとなってしまった事などを検知し DB に保存したり CS に役立てたりなどという事が出来ます。

この記事では Webhook を使う事によって短期間に大量のアクセスが返ってきてしまうような場合に DROBE がどのように対応しているかを解説します。

課題

そもそも Webhook で大量のアクセスが返ってきてしまう場合とはどういった場合でしょうか?先ほどの SendGrid の例で言うと、アプリケーションの機能として任意のタイミングでユーザー全員にメールマガジンを配信したい場合などが考えられます。

例えばユーザーが 100 万人のサービスで一斉に SendGrid のようなメール送信を行う外部サービスに送信リクエストを送ったらどうなるでしょうか? もちろんサービスの規約や内部実装に依存すると思いますが、多くの大規模なサービスではある程度大量のリクエストが来ても特に問題なく捌き素直に webhook を返すという挙動になると思います。その際にアプリケーション側では大量の HTTP リクエストが一気に来る事になるので (この場合では 100 万人分の Webhook が)、自身が作り出した DDOS 攻撃にアプリケーションがされされるというような状況になります。

Lambda と SQS でリクエストをスロットリングする

DROBE ではこの問題に対処するために Lambda と SQS を用いてリクエストをスロットリングする proxy を作って対処しています。

Webhook proxy の概要図

挙動はいたってシンプルで、外部サービスからのリクエストが来たら AWS Lambda によってリクエストを受けて Header と Body をそのまま SQS に入れていきます。同時に別の Lambda を SQS からのリクエストをデキューし、Header と Body を使って HTTP リクエストをアプリケーションに投げなおします。

ここで API Gateway からのリクエストを SQS に入れていくラムダ (api2sqs と呼んでいます) は並列で動作するように設定しておきつつ、SQS からメッセージをデキューしてアプリケーションに HTTP リクエストを投げ直すラムダ (sqs2upstream と呼んでいます) の並列実行数を制限しておきます。

こうする事で、アプリケーションサーバーに一切の変更を加える事なく、Webhook など大量の外部サービスからのアクセスによるアクセス増を一定のレートに制限する事が可能になります。(もちろん SQS の詰まり具合に応じて Webhook がアプリケーションサーバーに届くまでに一定のタイムラグがあるのでリアルタイム性が重要な機能には使えないという欠点があります。)

Terraform によるセットアップ

ここからは上記の webhook proxy の terraform によるセットアップを解説していきます。

前提として、API Gateway で利用するドメインはすでに準備が出来ていて zone までは手動で切ってあり、ドメインに来たアクセスは zone にくるようにセットアップが出来ているものとします。また SSL のための Certificate も ACM を使って事前に準備しているものとしています。

Lambda のソースコード

まずは API Gateway からリクエストを受け取って SQS に入れる所のラムダのソースコードを書きます。ここでは js で書いていますがどんな言語でも問題ありません。

const https = require('https');
var AWS = require('aws-sdk');

exports.handler = async (event) => {
  const sqsUrl = process.env.SQS_URL # ここでは terraform 側から変数として渡している

  const messageBody = {
    'resource': event.resource,
    'headers': JSON.parse(event.headers),
    'body': JSON.parse(event.body)
  }
  
  var params = {
    MessageBody: JSON.stringify(messageBody),
    QueueUrl: sqsUrl,
   };
  const sqs = new AWS.SQS();
  const queueResponse = await sqs.sendMessage(params).promise()
  
  return {
      "statusCode": 200
  };
};

次に SQS からデキューしてアプリケーション側に HTTP リクエストを投げ直す処理を書きます。

const https = require('https');

exports.handler = event => {
  const upstream = process.env.UPSTREAM # ここも terraform 側から変数として渡している
  const path = process.env.PATH

  event.Records.forEach(record => {
    
    const { body } = record;
    const parsedBody = JSON.parse(body)

    const data = JSON.stringify(parsedBody.body)
    const options = {
        host: upstream,
        path: path,
        port: 443,
        headers: parsedBody.headers
        method: 'POST',
    }

    const req = https.request(options, res => {
      console.log(`statusCode: ${res.statusCode}`)
    }).on('error', error => {
      console.error(error)
    })

    req.write(data)
    req.end()
  });

};

Lambda

次に Terraform で Lambda を作ります。

locals {
  function_name_sqs2upstream = "lambda_proxy_sqs2upstream-${var.app_env}"
}

locals {
  function_name_api2sqs = "lambda_proxy_api2sqs-${var.app_env}"
}

data "aws_iam_policy_document" "lambda_assume_role" {
  statement {
    actions = ["sts:AssumeRole"]

    principals {
      type = "Service"
      identifiers = [
        "lambda.amazonaws.com",
      ]
    }
  }
}

resource "aws_iam_role" "lambda_proxy_sqs2upstream" {
  name               = "lambda_proxy-sqs2upstream-${var.app_env}"
  assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json
}

resource "aws_iam_role" "lambda_proxy_api2sqs" {
  name               = "lambda_proxy-api2sqs-${var.app_env}"
  assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json
}

data "aws_iam_policy" "lambda_execution_policy" {
  arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

# SQS からメッセージを受けたり、メッセージを削除する権限を付与する
data "aws_iam_policy_document" "sqs2upstream_policy_document" {
  source_json = data.aws_iam_policy.lambda_execution_policy.policy

  statement {
    effect = "Allow"

    actions = [
      "sqs:ReceiveMessage",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes"
    ]

    resources = [aws_sqs_queue.sqs_proxy.arn]
  }
}

resource "aws_iam_policy" "sqs2upstream_policy" {
  name   = "sqs2upstream_policy-${var.app_env}"
  policy = data.aws_iam_policy_document.sqs2upstream_policy_document.json
}

resource "aws_iam_role_policy_attachment" "lambda_proxy_sqs2upstream" {
  role       = aws_iam_role.lambda_proxy_sqs2upstream.name
  policy_arn = aws_iam_policy.sqs2upstream_policy.arn
}

resource "aws_iam_role_policy_attachment" "lambda_proxy_sqs2upstream_vpc_execution_role" {
  role       = aws_iam_role.lambda_proxy_sqs2upstream.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
}

# SQS にメッセージを送ったり削除したりする権限を付与する
data "aws_iam_policy_document" "api2sqs_policy_document" {
  source_json = data.aws_iam_policy.lambda_execution_policy.policy

  statement {
    effect = "Allow"

    actions = [
      "sqs:SendMessage",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes"
    ]

    resources = [aws_sqs_queue.sqs_proxy.arn]
  }
}

resource "aws_iam_policy" "api2sqs_policy" {
  name   = "api2sqs_policy-${var.app_env}"
  policy = data.aws_iam_policy_document.api2sqs_policy_document.json
}

resource "aws_iam_role_policy_attachment" "lambda_proxy_api2sqs" {
  role       = aws_iam_role.lambda_proxy_api2sqs.name
  policy_arn = aws_iam_policy.api2sqs_policy.arn
}

data "archive_file" "lambda_proxy_sqs2upstream" {
  type        = "zip"
  source_file = "lambda-src/sqs2upstream/index.js"
  output_path = "lambda-src/sqs2upstream/index.zip"
}

resource "aws_lambda_function" "lambda_proxy_sqs2upstream" {
  filename         = data.archive_file.lambda_proxy_sqs2upstream.output_path
  function_name    = local.function_name_sqs2upstream
  role             = aws_iam_role.lambda_proxy_sqs2upstream.arn
  handler          = "index.handler"
  source_code_hash = data.archive_file.lambda_proxy_sqs2upstream.output_base64sha256
  runtime          = "nodejs12.x"

  publish                        = true
  reserved_concurrent_executions = var.reserved_concurrent_executions # 並列度は変数で渡す

  memory_size = 128
  timeout     = 3
  environment {
    variables = {
      UPSTREAM = var.upstream
    }
  }
}

resource "aws_cloudwatch_log_group" "lambda_proxy_sqs2upstream" {
  name              = "/aws/lambda/${local.function_name_sqs2upstream}"
  retention_in_days = 1
}

data "archive_file" "lambda_proxy_api2sqs" {
  type        = "zip"
  source_file = "lambda-src/api2sqs/index.js"
  output_path = "lambda-src/api2sqs/index.zip"
}

resource "aws_lambda_function" "lambda_proxy_api2sqs" {
  filename         = data.archive_file.lambda_proxy_api2sqs.output_path
  function_name    = local.function_name_api2sqs
  role             = aws_iam_role.lambda_proxy_api2sqs.arn
  handler          = "index.handler"
  source_code_hash = data.archive_file.lambda_proxy_api2sqs.output_base64sha256
  runtime          = "nodejs12.x"

  publish = true

  memory_size = 128
  timeout     = 3
  environment {
    variables = {
      SQS_URL = aws_sqs_queue.sqs_proxy.id
    }
  }
}

resource "aws_cloudwatch_log_group" "lambda_proxy_api2sqs" {
  name              = "/aws/lambda/${local.function_name_api2sqs}"
  retention_in_days = 1
}

SQS

SQS を作ります。設定は非常にシンプルです。

resource "aws_sqs_queue" "sqs_proxy" {
  name = "proxy-queue-${var.app_env}"
  tags = {
    Environment = var.app_env
  }
}

# この設定により SQS にイベントが入ってきたら sqs2upstream 関数が呼び出される
resource "aws_lambda_event_source_mapping" "sqs_proxy" {
  batch_size       = 1 # ここでは 1 にしているがデキューの並列度をあげたい場合はもっと大きな値にする
  event_source_arn = aws_sqs_queue.sqs_proxy.arn
  function_name    = aws_lambda_function.lambda_proxy_sqs2upstream.arn
}

API Gateway

最後に API Gateway を作ります。

resource "aws_apigatewayv2_api" "gw_api_proxy" {
  name          = "gateway_api_proxy-${var.app_env}"
  protocol_type = "HTTP"
}

resource "aws_apigatewayv2_stage" "gw_state_proxy" {
  api_id = aws_apigatewayv2_api.gw_api_proxy.id

  name        = "gateway_state_proxy-${var.app_env}"
  auto_deploy = true

  access_log_settings {
    destination_arn = aws_cloudwatch_log_group.gw_lg_proxy.arn

    format = jsonencode({
      requestId               = "$context.requestId"
      sourceIp                = "$context.identity.sourceIp"
      requestTime             = "$context.requestTime"
      protocol                = "$context.protocol"
      httpMethod              = "$context.httpMethod"
      resourcePath            = "$context.resourcePath"
      routeKey                = "$context.routeKey"
      status                  = "$context.status"
      responseLength          = "$context.responseLength"
      integrationErrorMessage = "$context.integrationErrorMessage"
      }
    )
  }
}

resource "aws_apigatewayv2_integration" "gw_integration_proxy" {
  api_id = aws_apigatewayv2_api.gw_api_proxy.id

  integration_uri    = aws_lambda_function.lambda_proxy_api2sqs.invoke_arn
  integration_type   = "AWS_PROXY"
  integration_method = "POST"
}

resource "aws_apigatewayv2_route" "gw_route_proxy" {
  api_id = aws_apigatewayv2_api.gw_api_proxy.id

  route_key = "POST /"
  target    = "integrations/${aws_apigatewayv2_integration.gw_integration_proxy.id}"
}

resource "aws_cloudwatch_log_group" "gw_lg_proxy" {
  name              = "/aws/api_gw/${aws_apigatewayv2_api.gw_api_proxy.name}"
  retention_in_days = 1
}

resource "aws_lambda_permission" "api_gw" {
  statement_id  = "AllowExecutionFromAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.lambda_proxy_api2sqs.function_name
  principal     = "apigateway.amazonaws.com"

  source_arn = "${aws_apigatewayv2_api.gw_api_proxy.execution_arn}/*/*"
}

resource "aws_apigatewayv2_domain_name" "api_gw_domain" {
  domain_name = "${var.domain_name}"

  domain_name_configuration {
    certificate_arn = var.domain_cert_arn
    endpoint_type   = "REGIONAL"
    security_policy = "TLS_1_2"
  }
}

resource "aws_route53_record" "api_gw_domain_record" {
  name    = aws_apigatewayv2_domain_name.api_gw_domain.domain_name
  type    = "A"
  zone_id = var.zone_id

  alias {
    evaluate_target_health = true
    name                   = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].target_domain_name
    zone_id                = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].hosted_zone_id
  }
}

resource "aws_apigatewayv2_api_mapping" "api_gw_domain_mapping" {
  api_id      = aws_apigatewayv2_api.gw_api_proxy.id
  stage       = aws_apigatewayv2_stage.gw_state_proxy.id
  domain_name = aws_apigatewayv2_domain_name.api_gw_domain.id
}

変数の設定

最後に変数の定義を以下のように書いておしまいです。

app_env                        = "prod"
upstream                       = "example.com" # 向き先のドメイン
reserved_concurrent_executions = 1 # ここで並列度の定義をする
zone_id                        = "zone_id" # 事前に作成した ZONEid
domain_cert_arn                = "arn_of_domain_cert" # ACM で作った CertificateARN

まとめ

この記事では AWS Lambda と SQS を用いて、外部サービスによる大量アクセスに対応する方法について解説しました。高いリアルタイム性が要求されるようなアプリケーションには向かない方法ですが、既存のアプリケーションに一切手を加える事なく実現出来るので、状況によっては使いやすい手法なのではと考えています。

開発プロセス版ADRで振り返る開発チームの改善活動

こんにちは。EMの加川(@shinpr_p)です。
本記事は、開発プロセスの変更を残したADRを眺めながら、スクラムの適用など開発チームの改善と向き合ってきた2-3ヶ月を振り返ります。

はじめに

前提知識としてADRについて説明します。
ADRは Architecture Decision Records の略で、アーキテクチャの決定を記録したドキュメント群です。
「意思決定の過程を残すこと」が特徴で、「検討したが導入しなかったもの」や「運用を進めることで適切ではなくなったため廃止したもの」、「技術選定の過程で選ばれなかった選択肢」など、よく歴史的経緯と言われがちな過程の情報が残ることが好きで、私はここ数年よく利用しています。

cloud.google.com

実際のADR

実際のADRがこちらです。
意思決定の関係者はby nameが含まれるので、公開用に加工しています。
以下、それぞれの意思決定について振り返っていきます。

実際のADR

イベント(会議体)の運用

これまでは、よくある「スクラム風な開発プロセス」でした。
この提案から、「継続的に価値を届けられるよう成果やプロセスを点検できるチームになる」ことをテーマに、開発プロセスの見直しを行なっています。
この時点では明確に「スクラムのフレームワークを導入する」とは言っておらず、「スクラムのエッセンスを盛り込む」という表現をしていました。

変更したこと

  • タイムボックスを1週間から2週間にする
  • プランニングMTGの進行を見直す
  • 成果報告、進捗共有、各種振り返りと散らばっていたMTGを整理し、Sprint Review / Retrospective に集約

タイムボックスの変更

個人的には1週間のままで良かったのですが、スクラムイベントをしっかりやろうとしたときに「MTGの時間が多くなりすぎる」と言われがちなので、細かくやっていたMTGの時間と同等にできる2週間をリズムとすることにしました。
最近はステークホルダーとのコミュニケーションやデモを見ることの重要性をチームが感じ始めてきているため、いずれ1週間に変更ということもあり得るかもしれません。

プランニングMTGの進行を見直す

これまでは、やることの共有とタスクアサインの決定、定期リリースのためのリリース計画の確認を行なっていました。
それを、Sprint PlanningとしてSprint Goalの決定とSprint Backlogの作成、完了条件(Acceptance Criteria: AC)の認識合わせを行うように変更しました。
ステークホルダーの召集もアジェンダには入れていましたが、当初はあまり召集されていませんでした。直近のスプリントで一度招集し意義が感じられたことで、最近のスプリントでは積極的に召集が検討されるようになってきています。

Sprint Review / Retrospective

「週次のやったこと共有」「週次or隔週の障害振り返り」「月次の数値共有」「月次のKPT」と、レビュー・振り返りの要素が分散した状態で運営されていました。そこで、「リズムを作り、学びを次に活かしやすくする」ために、Sprint Review / Retrospectiveとして統廃合を行いました。
Reviewでは新たに「ゴールの共有」「できなかったことの共有」「デモ・ステークホルダーの召集」が盛り込まれ、振り返りはKPTからYWTに形を変えつつ「ネクストアクションを必ず出す」ことに重きをおくようにしました。

www.jmac.co.jp

印象に残っていること

「スクラムのエッセンスを盛り込む」と表現しているように、比較的ゆるめにフレームワークを適用しています。
そんな中、粒度が大きいPBIに取り組むときに「折角やるのだから守破離の守でMVPを意識した分割をしよう」というムーブメントが発生したことが印象に残っています。

AWS のコスト最適化について

チーム運営とは関係ないため本記事では割愛します

QAの振る舞いの変更

これまでは、コードレビューが終わったものをQAが品質保証し、保証されたものがリリースされるというフローでした。
これを、以下のように変更しようとしています。

  • プロダクトにとって重要な機能を定義し、重要な機能はQAが品質保証をし、リリースする
  • 重要な機能以外は、基本的にはエンジニアが品質保証をし、リリースする
  • QAの基本的な振る舞いを次のように定義する
    • 完了条件に関与し、品質保証の観点が盛り込まれている状態にする
    • コードレビューに関与し、品質保証の観点でフィードバックをする
  • ゆくゆくは、PdM/エンジニアが品質の知識を身に付け、QAではなくチームが品質を保証できるようにする(QA Enabling)

現状はまだまだ道半ばですが、少しずつでも前進していきます。

DROBEのリリースフローの変更について

手動で作業が必要な箇所を一部自動化し、リリースにかかる時間を短縮しました。
また、直近Four Keysの計測が始まったこともあり、これまでは定期リリースだったものが、ある程度まとめてリリースになり、現在ではリリースできる状態になったら順次リリースが行われるようになっています。

デプロイ頻度の推移

今後は「変更のリードタイム」を改善していく予定です。Sprint Backlogでのタスク分解にトライしつつ、放置気味だったPRのCloseや古くからあり対応に苦慮していたタスクのリリースに向けた作業に取り組んでいます。

MLが関係する開発のプロセス

MLの領域は専門性が高く、透明性が担保しづらい(実験や実装が特定個人に閉じてしまい、他のメンバーが内容を把握できていない状態)という課題がありました。
また、MLと機能開発のチームがゆるく分離しており、活動の母体が2つあるような状態となっていました。これにより、何もしないと足並みを揃えることが難しいという課題もありました。

これらの課題を緩和すべく、以下の変更を加えました。

  • MLの活動リズムも2週間にし、タイムボックスの終わりにReview(MLエンジニアの活動状況や実験結果の共有)とRetrospective(MLにまつわるプロセス・システムを眺め、改善することがないか探る)を実施することにした
  • Discoveryという概念を作り、機能開発との連携に活用するようにした
    • 実験の状況(実験したい項目がいくつあり、どれに取り組んでいて、どのPBIと関係しているのか)が分かりやすくなる
    • 実験で明らかにする観点が明確になる

ReviewにはPdMも招待しており、MLによってできることの理解やPdMが困っていることの把握といった相互理解を育むことも期待しています。
私が入社する以前にはML開発チームがステークホルダーとコミュニケーションを取りながらMLで何をやろうかというディスカッションをしていたらしいですが、最近では実施されなくなってしまっていました。
MLとビジネス/ユーザー価値を繋げていかないと価値の種が枯渇してしまうため、本活動をブラッシュアップさせていくことが急務だと感じています。

ビルドトラップを回避するため、Story Issueのテンプレートとフローを変更する

MLの活動をきっかけに、Discoveryの概念を機能開発チーム(以下スクラムチーム)全体に適用することにしました。
端的に書くと、「以下の観点を満たさないものはProduct Backlogに積まないので、Discoveryという過程を経てそれらを明らかにしてね」ということを強制するようになりました。

  • ビジネス的な価値(複利の効くInvestの削減 or Investを上回るReturn)があるか
  • ユーザーにとって価値があるか
  • ユーザーが使い方がわかるか
  • 現実的な工数での実現が可能か

Issue(当社ではGitHub Issueを用いている)のテンプレートに4つの観点をチェックボックスとして追加し、POは4つの観点が満たされていると判断したらPBIに優先順位をつけてProduct Backlogに積むようにしています。
まだ試行錯誤中のため少々Product Backlogが心許なくなっていますが、協力し乗り越えていきたいと考えています。

Sprint Goalに関する変更

Sprint Goalを廃止する

直近のSprint Planningの中で、「このPBIをやることがSprint Goalと同義であるため、あえて抽象的なゴールを作ると混乱するのではないか?」という課題提起があり、それに対する明確な対案が見出せなかったことから、Sprint Goalを廃止したスプリントに取り組んでみることにしました。
PlanningのMTGの中で「Sprint Goalを合意する」アジェンダに要する時間がそれなりの比重となっていたことも、意思決定を後押しした要素となっています。

Sprint Goalを導入する

Sprint Goalの無いスプリントに取り組む中で、「Sprint Goalがないことの課題 / あることのメリット」の言語化が進みました。その説明をし、結果的にSprint Goalは設けようという結論になり、最近のスプリントからはゴールを考えるようになっています。

  • 優先順位としてはROIが高いものが上がりやすく、1つの施策(タスク)で大きい効果が見込まれるもの(Returnが高く、Investが低いもの)から順に着手することになりがちである
    • Sprint Goalというテーマがあることで、相対的にROIが高くない(Return > Investであるのは必須)タスク群に取り組むことも可能になるのではないか?
  • ゴールが無いことで「ゴールを達成するためにこれをやろう」ではなく「このタスク群を終わらせることがゴール」になってしまい、創意工夫の余地がなくなってしまう
    • 変化に弱くなってしまう
    • 不確実性を取り込むことが困難である

最近好きなスライドがあって、ここに書かれている考え方も参考にしました。

www.docswell.com

今後は「Sprint Goalを合意するために全員が集まる時間をどう短縮するか」と「Sprint Goalを達成するためのタスク群を如何に早く作り込むか」という課題に向き合っていきます。

さいごに

ADRを眺めながら、この2-3ヶ月を振り返りました。
チームはまだまだ改善の真っ只中ですが、変化が起きていることや試行錯誤できていることをポジティブに感じています。
今後も自己管理されたチームを目指して改善を続けていきます。


DROBE開発組織の紹介
組織情報ポータル