AWS Lambdaのレスポンスストリーミングを使ってChatGPTっぽいUIにする

| 12 min read
Author: noboru-kudo noboru-kudoの画像

2023-04-07にAWS Lambdaのレスポンスストリーミングのサポートが発表されました。

AWS Lambdaでは、同期型のHTTP通信はリクエスト・レスポンスモデルのみで、Lambdaの実行時間に比例してユーザーへの応答時間も長くなります。
今回サポートされたレスポンスストリーミングでは、部分的でもレスポンスの準備ができ次第すぐに返せますので、UXはかなり改善しそうです。
また、記事によるとレスポンスストリーミングでは、レスポンスサイズの制約も6MBから20MB(ソフトリミット)へと緩和されるとのことです[1]

今回は、このLambdaレスポンスストリーミングを使って、OpenAIのChat Completion APIのレスポンスをChatGPT風に段階的に表示されるようにしてみたいと思います。

なお、ここで実装したソースコードはGitHubレポジトリに格納しています。

SAMプロジェクトのセットアップ

#

今回は冒頭の記事で使用されているAWS SAMでプロジェクトをセットアップします[2]

SAMのCLIで用意されているinitコマンドで以下を実行します。

sam init --name lambda-response-stream-sample \
--app-template hello-world-typescript --runtime nodejs18.x

現時点でレスポンスストリーミングがサポートされているマネージドランタイム環境はNode.jsのみです。このためランタイム環境はnodejs18.xを選択しました。

また、テンプレートはシンプルなTypeScriptプロジェクト(hello-world-typescript)を指定しました。

続いて必要なライブラリを依存関係に追加します。

npm install openai @aws-sdk/client-ssm

OpenAI APIの公式ライブラリに加えて、AWS SSMのSDKも追加しました。
これは、SSMパラメータストアに保存したOpenAI APIのAPIキーを取得するために必要なものです[3]

レスポンスストリーミングでLambda関数を実装する

#

続いてLambda関数の実装です。SAMのinitコマンドで生成されたapp.tsを以下のように書き換えました。

import * as AWS from '@aws-sdk/client-ssm';
import { Configuration, OpenAIApi } from 'openai';
import { IncomingMessage } from 'http';

import { pipeline as pipelineSync, Transform, TransformOptions } from 'stream';
import { promisify } from 'util';
const pipeline = promisify(pipelineSync);

// OpenAIのストリームレスポンスをChatメッセージに変換するカスタムTransformer
class ChunkTransformer extends Transform {
constructor(options?: TransformOptions) {
super(options);
}
_transform(chunk: Buffer, encoding: BufferEncoding, callback: (error?: Error | null, data?: any) => void): void {
console.log(chunk.toString());
const payloads = chunk.toString().split('\n\n');
for (const payload of payloads) {
if (payload.includes('[DONE]')) {
break;
}
if (payload.startsWith('data:')) {
const data = payload.replaceAll(/(\n)?^data:\s*/g, '');
try {
const delta = JSON.parse(data.trim());
this.push(delta.choices[0].delta?.content || '');
} catch (error) {
console.log(error);
}
}
}
callback();
}
}

// SSMパラメータストアからAPIキーを取得
const ssmClient = new AWS.SSM({});
const ssmParam = ssmClient.getParameter({ Name: '/openai/api-key', WithDecryption: true });

// Lambdaイベントハンドラー本体
export const lambdaHandler = awslambda.streamifyResponse(async (event, responseStream, context) => {
const json = JSON.parse(event.body || '{}');
if (!json.message) {
responseStream.write('no message');
responseStream.end();
return;
}

try {
// OpenAI APIの実行
const apiKey = (await ssmParam).Parameter?.Value || '';
const configuration = new Configuration({
apiKey
});
const openai = new OpenAIApi(configuration);
const response = await openai.createChatCompletion({
model: 'gpt-3.5-turbo',
messages: [{
role: 'user',
content: json.message
}],
max_tokens: 1024,
temperature: 0.7,
stream: true // enable stream
}, {
responseType: 'stream' // axios option(required)
});

// Node.jsのPipelineに連携
const stream = response.data as unknown as IncomingMessage;
await pipeline(stream, new ChunkTransformer(), responseStream);
} catch (err) {
console.log(err);
responseStream.write('エラーが発生しました!');
responseStream.end();
}
});

ポイントはイベントハンドラーで、awslambda.streamifyResponse(...)内に関数をラップしているところです。
最初はこのawslambdaが何者か分からず混乱しましたが、LambdaのNode.jsランタイム環境に組み込まれているもののようです。
公式ドキュメントには以下のように記述されています。

Wrap your function with the awslambda.streamifyResponse() decorator that the native Node.js runtimes provide.

ここでTypeScriptの型チェックでエラーにならないように、以下のd.tsファイルを用意しました。

import { APIGatewayProxyEvent, Context, Handler } from 'aws-lambda';

declare global {
namespace awslambda {
function streamifyResponse(f: (event: APIGatewayProxyEvent,
responseStream: NodeJS.WritableStream,
context: Context) => Promise<void>): Handler;
}
}

