DROBEプロダクト開発ブログ

DROBEのプロダクト開発周辺の知見や考え方の共有をしていきます

DROBE Engineer Night #5 "質とスピードを追求する開発チームでの取り組み" を開催しました

こんにちは。DROBEの角田です。

2023年9月5日に、DROBEが主催するテックイベント『DROBE Engineer Night』の第5回を開催しました。今回は、"質とスピードを追求する開発チームでの取り組み" というタイトルで、株式会社リンケージ・ピクシブ株式会社と弊社の3社合同で開催しました。

drobe.connpass.com

現地・オンラインとも非常にたくさんの方にお越しいただき、盛況な会となりました。

弊社からは、『素早いバリュー提供のための
DROBEの新戦略』というタイトルで、現在行なっている顧客へ素早く価値を届ける試行錯誤についてご紹介させていただきました。

speakerdeck.com

リンケージさん・ピクシブさんには非常に積極的にご協力いただき、発表内容も開発・本番環境の差分を減らすであったり、防火壁を持つという考え方など、弊社にも取り込める内容が散りばめられた素敵なお話で、大変参考になりました。ありがとうございました。

今後も定期的にテックイベントを開催し技術的な発信をしてまいりますので、ご期待ください!

LLM で長文から構造化データを抽出する

はじめに

こんにちは、DROBE の都筑です。

みなさん LLM 使っていますか。今回は LLM を利用して長文から構造化データを抽出する手法について記載します。

構造化データの抽出

LLM を利用して構造化データを抽出することを Extraction と呼びます。 Extraction は以下のようなユースケースが考えられます。

  • テキスト情報から構造化したデータを抽出し DB にインサートする
  • 外部 API を呼ぶために入力を解釈してパラメータを抽出する

Extraction は非常に有用ですが、元となるテキストの最大長は利用する LLM の最大 token 数に依存します。

LLM と長文の処理

長文を LLM で扱うユースケースとしては文章要約がアプリケーションとして想定されることが多く、いくつかの方法が考案されています。LangChain の公式ドキュメントを覗くと、以下の 3 つの手法が提案されていました。

  1. Stuff

  2. Map-Reduce

  3. Refine

1 つめの Staff とは単純に全入力を LLM に突っ込むという手法です。2023年9月現在、最も手軽に使える OpenAI の gpt3.5-turbo では最大 16k token が使えるのでそれで充分であれば特に難しいことを考える必要はありません。また anthropic 社の Claude2 など 100k token に対応しているモデルも登場しており、近い将来 token の事を気にする必要はなくなりそうです。

2 つめの Map-Reduce は、入力をチャンクに分割しそれぞれに対して要約、最後に全ての要約を入力として一つの要約に纏めるというものです。Map-Reduce という名前でピンとくる方もいると思いますが、この手法は処理を並列化できて後述の Refine に比べて高速に動かすことが出来るという利点があります。

3 つめの Refine は、入力をチャンクに分割し、順番に要約をしていくのですが、最初のチャンクの要約を次のチャンクの要約時に入力として一緒に与えてあげるということをします。この手法では直前のチャンクの要約結果を待たないと次のチャンクの要約を行えないので処理を並列化することが出来ずに遅くなりますが、Map-Reduce に比べて内容をより正確に要約できるとのことです。

長文データから構造化データを抽出する手法

DROBE では長文のデータからタグを抽出して DB に保存する、というタスクを行おうとしていますが、LangChain の思想を参考にして以下のようなデータパイプラインを設計しました。

長文データからの抽出タスクパイプライン

以下が大まかなステップです。

  1. 文章をチャンクに分割する
  2. 分割された文章からタグを抽出する
  3. 抽出されたタグをマージしユニークな配列にする

最初のステップとして長文のデータをチャンクに分割していますが、これにより token 数がすくない LLM でも情報の抽出が可能になります。チャンクへの分割は一見簡単なように見えて難解なトピックです。一般的には以下のような挙動が期待されます。

  1. 文章の意味が失われないように分割を行う
  2. 分割されたチャンクを指定したサイズを超えるまで結合していく
  3. チャンクが指定サイズに達したら、次のチャンクを作る

この処理は、日本語の場合はなかなか難しい問題です。普通に実装する難易度が高いと考え、今回は Text Split は gpt3.5 にやらせることにしました。精度の必要なタグの抽出は gpt4 にやらせ、分割はより多くの token を扱える gpt3.5 で行うといった工夫です。

ちなみに、LangChain では TextSplitter というクラスが用意されています。

python.langchain.com

実装を覗いてみると、正規表現などを使った Splitter の他に、 all-mpnet-base-v2 model を利用した splitter も準備されていました。

huggingface.co

英語であれば LangChain が準備してくれている Splitter を使ってみると良さそうに思いました。

おわりに

長文のデータから構造化データを取得する方法を紹介しました。LLM の進化によりこういった手法などはいずれ必要なくなると考えていますが、現時点ではまだまだ扱える token サイズに制限があるためにこういったやりかたを考える必要があります。同じように困っている方の参考になれば幸いです。

参考

Extraction | 🦜️🔗 Langchain

Summarization | 🦜️🔗 Langchain

