DROBEプロダクト開発ブログ

DROBEのプロダクト開発周辺の知見や考え方の共有をしていきます

LLM をサービスから使うために Lambda で API Wrapper を作る

はじめに

こんにちわ、DROBE の都筑です。

OpenAI の提供する API を始めとして LLM をサービスで活用されている、もしくはこれから活用しようとしている方は多いと思います。 一方で、OpenAI が公式に提供している Library は Python と Node.js のみなので既存のサービスに直接インテグレーションする事が難しい場合も多いのではないでしょうか。

そういったケースに対応するために、 AWS Lambda で API の Wrapper 関数を作り公開し、サービスからはその API を叩くという構成のシステムをセットアップしたのでこの記事で解説します。

作るもの

以下のような構成のものを GitHub actions と terraform で構築していきます。

全体の構成

Lambda を採用した理由は以下になります。

  • LLM を利用した機能を実装するサーバーになるが、入出力を揃えて API を叩くだけで重たい処理を行わない
  • ECS などのサービスを利用して常時 API を立てておくよりも金額的なメリットがある
  • 弊社が AWS を中心にインフラを構築しているので使いやすい

Lambda は python の container を利用します。Lambda の実行時に SSM から OpenAI の API Key を取得して API を叩く構成です。 クライアントからはカスタムドメインを貼った API Gateway 経由で Lambda にアクセスする構成としています。Log は Cloudwatch に、Metrics は Datadog に送り Lambda の実行回数などをモニタリングできるようにします。

解説

では、それぞれ解説していきます。

Lambda 関数の設計

まず Lambda 関数の設計ですが、以下のような要件を満たせるようにしたいと考えました。

  • LLM を利用したい機能は複数出てくる事が想定されるので、新機能を作るたびにインフラを作らなくて良いようにしたい
  • OpenAI 以外の API も気軽に試せるような構成にしたい

これらを踏まえて、以下のような Clean Architecture 風な File 構成で Lambda の内部を書きました。

├── main.py
└── src
    ├── domain // interface の定義
    ├── infra // interface の実装
    │   └── openai // openai に依存した API の定義 (prompt もこの下で定義)
    └── usecase
        └── interactor.py

main.py には lambda の entry point となる関数を書きます。 main.py の主な役割は DI (interactor の準備) と request の validation と response の整形です。

domain に usecase で使う処理の interface を定義します。例として与えられた入力から特定の商品情報を抜き出す処理の interface は以下のような定義にしました。

RelatedItemExtractionResponse = TypedDict(
    "RelatedItemExtractionResponse", {"items": List[str]}
)


class IFRelatedItemExtraction(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def handle(self, request_text: str) -> RelatedItemExtractionResponse:
        pass

interactor 内部で business logic を呼び出す処理を書きます。ここでは LLM の呼び出し処理を行います。もちろん、このレイヤーは infra には直接依存しないレイヤーになります。

class Interactor:
    def __init__(
        self,
        repo: IFRelatedItemExtraction,
    ):
        self.repo = repo

    def extract(self, request_text: str) -> RelatedItemExtractionResponse:
        return self.repo.handle(request_text)

実際に API を呼び出す処理や prompt の定義は infra 以下に書きます。特定の商品情報を抜き出す処理の実装はこのような形です。

MODEL_NAME = "gpt-3.5-turbo-16k" # gpt-4 なども指定できる。Application に合わせて指定する。

SYSTEM_PROMPT = "" # システムプロンプト を書く

PROMPT = """
{passage}
"""

class OpenAIRelatedItemExtraction:
    @timeout_decorator.timeout(60) # timeout させるための decoration
    def chat_complete(self, request_text: str):
        return openai.ChatCompletion.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": PROMPT.format(passage=request_text)},
            ],
            max_tokens=4098,
            temperature=0,
        )

    def handle(self, request_text: str) -> RelatedItemExtractionResponse:
        output = self.chat_complete(request_text)
        content = output["choices"][0]["message"]["content"]
        return json.loads(content)