この関数シグニチャで重要なのはresponseStreamです。
レスポンスストリーミングは通常のLambda関数とは違い、ここに結果(チャンク)を書き込んでいくスタイルになります。

これのやり方として冒頭の記事や公式ドキュメントでは、Node.jsのpipelineを使うことを推奨しています。

we recommend that you use pipeline() wherever possible. Using pipeline() ensures that the writable stream is not overwhelmed by a faster readable stream.

これに従い、以下のストリームパイプラインを構築しました。

  1. 入力: OpenAI APIのストリームレスポンス(streamオプション有効化指定) ※下記コラム参照
  2. 変換: ストリームレスポンスのチャンクをチャットメッセージ変換(ChunkTransformer)
  3. 出力: Lambdaレスポンスストリーミング(responseStream)
OpenAI APIのストリームレスポンスの内容

本題から外れるので詳細は省略しましたが、OpenAI自体もstreamオプションを有効にすることでChatGPTのように段階的にチャンクメッセージを受け取ることができます。

各チャンクは以下のようにdata:に続いてJSONメッセージが設定されます。終了時にはJSONの代わりに[DONE]が出力されるようになっています。

  • 最初のチャンク
data: 
{
    "id": "chatcmpl-78ILvs4XFg967lP17phrjVWCpRssF",
    "object": "chat.completion.chunk",
    "created": 1682211527,
    "model": "gpt-3.5-turbo-0301",
    "choices": [
        {
            "delta": {
                "role": "assistant"
            },
            "index": 0,
            "finish_reason": null
        }
    ]
}
  • 2番目以降のチャンク
data: 
{
    "id": "chatcmpl-78ILvs4XFg967lP17phrjVWCpRssF",
    "object": "chat.completion.chunk",
    "created": 1682211527,
    "model": "gpt-3.5-turbo-0301",
    "choices": [
        {
            "delta": {
                "content": "Hello"
            },
            "index": 0,
            "finish_reason": null
        }
    ]
}
  • 最後のチャンク
data: 
{
    "id": "chatcmpl-78ILvs4XFg967lP17phrjVWCpRssF",
    "object": "chat.completion.chunk",
    "created": 1682211527,
    "model": "gpt-3.5-turbo-0301",
    "choices": [
        {
            "delta": {},
            "index": 0,
            "finish_reason": "stop"
        }
    ]
}


data: [DONE]

今回カスタムで作成したChunkTransformerは、この形式のレスポンスをチャットメッセージのチャンクとしてLambdaのresponseStreamに流れるようにしています。

Lambda関数をビルド・デプロイする

#

それでは、これをデプロイします。
ここではSAMの初期化で生成されたtemplate.yamlを以下のように修正しました。

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: lambda-response-stream-sample

Resources:
# Lambdaの実行ロール
ChatFunctionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: SSMOpenAIApiKeyRead
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- "ssm:GetParameter"
Resource:
- !Sub "arn:aws:ssm:*:${AWS::AccountId}:parameter/openai/api-key"
Path: "/"

# Lambda関数本体
ChatFunction:
Type: AWS::Serverless::Function
Properties:
Role: !GetAtt ChatFunctionRole.Arn
CodeUri: chat/dist
Handler: app.lambdaHandler
Runtime: nodejs18.x
Timeout: 60
Architectures:
- x86_64
FunctionUrlConfig:
AuthType: NONE
InvokeMode: RESPONSE_STREAM # LambdaレスポンスストリームON
Cors:
AllowCredentials: true
AllowMethods: ["POST"]
AllowOrigins: ["*"]
AllowHeaders: ["*"]

Outputs:
ChatApi:
Description: "Chat Lambda Function URL"
Value: !GetAtt ChatFunctionUrl.FunctionUrl
ChatFunction:
Description: "Chat Lambda Function ARN"
Value: !GetAtt ChatFunction.Arn

ポイントは、ChatFunctionのFunctionUrlConfigです。
現時点では、レスポンスストリーミングはAPI GatewayやALB(Application Load Balancer)をサポートしていません。ここではLambda Function URLを有効にします。
このFunctionUrlConfigのInvokeModeRESPONSE_STREAMを指定することで、Lambdaのレスポンスストリーミングが有効になります。
また、この後でローカル環境で動作する簡単なUIも作成しますので、CORSの許可設定もしておきます。

続いてビルドですが、当初はSAMのesbuildサポートを使うつもりでしたがうまく動きませんでした。
ここでは、個別にesbuildでバンドルする方法を使います(やり方に問題があったかもしれませんので、後日リトライしてみようと思います)。

以下のesbuild用のスクリプト(build.js)を別途用意しました。

require('esbuild').build({
entryPoints: ['app.ts'],
bundle: true,
minify: true,
sourcemap: true,
outfile: 'dist/app.js',
target: 'node18',
platform: 'node'
}).catch((e) => console.log(e) && process.exit(1));

Lambda関数本体のapp.tsをエントリーポイントとしてdist/app.jsにバンドルします。これがLambdaのデプロイ対象になります。