SAチーム化も見据え開発チームにLeSSを導入したはなし

はじめに

EMの加川(@shinpr_p)です。
ここ数週間で、DROBEの開発チーム体制に変化がありました。今回は体制変更の背景や考慮した事、どのようにチーム体制を考えたかを記事にしてみました。
チームを取り巻く状況によって選択肢は異なりますが、1つの事例としてチーム体制の参考となれば嬉しいです。

※ 余談ですが、この文章は社内向けに体制変更を説明した際に書いたものとある程度同じ内容になっています。

体制変更の背景

現在のDROBE開発チームは、組織観点においてふたつの課題を抱えています。

ひとつは、開発チームメンバーの多さです。

全員が集まるMTGとなると参加者が10名を超えてきます。10名を超えるとコミュニケーションパスが複雑になり過ぎ、性質に依らず話しづらさを感じたり話さなくなったりする人が出てきます。この状態はスクラムの三本柱である「検査」「適応」「透明性」が損なわれることを意味し、課題発見の遅延によってアジリティが失われてしまいます。

参考: ミラーの法則

makitani.net

もうひとつは、開発チームが関心を持つ責務領域です。

注力するビジネスミッションが複数ありそれぞれに担当PdMが存在するが、開発チームはひとつである。プロダクトは事業全体に関係するが、ビジネスミッションの集合では事業全体をカバーできない(取捨選択の観点で意図的にそうしている)。結果として、ビジネスミッションにも優先順位をつけることになり、重要なはずなのに開発着手されない状態が続くことがありました。一方で、開発チームが取り組むビジネスミッションが切り替わるタイミングでは、チームは連続性を失い、価値を積み上げている実感が薄くなってしまうという課題もありました。

また、プロダクトとしては稼働しており利用者もいるがビジネスミッションからは外れる領域に対して、関心を向けることが許容されにくい状態になっていたことも課題です。限られた開発リソースを取捨選択することは重要ですが、関心ごとの境界としてはハードであり優先順位をつける俎上に載せることすらできないという認識を持つ人が多くなってしまっていました。

これらの課題を解決するため、開発体制の変更を行うことにしました。

体制変更の詳細

今回の体制変更では、チームの分割を行います。チーム分割によってふたつの課題に対処していこうと考えています。
チーム体制は以下のようにしました。

LeSS

この体制に至った経緯

以下、どのような思考プロセスを経てこの体制にしたのかを説明していきます。

1. バリューストリームによるPdMの責務領域の整理

体制を考えるにあたり、DROBEにおけるバリューストリームを可視化する活動が行われました。より正確に表現すると、チーム分割をしたいと考えていた中でPdMの責務領域を整理するタイミングがあり、バリューストリームに沿って責務領域(以下、ビジネスドメイン)を提案しました。

このプロセスによって可視化されたバリューストリームマップは以下の通りです。 本項周辺についてはPO長井による別記事が近日公開予定です。詳細はそちらを参照 & お楽しみに...

VSM

参考:バリューストリーム / バリューストリームマッピングとは

www.atlassian.com

2. Team Topologiesによる将来のチーム体制の明示

可視化されたバリューストリームと担当PdMの情報をもとに、Team Topologiesの考え方でチームの構造を定義しました。

ちいとぽ

参考:チームトポロジーとは

www.ryuzee.com

点線で表現された箱は現時点でまだ実態として存在していない概念です。

  • xOpsはエンジニア採用が進捗し開発キャパシティが増加したタイミングでビジネスオペレーションのプラットフォームとして、中長期的なミッションを持ったチームの組成を検討したいと考えています
    • ミッションは「一連のビジネスオペレーションを包括した、全社のスループット向上」です
  • 倉庫の領域は現時点で両チームに関係していますが専門チームを作るほどの開発キャパシティはないため、当面は両チームで分担して開発・運用を行っていきます
  • MLについては、プラットフォームチーム化した上でプラットフォーム + イネーブリングという振る舞いに変えていこうと考えています

今回の「チーム体制の変更」で対象となるのは以下の2つです。これらは、チームとして自律的に活動できるようになった時点で、2つのストリームアラインドチーム(以下、SAチーム)として独立させることも選択肢として考えています。

  • 商品供給チーム
    • DROBE事業の商品調達から商品価値向上に伴う販売の促進を主務として担うチーム
  • 顧客体験チーム
    • DROBE事業のマーケットからの顧客獲得および購入体験向上伴う販売の促進を主務として担うチーム

3. スクラムの運用を考える

2の後半で示した通りチームとして自律的に活動できるようになった時点で物理的なチーム分割を検討します。スクラム的に言うとふたつのスクラムチームになります。裏を返すと現時点ですぐにふたつのスクラムチームにすることはできないということです。そのため、冒頭で示したLarge Scale Scrum(以下、LeSS)でのチーム分割手法を採用しました。

再掲: チーム体制

LeSS

参考: Large Scale Scrumとは www.atlassian.com

DROBE開発チームでは2週間スプリントでスクラムを運用しています。LeSSにすることで、イベントの流れが以下のようになります。

イベントの流れ