ポイントはモデルの指定やプロンプトをここに書いている事です。プロンプトなどは実装の詳細なので、ifra 以外のレイヤーでは意識しなくて良くなり、プロンプトの修正や呼び出し方の変更 (例えば Function calling を使うように修正するなど) をしても、infra レイヤー以外の修正は行わなくて良くなります。

また、こういった構成にしておくことで、例えば OpenAI をやめて別の LLM に変更するなどをする場合でも、domain で定義された interface に沿って infra を新規で作り main.py で interactor を作る時にどちらを使うかを選べるため、 比較コードの修正量を抑えながら AB テストなどを行う事ができます。

Lamda へのリクエストは以下のような形に統一しています。

{
  "type": "function_name",
  "input": "input_text"
}

type によって interactor のどの処理を呼び出すかを決定するようにします。 この構成により、新しく Lambda を Deploy しなくても Lambda のソースコードを変更することで新しいを機能を追加できます。 もちろんコンテナのサイズが肥大化していくという懸念はありますが、API の Wrapper であることを想定しているのである程度までは機能を増やしても問題ないと考えました。

CI/CD

コンテナのビルドと Lambda の更新は GitHub Actions で行います。

CI/CD

Actions は以下のような yaml で定義しました。

name: Build

on:
  push:
    branches:
      - main

env:
  CONTAINER_NAME: xxx # make の中で定義しているコンテナ名
  REPO_NAME: yyy # ECR repo の名前
  LAMBDA_FUNCTION_NAME: zzz # terraform で定義する Lambda 関数の名前

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.ML_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.ML_AWS_SECRET_ACCESS_KEY }}
          aws-region: ap-northeast-1

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1

      - name: Build and push to ecr
        run: |
          make build # コンテナを build するためのコマンド。コンテナ名はこの内部で定義している。
          docker tag ${CONTAINER_NAME}:latest ${REPO_NAME}:latest
          docker push <repo-name>:latest
          docker tag ${CONTAINER_NAME}:latest ${REPO_NAME}:${GITHUB_SHA}
          docker push ${REPO_NAME}:${GITHUB_SHA}
  
      - name: Deploy to lambda
        run: |
          aws lambda update-function-code --function-name ${LAMBDA_FUNCTION_NAME} --image-uri ${REPO_NAME}:latest

環境の構築

Lambda の実行環境は terraform で作ります。 それぞれのリソースに合わせて file を分割して作りました。 ここでは point となる lambda function と api gateway の部分だけピックアップして説明します。

まず lambda function の定義です。

resource "aws_lambda_function" "service_llm_lambda" {
  depends_on = [
    aws_cloudwatch_log_group.service_llm_lambda, # cloudwatch の log group を定義しておきます
  ]

  function_name = "service-llm-lambda-${var.app_env}"
  package_type  = "Image"
  image_uri     = "${aws_ecr_repository.ecr_repo.repository_url}:latest" # ECR repository の名前です
  role          = aws_iam_role.lambda_role.arn # Lambda の実行時の権限です。後述しますが SSM から機密情報を読み出せる権限を渡しました

  memory_size = 128
  timeout     = 60 # OpenAI の API は通常の API よりも実行時間が長い傾向があるので長めにしておきます

  lifecycle {
    ignore_changes = [
      image_uri, last_modified
    ]
  }

  image_config {
    command = ["datadog_lambda.handler.handler"] # 後述しますが Datadog にメトリクスを送るためにコマンドを上書きしておきます。
  }

  environment {
    variables = {
      DD_LAMBDA_HANDLER = "main.lambda_handler", # コマンドを上書しているので、実際のハンドラーはここに記載します。
      DD_SITE           = "datadoghq.com",
      DD_API_KEY        = data.aws_ssm_parameter.dd_api_key.value, ## Datadog の API key を parameter store から渡します。この書き方だとコンソールから鍵が見えてしまうので実際はそうならないように工夫が必要です。
      DD_TRACE_ENABLED  = "true"
    }
  }

}