以下スクリプトでアプリケーションをバンドルし、SAMテンプレートをビルドします。

# Lambda関数ソースコードディレクトリに移動
pushd chat
# esbuildでバンドル
node build.js
# プロジェクトルートに戻る
popd
# SAM CLIでビルド
sam build

最後にデプロイします。

sam deploy

実行が終わるとLambda関数がデプロイされます。
以下はAWS管理コンソールから見た状態です。

lambda

関数URL(Lambda Function URL)のエンドポイントが作成されているのが分かります。

また、関数URL自体の設定を見てみると、呼出モード(InvokeMode)にテンプレートで指定したRESPONSE_STREAMが設定されていることが確認できます。

function url

試しにcurlでリクエストを送ってみました。

curl https://xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.lambda-url.ap-northeast-1.on.aws/ \
-H 'Content-Type:application/json' -d '{"message": "こんにちわ!あなたは誰ですか?"}'
> こんにちは!私はAIアシスタントのGPT-3です。あなたのサポートになります。

これくらいのレスポンスだと、ストリーミングしている感はありませんが問題なさそうです。

レスポンスストリーミング向けのUIを作成する

#

ここで、レスポンスストリーミングを活用したUIを作成します。今回はNuxt3を使います。
以下でプロジェクトを作成します。

npx nuxi init nuxt-app
cd nuxt-app
npm install

今回はサンプルなので、ページコンポーネントは作成せずapp.vueを以下のように修正しました。

<script setup lang="ts">
const message = ref('');
const response = ref('');
const ask = async () => {
response.value = '';
const config = useRuntimeConfig()
const stream = await $fetch(config.public.API_ENDPOINT, {
method: 'POST',
body: { message: message.value },
responseType: 'stream'
});
const reader = (stream as ReadableStream).getReader();
let done = false;
const decoder = new TextDecoder();

while (!done) {
const { value, done: isDone } = await reader.read();
if (isDone) {
done = true;
break;
}
const newData = new Uint8Array(value);
response.value += decoder.decode(newData);
}
}
</script>

<template>
<div>
<textarea v-model="message" cols="50" rows="5" style="display: block;margin-bottom: 5px"></textarea>
<button @click="ask">送信!!</button>
<div style="margin-top: 5px"></div>
</div>
</template>

送信ボタンがクリックされたら、Nuxt3の$fetch APIで先程のLambda関数をstreamタイプで呼出しています。
このレスポンスはストリームとなりますので、レスポンスが完全に終了するまでresponse変数に追記していきます。
このresponse変数はref関数でリアクティブにしているので、更新がある度に最新化され、ChatGPTのような見た目になるはずです。

設定情報(RuntimeConfig)で使っているAPI_ENDPOINTはLambda関数のエンドポイントです。
nuxt.config.tsで用意しています。

export default defineNuxtConfig({
runtimeConfig: {
public: {
API_ENDPOINT: 'https://xxxxxxxxxxxxxxxxxxxx.lambda-url.ap-northeast-1.on.aws'
}
}
})

もちろんここには先程デプロイしたLambda関数のURLを設定します。

後は実行するだけです。開発モードでローカルから実行します。

npm run dev

ブラウザからhttp://localhost:3000/にアクセスします。
以下動画です。

ChatGPTのように、段階的にメッセージが追記されている様子が分かりますね。

最後に

#

今回は結構試行錯誤しながらやりましたが、無事Lambdaのレスポンスストリーミングを活用したUIを実装できました。
現時点で利用できる環境は限られますが、レスポンスストリーミングは実行時間が長いもの以外でも、レスポンスサイズが大きくなりがちなファイルダウンロード等、アイデア次第で用途は広そうです。

今後の動向をチェックしていきたいなと思いました。


参考資料


  1. ただし、6MBを超えた部分のストリームはそのサイズに応じて課金対象となるようです。 ↩︎

  2. 執筆時点ではAWS CDKやServerless Frameworkでレスポンスストリーミングはまだサポートされていませんでした。 ↩︎

  3. こちらの記事で使用したパラメータを使います。 ↩︎

豆蔵デベロッパーサイト - 先週のアクセスランキング
  1. 基本から理解するJWTとJWT認証の仕組み (2022-12-08)
  2. AWS認定資格を12個すべて取得したので勉強したことなどをまとめます (2022-12-12)
  3. Nuxt3入門(第4回) - Nuxtのルーティングを理解する (2022-10-09)
  4. Nuxt3入門(第1回) - Nuxtがサポートするレンダリングモードを理解する (2022-09-25)
  5. Nuxt3入門(第8回) - Nuxt3のuseStateでコンポーネント間で状態を共有する (2022-10-28)
  6. Jest再入門 - 関数・モジュールモック編 (2022-07-03)
  7. 自然言語処理初心者が「GPT2-japanese」で遊んでみた (2022-07-08)
  8. IoT を使ってみる(その6:MQTTブローカー Mosquitto編) (2022-10-08)
  9. Nuxt3入門(第3回) - ユニバーサルフェッチでデータを取得する (2022-10-06)
  10. 統計学で避けて通れない自由度の話 (2022-06-20)