以下が、我々のチーム固有の状況に合わせたカスタマイズになります。

  • 各ビジネスドメインの担当PdMのことを、「代表者」と定義しています。スクラムにはPdMという役割が存在しないため、代表者にマッピングをしました。SAチームとして独立する際には、代表者ではなくPOとなり、スクラムチームから生み出される価値を最大化することに説明責任を持つようになる想定です。
  • チームは担当ビジネスドメインの専門家になることが期待されています。一方で、今回はハードなチーム分割をしない選択肢を取っているため、プロダクト全体を薄く把握した上で担当ビジネスドメインを習熟していくことを期待しています。Sprint Goalによっては他チームのヘルプに行くこともありますし、全体把握を目的に相手チームが担当するビジネスドメインのタスクを受け持つこともあります。
  • Sprint Goalは、事前にPOが素案を提出することで方向性を示します。Backlog Refinement、Sprint Planning1を経て今スプリントで取り組むPBIが明確になるため、Sprint Planning2にてそれらを持ち寄り、各チームがSprint Goalを決定します。Planning2にて方向性を見直したいとなった場合は、POとすり合わせを行いフィックスさせます。
  • 「トラベラー」は、各チームには所属せず各チームでその専門性が必要とされた時に支援に入ります
    • デザイナーは1名しかおらず両チームに所属できないため、デザインの観点でイベントやMTGに参加してほしい時に声をかける運用となります
    • xOps担当PdMはOpsのIssueを都度Planning1に持ち込み、優先順位をつけスプリントに組み込んでいく活動をします。また、必要に応じて各チームに所属しxOps関連タスクのデリバリーを支援します。

体制変更後の状態

最後に、体制変更から1スプリントが経過した現在(2023/08/29)の所感を書いて、本記事を締めたいと思います。 現時点でいくつかポジティブな変化が見えています。

  • チーム内の発言量が増しました
  • 意識するビジネスドメインが狭まったことで、コンテキストが揃っている前提で会話ができるようになりました
  • デプロイ頻度と変更のリードタイムが改善しています
    • ここは他の要因があるかもしれないのですが、タイミング的には一致しているので経過を見守っていきたいと考えています
      デプロイ頻度と変更のリードタイム

一方で、改善したいこともいくつか見えてきています。

  • 1つのゴールに向き合う開発者が減った中で、これまでの進め方だと結果非効率になるケースが散見されています
    • リソース効率で動いていることがあり、フロー効率で見ると改善の余地があります
    • チームの境界が曖昧になっている時が稀にあり、チームのゴール達成やチームを成長させるために頭を使う(新しい価値を作る)ことをもっとやれるはず
  • チームが別れリモートワークも相まって別チームのメンバーとのコミュニケーションが減ってしまった
    • 開発の観点ではポジティブかつ想定される効果ですが、分断はさせたくないためバランスを取る必要があります
    • 夕方に任意で雑談を行う「Coffee Time」を設けてみましたが、効果のほどはまだ不明です
  • 新しい概念をそれなりに取り入れているため、まだ理解が浸透していない(それはそう)
    • ここはSM(含む自分)ががんばっていきます

どんなフレームワークも運用がキモなので、これから数ヶ月よりよい状態に持っていけるように取り組んでいきます!!

PyCon APAC 2023にシルバースポンサーとして協賛します

EMの加川(@shinpr_p)です。
DROBE は PyCon APAC 2023 にシルバースポンサーとして協賛します。

PyCon APACは、プログラミング言語「Python」を中心としたボランティアによる非営利の年次カンファレンスです。このカンファレンスの目的は、Pythonプログラミング言語とその周辺技術を探求し、議論・実践できる場を提供することです。日本での開催は2013年以来の10年ぶりとなります。

DROBEからも数名のメンバーが参加します。ブース出展も行い、ノベルティも現在鋭意製作中です!当日は現地で交流できることを楽しみにしています。

PyCon APAC 2023 の概要

2023-apac.pycon.jp

開催日

2023.10.27(Fri)-28(Sat)

開催地

TOC有明コンベンションホール
東京都江東区有明3丁目5番7号

さいごに

DROBE は「すべての人がポジティブに意思決定し、自分を楽しめる世界」というビジョンを目指し、パーソナルスタイリングサービス「DROBE」を提供しています。

技術コミュニティが創出した成果を活用することで、DROBEというプロダクトを素早くデリバリーすることが可能となっています。技術コミュニティの恩恵をただ享受するだけでなく、業界の発展のために微力ながら貢献していきたいと考え、活動の一環として技術イベントへのスポンサードを行っています。
今後も当社の活用する技術領域にまつわるスポンサードや情報発信を継続的に行っていきます!

当社の紹介

DROBEのプロダクト開発組織に関する情報をまとめておりますので、少しでも興味を持っていただけたらぜひ参照ください。 info.drobe.co.jp

golang で Headless Browser によって動的に画像を生成する

はじめに

こんにちは、DROBE の都筑です。 この記事では Go 言語によって動的に画像を生成する Micro service の開発について解説します。

モチベーション

Web サービスを運用していると、メディアサイトなどで SNS の共有のための OG Image の生成などを行うために、動的に画像を生成したいというニーズが出てくるがあると思います。

DROBE でも通知などに使うために動的な画像生成のニーズがあります。

画像の生成方法