api gateway はこんな感じです。HTTP モードでの設定とし、あらかじめ Route53 に作っておいたカスタムドメインと紐付けます。

resource "aws_apigatewayv2_api" "gw_api" {
  name          = "gateway_api_service-llm-lambda-${var.app_env}"
  protocol_type = "HTTP"
  cors_configuration {
    allow_origins = ["*"] # FE から直接呼び出したいというニーズがある場合には CORS 設定をしておく。実際には `*` ではなくちゃんとドメインを指定しましょう
    allow_methods = ["POST", "GET", "OPTIONS"]
    allow_headers = ["*"]
    max_age       = 300
  }
}

resource "aws_apigatewayv2_stage" "gw_stage" {
  api_id = aws_apigatewayv2_api.gw_api.id

  name        = "gateway_api_service-llm-lambda-${var.app_env}"
  auto_deploy = true

  access_log_settings {
    destination_arn = aws_cloudwatch_log_group.gw_lg.arn
}

resource "aws_apigatewayv2_integration" "gw_integration" {
  api_id = aws_apigatewayv2_api.gw_api.id

  integration_uri    = aws_lambda_function.service_llm_lambda.invoke_arn
  integration_type   = "AWS_PROXY"
  integration_method = "POST"
}

resource "aws_apigatewayv2_route" "gw_route" {
  api_id = aws_apigatewayv2_api.gw_api.id

  route_key = "POST /invoke"
  target    = "integrations/${aws_apigatewayv2_integration.gw_integration.id}"
}

resource "aws_cloudwatch_log_group" "gw_lg" {
  name              = "/aws/api_gw/service-llm-lambda"
  retention_in_days = 1
}

resource "aws_lambda_permission" "api_gw" {
  statement_id  = "AllowExecutionFromAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.service_llm_lambda.function_name
  principal     = "apigateway.amazonaws.com"

  source_arn = "${aws_apigatewayv2_api.gw_api.execution_arn}/*/*"
}

resource "aws_apigatewayv2_domain_name" "api_gw_domain" {
  domain_name = "${var.app_env}.xxx" # あらかじめ route53 に作っておいた domain 名

  domain_name_configuration {
    certificate_arn = var.domain_cert_arn # あらかじめ ACM で取得しておきた certificate の ARN を指定
    endpoint_type   = "REGIONAL"
    security_policy = "TLS_1_2"
  }
}

resource "aws_route53_record" "api_gw_domain_record" { # ここで Route53 の ZONE に API Gateway にリクエストを向けるための A record を追加
  name    = aws_apigatewayv2_domain_name.api_gw_domain.domain_name
  type    = "A"
  zone_id = var.zone_id

  alias {
    evaluate_target_health = true
    name                   = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].target_domain_name
    zone_id                = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].hosted_zone_id
  }
}

resource "aws_apigatewayv2_api_mapping" "api_gw_domain_mapping" {
  api_id      = aws_apigatewayv2_api.gw_api.id
  stage       = aws_apigatewayv2_stage.gw_stage.id
  domain_name = aws_apigatewayv2_domain_name.api_gw_domain.id
}

設定のポイント

Timeout について

OpenAI の API を通常の API よりも実行時間が長い傾向があります。(公式の python sdk の設定 は 2023年8月現在、default の timeout はなんと 600sec です!)

そのため Lambda の実行時間を 60sec など長めにとっておくのが良いと思います。

また、コード自体でも timeout を適切に入れておくのが望ましいです。 今回は OpenAI の SDK に依存する形ではなく timeout-decorator という library を使って時前で timeout させるようにしました。