画像を動的に生成するには技術的にはいくつかの選択肢がありますが、画像処理系のライブラリを利用して画像生成を行うというのがまず思いつくと思います。

php であれば GD や ImageMagic などを使う形になります。

www.php.net

Go であれば、image package を使うなどが考えられます。

pkg.go.dev

画像生成の課題

画像生成の時にはデザイン性のある画像を作りたいというニーズがありますが、画像処理系のライブラリを使う場合には x, y 座標を考えて調整していく必要があります。

そのため、ワークフローとして

  1. デザイナーがデザイン
  2. エンジニアが実装 (ここが中々重いのと、デザインの再現度はエンジニアの力量に依存する)
  3. デザイナーが実装結果を確認
  4. 必要があれば 2 と 3 を繰り返す

Headless Browser による画像生成サービスの実装

モチベーション

画像処理系のライブラリを使う以外の方法として Headless Browser に CSS 描画を行い、スクリーンショットを撮り画像として保存する、とう手法があります。

この手法の場合は Headless Browser を準備し操作するという面倒さはありますが、デザインは CSS で行えるため、デザイナーによるデザインの実装をイテレーション無しで高い再現度で実現できます。

DROBE ではデザイナーとの作業効率やデザインの再現性の高さを重視して Haedless Browser による手法を採用しました。

構成

Headless Browser をメインのアプリケーションコンテナ内部で準備する事はコンテナサイズの肥大化や管理コストの増大などを招くため、画像生成のサービスをマイクロサービスとする事にしました。コンテナの内部で chromium が動く環境を準備し、それを制御する go のプログラムを走らせる形です。

全体の構成は以下のようになります。

Headless browser を利用した画像生成サーバー

ここで golang による chromium の制御は chromedp というライブラリに依存しています。

github.com

中身の解説

golang の中で 2 つの net.http サーバーを動かしています。

func initialize() {

    // create a WaitGroup
    wg := new(sync.WaitGroup)

    // add two goroutines to `wg` WaitGroup
    wg.Add(2)

    // chronium がアクセスする用のサーバー
    staticServerRouter := chi.NewRouter()
    staticImageHandler := controller.NewStaticImageHandler()
    staticServerRouter.Handle("/", staticImageHandler)
    go func() {
        log.Println("start static image server on :8080")
        http.ListenAndServe(":8080", staticServerRouter)
        wg.Done()
    }()

    // 外部が叩く用のサーバー
    apiServerRouter := chi.NewRouter()
    interactor := interactor.NewCreateImageInteractor(
        imaging.NewChromedpClient(),
        storage.NewS3Client(),
    )
    apiHandler := controller.NewApiHandler(
        interactor,
    )
    apiServerRouter.Handle("/", apiHandler)
    go func() {
        log.Println("start api server on :8081")
        http.ListenAndServe(":8081", apiServerRouter)
        wg.Done()
    }()

    // wait until WaitGroup is done
    wg.Wait()
}

http:8080 サーバーの方は内部で chromium がアクセスするサーバーです。

デザインをしていた html と css を使ってWebページを表示しスクショを取る構造になっています。

外部が叩く api server は一般的な clean architecture のイメージで書いています。

処理の流れは以下のようになります。

  • 外部からリクエストが来る
  • 8081 で動いている api server で処理を受け付ける
  • api server は chromedp を操作し、8080 にWebページを表示、スクショを取る
  • 取ったスクショを S3 にあげる (S3 バケットは公開設定にしておく)
  • S3 の url を api response として返す

chromedp を制御する部分は infra レイヤーに書いています。

package imaging

import (
    "context"
    "fmt"
    "io/ioutil"
    "log"
    "time"

    "github.com/chromedp/chromedp"
)

type ChromedpClient struct {
}

func NewChromedpClient() *ChromedpClient {
    return &ChromedpClient{}
}

func (c *ChromedpClient) TakeScreenshot(url string) (string, error) {
    opts := append(chromedp.DefaultExecAllocatorOptions[:],
        chromedp.DisableGPU,
        chromedp.WindowSize(1500, 1500),
    )
    allocCtx, cancel1 := chromedp.NewExecAllocator(context.Background(), opts...)
    ctx, cancel2 := chromedp.NewContext(allocCtx, chromedp.WithLogf(log.Printf))
    ctx, cancel3 := context.WithTimeout(ctx, 5*time.Second) // set timeout
    for _, cancel := range []context.CancelFunc{cancel1, cancel2, cancel3} {
        defer cancel()
    }

    var buf []byte
    task := chromedp.Tasks{
        chromedp.Navigate(url),
        chromedp.WaitVisible("#target", chromedp.ByID),
        chromedp.Screenshot("#target", &buf, chromedp.NodeVisible),
    }

    if err := chromedp.Run(ctx, task); err != nil {
        return "", err
    }

    fileName := fmt.Sprintf("%d.png", time.Now().UnixNano())
    if err := ioutil.WriteFile(fileName, buf, 0644); err != nil {
        return "", err
    }

    return fileName, nil
}

コンテナとして動かす際の注意点

この構成でサーバーを Deploy しましたが、一つ大きくハマってしまったポイントがあるのでご紹介します。

具体的には memory 使用量がどんどん増え続けていくという挙動が観測されました。

メモリーリークが観測された

典型的なメモリーリークの挙動に見えたので pprof などを利用して golang のサーバーでの memory leak などを疑いましたが特に leak は観測されませんでした。

pkg.go.dev

go 側で leak していないとすると、コンテナ内部で他の process が悪さをしていることが想定されます。

実際に top で process を確認してみると zombie process が発生している事を確認できました。

Mem: 3810340K used, 197548K free, 4352K shrd, 135288K buff, 2115840K cached
CPU:   5% usr   3% sys   0% nic  90% idle   0% io   0% irq   0% sirq
Load average: 0.44 1.55 0.87 2/570 150
  PID  PPID USER     STAT   VSZ %VSZ CPU %CPU COMMAND
   64     1 root     S    4802m 122%   1   0% dlv exec --listen=:2345 --headless=tr
   69    64 root     S     716m  18%   0   0% /go/src/build/main
    1     0 root     S     698m  18%   1   0% air -c .air.toml
   31     0 root     S     1672   0%   0   0% ash
  150    31 root     R     1600   0%   0   0% top
  109     1 root     Z        0   0%   0   0% [chromium]
  100     1 root     Z        0   0%   1   0% [chromium]
   82     1 root     Z        0   0%   0   0% [chromium]
   83     1 root     Z        0   0%   1   0% [chromium]
  136     1 root     Z        0   0%   0   0% [chromium]

zombi process の chromium はサーバーを使えば使うほど増えていくことも確認できました。

Zombie process への対策

この blog を参考に PID1 が zombie を kill してくれるように —init flag をつけて docker を起動したところ screen shot を取った後に zombie process が生まれない事が確認できました。

blog.phusion.nl

終わりに

go を用いた画像を動的に生成するマイクロサービスをご紹介しました。

特にメモリー周りはあまり知見もなかったので同じように chromedp をコンテナで動かそうと考えている方の参考になれば幸いです。

LLM で行う独自シソーラスに対応した校正機能

はじめに

こんにちわ、DROBE の都筑です。 この記事では LLM で行う独自シソーラスに対応した校正機能について解説します。

LLM で校正してもらう

文章校正とは文章内の誤字・脱字・誤植や文法ミスを修正して正しく書き直すことを指します。 一般的には文章の「てにをは」やタイポなどを修正する機能をイメージされる方が多いと思いますが、実際に業務の現場では独自の言い回しや単語などがありライティングのガイドラインがある場合も多いと思います。

LLM による校正で非常に強力なのは、そういった独自ルールやガイドラインを踏まえた上で校正をしてくれる機能をプロンプトを作成するだけで簡単に実装できる点にあると思います。

DROBE でのユースケース

DROBE ではお客さまに商品を発送する際に スタイリングカルテ と呼んでいるスタイリストからの提案をまとめた印刷物を同梱しています。 カルテに記載する文言は全てスタイリストが自分で書いていますが、サービスに特徴的な単語の言い回しなどに一定のルールがあります。

今回はこのスタイリングカルテの文言の校正機能を LLM を用いて開発してみました。

機能の概要

実装としては非常にシンプルで、OpenAI の API を叩く Lambda を先に用意しておき、それを React から直接叩きます。

機能の概要図

UI 的には、校正ボタンを準備しておき、ボタンをクリックしたら API を叩いて結果が返ってきたら修正差分を表示する、という簡易なものを作りました。

実際にサービスに組み込んでみた

Lamda の構成については別で記事にしているので興味があればご参照ください。

tech.drobe.co.jp

プロンプト

Lambda 側のプロンプトを記載します。ここは色々な工夫のしようがあると思います。

あなたは文書を校正するアシスタントです。
意味を変えないように、与えられた文章を以下のポイントに気をつけて訂正して出力してください。
- タイポ
- 文法間違い

以下の点に関しては訂正しないでください。
1. 敬称としての使う「様」には平仮名の「さま」を使ってください
... 省略

表記を間違えやすい単語や表現があるので気をつけてください
以下に例を示します。

===============
正しい表記: DROBE
間違った表記: Drobe

正しい表記: お客さま
間違った表記: お客様、ユーザー

...省略
===============

これらを踏まえて以下の文章を訂正してください。
===============
{passage}
===============

model は gpt-3.5-turbogpt-4 を試しましたが、この場合は圧倒的に gpt-4 の方が自然な校正結果が返ってきました。

パフォーマンスに関して

パフォーマンスに関しては Datadog で処理時間を計測しています。 コンソールを見ると大体 15sec 程度ではレスポンスを返せていそうです。

Datadog のコンソール

一般的な API だと考えるとあり得ないくらい遅いですが、校正という機能を考えると及第点かなと思います。

検証方法

検証を行うために ChatGPT に間違った表記で文章をいくつか作ってもらい、それを校正 API に流して検証しました。

ChatGPT による検証問題の作成

上手くシステム化すれば、検証や評価もある程度自動化できるのではと思いました。

さいごに

OpenAI の LLM である GPT-4 を利用した文章校正ツールについて解説しました。 一般的な校正だけではなく、独自ルールやガイドラインを加味してくれるツールをサッと作れるのが非常に強力だと思います。