@timeout_decorator.timeout(60)
    def chat_complete(self, request_text: str):
        return openai.ChatCompletion.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": PROMPT.format(passage=request_text)},
            ],
            max_tokens=2048,
            temperature=0,
        )

API Key をどこから取り出すか

API Key は機密情報なので Lambda の内部で ssm を経由して取り出すようにしました。 Lambda の Execution role に SSM の該当リソースを取得してデコードする権限を渡しておきます。

data "aws_iam_policy_document" "lambda_execution_policy" {

  ... 省略

  statement {
    effect = "Allow"
    actions = [
      "kms:Decrypt",
      "ssm:GetParameter"
    ]
    resources = [
      "arn:aws:ssm:*:*:parameter/xxx" # parameter 名
    ]
  }
}

python 側では Lambda 起動時に以下のような処理が走るような実装にしています。ただしこれは OpenAI のみで必要な処理なので infra レイヤー内部に実装を入れています。

import boto3
import openai

ssm = boto3.client("ssm", region_name="ap-northeast-1")
response = ssm.get_parameter(
    Name="xxx", WithDecryption=True
)
openai.api_key = response["Parameter"]["Value"]

Datadog との接続

Datadog にメトリクスを送るには、以下の 2 点を行う必要があります。

  1. コンテナに Datadog に関連するライブラリを入れておく
  2. コンテナな起動設定でエントリポイントを Datadog のライブラリに上書きし、必要な環境変数を設定する

1 に関しては Dockerfile に public.ecr.aws/datadog/lambda-extension:latest から設定をコピーする設定を書きつつ、 datadog-lambda ライブラリをインストールします。今回は python のパッケージ管理を poetry で行っているので、 pyproject.toml に設定を追記します。

FROM public.ecr.aws/lambda/python:3.10

... 省略

COPY --from=public.ecr.aws/datadog/lambda-extension:latest /opt/extensions/ /opt/extensions

... 省略

RUN poetry install --no-root
[tool.poetry.dependencies]
python = "^3.10"
openai = "^0.27.8"
boto3 = "^1.28.22"
timeout-decorator = "^0.5.0"
datadog-lambda = "^4.78.0" # ここ

詳しい設定はこちらを参考にしてください。

docs.datadoghq.com

設定が完了した上で Lambda を実行すると、コンソールの Menu で Infrastructure -> Serverless 内で図のようにメトリクスが表示されるようになります。

Datadog のコンソール

ハマりポイント

Lambda 関数の event について

api gateway で integration した場合には event の中身が変化するという事を理解できておらず、ややハマりました。

コンソールからのテストでは正常に動きましたが、postman などで api を試すと internal serveer error が返ってきてしまいました。

{"message":"Internal Server Error"}

調べていくと、公式に以下のような解説がありました。

ペイロード形式バージョンでは、API Gateway が Lambda 統合に送信するデータの形式と、API Gateway が Lambda からのレスポンスをどのように解釈するかを指定します。ペイロード形式バージョンを指定しない場合、AWS Management Console はデフォルトで最新バージョンを使用します。

docs.aws.amazon.com

Api Gateway からの呼び出しの際には event の構造が変化するので、それに対応するように event からパラメータを取り出したり、レスポンスの形式を揃える必要があります。

インフラ的に解決する道もあると思いますが、簡易的に以下の関数を main.py に定義して対応しました。

def get_request(event):
    if "type" not in event:
        event = json.loads(event["body"])

    request_type = event["type"]
    request_text = event["input"]

    return request_type, request_text

def create_response(response: dict):
    return {
        "statusCode": 200,
        "body": json.dumps(response),
    }

さいごに

OpenAI などの LLM を活用するための Lambda の構成について解説しました。 がっつり使う場合はまた別の構成があると思いますが、LLM をちょっとした機能で使う場合には出来る限りインフラを追加で作らずに機能を追加できるような構成にしておく事にはメリットがあると思います。


DROBE開発組織の紹介
組織情報ポータル