参考にさせていただいた資料

以下を非常に参考にさせていただきました。ありがとうございます。

LLM をサービスから使うために Lambda で API Wrapper を作る

はじめに

こんにちわ、DROBE の都筑です。

OpenAI の提供する API を始めとして LLM をサービスで活用されている、もしくはこれから活用しようとしている方は多いと思います。 一方で、OpenAI が公式に提供している Library は Python と Node.js のみなので既存のサービスに直接インテグレーションする事が難しい場合も多いのではないでしょうか。

そういったケースに対応するために、 AWS Lambda で API の Wrapper 関数を作り公開し、サービスからはその API を叩くという構成のシステムをセットアップしたのでこの記事で解説します。

作るもの

以下のような構成のものを GitHub actions と terraform で構築していきます。

全体の構成

Lambda を採用した理由は以下になります。

  • LLM を利用した機能を実装するサーバーになるが、入出力を揃えて API を叩くだけで重たい処理を行わない
  • ECS などのサービスを利用して常時 API を立てておくよりも金額的なメリットがある
  • 弊社が AWS を中心にインフラを構築しているので使いやすい

Lambda は python の container を利用します。Lambda の実行時に SSM から OpenAI の API Key を取得して API を叩く構成です。 クライアントからはカスタムドメインを貼った API Gateway 経由で Lambda にアクセスする構成としています。Log は Cloudwatch に、Metrics は Datadog に送り Lambda の実行回数などをモニタリングできるようにします。

解説

では、それぞれ解説していきます。

Lambda 関数の設計

まず Lambda 関数の設計ですが、以下のような要件を満たせるようにしたいと考えました。

  • LLM を利用したい機能は複数出てくる事が想定されるので、新機能を作るたびにインフラを作らなくて良いようにしたい
  • OpenAI 以外の API も気軽に試せるような構成にしたい

これらを踏まえて、以下のような Clean Architecture 風な File 構成で Lambda の内部を書きました。

├── main.py
└── src
    ├── domain // interface の定義
    ├── infra // interface の実装
    │   └── openai // openai に依存した API の定義 (prompt もこの下で定義)
    └── usecase
        └── interactor.py

main.py には lambda の entry point となる関数を書きます。 main.py の主な役割は DI (interactor の準備) と request の validation と response の整形です。

domain に usecase で使う処理の interface を定義します。例として与えられた入力から特定の商品情報を抜き出す処理の interface は以下のような定義にしました。

RelatedItemExtractionResponse = TypedDict(
    "RelatedItemExtractionResponse", {"items": List[str]}
)


class IFRelatedItemExtraction(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def handle(self, request_text: str) -> RelatedItemExtractionResponse:
        pass

interactor 内部で business logic を呼び出す処理を書きます。ここでは LLM の呼び出し処理を行います。もちろん、このレイヤーは infra には直接依存しないレイヤーになります。

class Interactor:
    def __init__(
        self,
        repo: IFRelatedItemExtraction,
    ):
        self.repo = repo

    def extract(self, request_text: str) -> RelatedItemExtractionResponse:
        return self.repo.handle(request_text)

実際に API を呼び出す処理や prompt の定義は infra 以下に書きます。特定の商品情報を抜き出す処理の実装はこのような形です。

MODEL_NAME = "gpt-3.5-turbo-16k" # gpt-4 なども指定できる。Application に合わせて指定する。

SYSTEM_PROMPT = "" # システムプロンプト を書く

PROMPT = """
{passage}
"""

class OpenAIRelatedItemExtraction:
    @timeout_decorator.timeout(60) # timeout させるための decoration
    def chat_complete(self, request_text: str):
        return openai.ChatCompletion.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": PROMPT.format(passage=request_text)},
            ],
            max_tokens=4098,
            temperature=0,
        )

    def handle(self, request_text: str) -> RelatedItemExtractionResponse:
        output = self.chat_complete(request_text)
        content = output["choices"][0]["message"]["content"]
        return json.loads(content)

ポイントはモデルの指定やプロンプトをここに書いている事です。プロンプトなどは実装の詳細なので、ifra 以外のレイヤーでは意識しなくて良くなり、プロンプトの修正や呼び出し方の変更 (例えば Function calling を使うように修正するなど) をしても、infra レイヤー以外の修正は行わなくて良くなります。

また、こういった構成にしておくことで、例えば OpenAI をやめて別の LLM に変更するなどをする場合でも、domain で定義された interface に沿って infra を新規で作り main.py で interactor を作る時にどちらを使うかを選べるため、 比較コードの修正量を抑えながら AB テストなどを行う事ができます。

Lamda へのリクエストは以下のような形に統一しています。

{
  "type": "function_name",
  "input": "input_text"
}

type によって interactor のどの処理を呼び出すかを決定するようにします。 この構成により、新しく Lambda を Deploy しなくても Lambda のソースコードを変更することで新しいを機能を追加できます。 もちろんコンテナのサイズが肥大化していくという懸念はありますが、API の Wrapper であることを想定しているのである程度までは機能を増やしても問題ないと考えました。

CI/CD

コンテナのビルドと Lambda の更新は GitHub Actions で行います。

CI/CD

Actions は以下のような yaml で定義しました。

name: Build

on:
  push:
    branches:
      - main

env:
  CONTAINER_NAME: xxx # make の中で定義しているコンテナ名
  REPO_NAME: yyy # ECR repo の名前
  LAMBDA_FUNCTION_NAME: zzz # terraform で定義する Lambda 関数の名前

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout
        uses: actions/checkout@v2

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.ML_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.ML_AWS_SECRET_ACCESS_KEY }}
          aws-region: ap-northeast-1

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1

      - name: Build and push to ecr
        run: |
          make build # コンテナを build するためのコマンド。コンテナ名はこの内部で定義している。
          docker tag ${CONTAINER_NAME}:latest ${REPO_NAME}:latest
          docker push <repo-name>:latest
          docker tag ${CONTAINER_NAME}:latest ${REPO_NAME}:${GITHUB_SHA}
          docker push ${REPO_NAME}:${GITHUB_SHA}
  
      - name: Deploy to lambda
        run: |
          aws lambda update-function-code --function-name ${LAMBDA_FUNCTION_NAME} --image-uri ${REPO_NAME}:latest

環境の構築

Lambda の実行環境は terraform で作ります。 それぞれのリソースに合わせて file を分割して作りました。 ここでは point となる lambda function と api gateway の部分だけピックアップして説明します。

まず lambda function の定義です。

resource "aws_lambda_function" "service_llm_lambda" {
  depends_on = [
    aws_cloudwatch_log_group.service_llm_lambda, # cloudwatch の log group を定義しておきます
  ]

  function_name = "service-llm-lambda-${var.app_env}"
  package_type  = "Image"
  image_uri     = "${aws_ecr_repository.ecr_repo.repository_url}:latest" # ECR repository の名前です
  role          = aws_iam_role.lambda_role.arn # Lambda の実行時の権限です。後述しますが SSM から機密情報を読み出せる権限を渡しました

  memory_size = 128
  timeout     = 60 # OpenAI の API は通常の API よりも実行時間が長い傾向があるので長めにしておきます

  lifecycle {
    ignore_changes = [
      image_uri, last_modified
    ]
  }

  image_config {
    command = ["datadog_lambda.handler.handler"] # 後述しますが Datadog にメトリクスを送るためにコマンドを上書きしておきます。
  }

  environment {
    variables = {
      DD_LAMBDA_HANDLER = "main.lambda_handler", # コマンドを上書しているので、実際のハンドラーはここに記載します。
      DD_SITE           = "datadoghq.com",
      DD_API_KEY        = data.aws_ssm_parameter.dd_api_key.value, ## Datadog の API key を parameter store から渡します。この書き方だとコンソールから鍵が見えてしまうので実際はそうならないように工夫が必要です。
      DD_TRACE_ENABLED  = "true"
    }
  }

}

api gateway はこんな感じです。HTTP モードでの設定とし、あらかじめ Route53 に作っておいたカスタムドメインと紐付けます。

resource "aws_apigatewayv2_api" "gw_api" {
  name          = "gateway_api_service-llm-lambda-${var.app_env}"
  protocol_type = "HTTP"
  cors_configuration {
    allow_origins = ["*"] # FE から直接呼び出したいというニーズがある場合には CORS 設定をしておく。実際には `*` ではなくちゃんとドメインを指定しましょう
    allow_methods = ["POST", "GET", "OPTIONS"]
    allow_headers = ["*"]
    max_age       = 300
  }
}

resource "aws_apigatewayv2_stage" "gw_stage" {
  api_id = aws_apigatewayv2_api.gw_api.id

  name        = "gateway_api_service-llm-lambda-${var.app_env}"
  auto_deploy = true

  access_log_settings {
    destination_arn = aws_cloudwatch_log_group.gw_lg.arn
}

resource "aws_apigatewayv2_integration" "gw_integration" {
  api_id = aws_apigatewayv2_api.gw_api.id

  integration_uri    = aws_lambda_function.service_llm_lambda.invoke_arn
  integration_type   = "AWS_PROXY"
  integration_method = "POST"
}

resource "aws_apigatewayv2_route" "gw_route" {
  api_id = aws_apigatewayv2_api.gw_api.id

  route_key = "POST /invoke"
  target    = "integrations/${aws_apigatewayv2_integration.gw_integration.id}"
}

resource "aws_cloudwatch_log_group" "gw_lg" {
  name              = "/aws/api_gw/service-llm-lambda"
  retention_in_days = 1
}

resource "aws_lambda_permission" "api_gw" {
  statement_id  = "AllowExecutionFromAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.service_llm_lambda.function_name
  principal     = "apigateway.amazonaws.com"

  source_arn = "${aws_apigatewayv2_api.gw_api.execution_arn}/*/*"
}

resource "aws_apigatewayv2_domain_name" "api_gw_domain" {
  domain_name = "${var.app_env}.xxx" # あらかじめ route53 に作っておいた domain 名

  domain_name_configuration {
    certificate_arn = var.domain_cert_arn # あらかじめ ACM で取得しておきた certificate の ARN を指定
    endpoint_type   = "REGIONAL"
    security_policy = "TLS_1_2"
  }
}

resource "aws_route53_record" "api_gw_domain_record" { # ここで Route53 の ZONE に API Gateway にリクエストを向けるための A record を追加
  name    = aws_apigatewayv2_domain_name.api_gw_domain.domain_name
  type    = "A"
  zone_id = var.zone_id

  alias {
    evaluate_target_health = true
    name                   = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].target_domain_name
    zone_id                = aws_apigatewayv2_domain_name.api_gw_domain.domain_name_configuration[0].hosted_zone_id
  }
}

resource "aws_apigatewayv2_api_mapping" "api_gw_domain_mapping" {
  api_id      = aws_apigatewayv2_api.gw_api.id
  stage       = aws_apigatewayv2_stage.gw_stage.id
  domain_name = aws_apigatewayv2_domain_name.api_gw_domain.id
}

設定のポイント

Timeout について

OpenAI の API を通常の API よりも実行時間が長い傾向があります。(公式の python sdk の設定 は 2023年8月現在、default の timeout はなんと 600sec です!)

そのため Lambda の実行時間を 60sec など長めにとっておくのが良いと思います。

また、コード自体でも timeout を適切に入れておくのが望ましいです。 今回は OpenAI の SDK に依存する形ではなく timeout-decorator という library を使って時前で timeout させるようにしました。

@timeout_decorator.timeout(60)
    def chat_complete(self, request_text: str):
        return openai.ChatCompletion.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": PROMPT.format(passage=request_text)},
            ],
            max_tokens=2048,
            temperature=0,
        )

API Key をどこから取り出すか

API Key は機密情報なので Lambda の内部で ssm を経由して取り出すようにしました。 Lambda の Execution role に SSM の該当リソースを取得してデコードする権限を渡しておきます。

data "aws_iam_policy_document" "lambda_execution_policy" {

  ... 省略

  statement {
    effect = "Allow"
    actions = [
      "kms:Decrypt",
      "ssm:GetParameter"
    ]
    resources = [
      "arn:aws:ssm:*:*:parameter/xxx" # parameter 名
    ]
  }
}

python 側では Lambda 起動時に以下のような処理が走るような実装にしています。ただしこれは OpenAI のみで必要な処理なので infra レイヤー内部に実装を入れています。

import boto3
import openai

ssm = boto3.client("ssm", region_name="ap-northeast-1")
response = ssm.get_parameter(
    Name="xxx", WithDecryption=True
)
openai.api_key = response["Parameter"]["Value"]

Datadog との接続

Datadog にメトリクスを送るには、以下の 2 点を行う必要があります。

  1. コンテナに Datadog に関連するライブラリを入れておく
  2. コンテナな起動設定でエントリポイントを Datadog のライブラリに上書きし、必要な環境変数を設定する

1 に関しては Dockerfile に public.ecr.aws/datadog/lambda-extension:latest から設定をコピーする設定を書きつつ、 datadog-lambda ライブラリをインストールします。今回は python のパッケージ管理を poetry で行っているので、 pyproject.toml に設定を追記します。

FROM public.ecr.aws/lambda/python:3.10

... 省略

COPY --from=public.ecr.aws/datadog/lambda-extension:latest /opt/extensions/ /opt/extensions

... 省略

RUN poetry install --no-root
[tool.poetry.dependencies]
python = "^3.10"
openai = "^0.27.8"
boto3 = "^1.28.22"
timeout-decorator = "^0.5.0"
datadog-lambda = "^4.78.0" # ここ

詳しい設定はこちらを参考にしてください。

docs.datadoghq.com

設定が完了した上で Lambda を実行すると、コンソールの Menu で Infrastructure -> Serverless 内で図のようにメトリクスが表示されるようになります。

Datadog のコンソール

ハマりポイント

Lambda 関数の event について

api gateway で integration した場合には event の中身が変化するという事を理解できておらず、ややハマりました。

コンソールからのテストでは正常に動きましたが、postman などで api を試すと internal serveer error が返ってきてしまいました。

{"message":"Internal Server Error"}

調べていくと、公式に以下のような解説がありました。

ペイロード形式バージョンでは、API Gateway が Lambda 統合に送信するデータの形式と、API Gateway が Lambda からのレスポンスをどのように解釈するかを指定します。ペイロード形式バージョンを指定しない場合、AWS Management Console はデフォルトで最新バージョンを使用します。

docs.aws.amazon.com

Api Gateway からの呼び出しの際には event の構造が変化するので、それに対応するように event からパラメータを取り出したり、レスポンスの形式を揃える必要があります。

インフラ的に解決する道もあると思いますが、簡易的に以下の関数を main.py に定義して対応しました。

def get_request(event):
    if "type" not in event:
        event = json.loads(event["body"])

    request_type = event["type"]
    request_text = event["input"]

    return request_type, request_text

def create_response(response: dict):
    return {
        "statusCode": 200,
        "body": json.dumps(response),
    }

さいごに

OpenAI などの LLM を活用するための Lambda の構成について解説しました。 がっつり使う場合はまた別の構成があると思いますが、LLM をちょっとした機能で使う場合には出来る限りインフラを追加で作らずに機能を追加できるような構成にしておく事にはメリットがあると思います。


DROBE開発組織の紹介
組織情報ポータル