NFLabs. エンジニアブログ

セキュリティやソフトウェア開発に関する情報を発信する技術者向けのブログです。

チャットボットの会話履歴を保存したい

はじめに

NFLaboratories Advent Calendar 2024 - Adventar 16日目 最終日の記事です。
こんにちは! 研究開発部の田島です。

現在NFLabs.ではセキュリティ技術トレーニングプラットフォーム(TPF)1を開発しています。
この記事ではTPFに搭載されているチャットボットの紹介と、その機能拡張について説明します。

チャットボットについて

チャットボット
画像に見えているのが、TPFのチャットボットです。

TPFではセキュリティ技術の学習のためユーザーがコンテンツを解いていきます。
チャットボットはコンテンツを解くユーザーが行き詰まった際に適切なアドバイスを出し、学習をサポートしてくれます。

チャットボットとの会話

背景

これまではチャットボットとの会話履歴が保存されていませんでした。
アドバイスの質の改善や機能拡張のためには、過去に行った会話データが必要になります。
これを解決することでユーザーや管理者の負担を減らす実装に繋げられ、より学習に集中できる環境が整います。

TPFアプリケーションはAWSのECS上で実行され、チャットボットを動かすためにAzureOpenAIのアシスタントが使われています。
ユーザーからの質問を受け取ると、ElasticCloudにあるユーザー行動ログとコンテンツの情報に基づき回答します。
AzureOpenAIのAPI2で直接会話履歴を取得できるのですが、スレッド3が削除されてしまうと会話履歴も取得できなくなります。
スレッド削除に依存せずより長い期間の永続化を実現するために保存基盤を用意し、全文検索についても想定しておく必要がありました。

会話履歴保存方法

1つ目はECSで動いているアプリケーションのログを収集しETLする方法、2つ目はバックエンドでチャットメッセージのストリーミングデータを変換して保存する方法があります。
前者はコンテナの標準出力をCloudWatchやS3に転送し、AthenaやDuckDBで分析することを考えていました。
運用におけるデバッグがしやすくなるという利点もありそうですが、会話履歴以外のデータが混じり、実装コストも重くなります。
ここでは後者を選択しました。

会話履歴保存先

保存方法が決まったので、保存先のリソースをどれにするかを考えます。

候補1: DynamoDB
会話履歴保存の実装を検索したときに、AWS公式の構成例4や記事でこのリソースが使われているのを目にします。
無料枠で25GBのストレージを利用でき、テーブル設計次第では単体で全文検索に近いことができるようです。

候補2: S3
保存形式にJSONを用いたりするのであればS3も使えます。
S3標準では5GBのストレージを無料枠 (12ヶ月間)として利用でき、単体では全文検索機能を提供していませんがAthenaなどと連携し全文検索を行うことができます。
また、ストレージに保存するデータがDynamoDBの無料枠を超えてくると、S3に保存した方がコストを抑えられる場合があります。

候補3: ElasticCloud
TPFですでに別機能のために利用されているリソースです。
契約中のプランと利用状況から20GBほどストレージに空きがあり、Elasticsearchで分析ができるため全文検索に優れています。

コスト算出

想定している会話履歴データの保存容量は、1日に200人が20ターン5会話する場合
2byte * 400文字 * 20ターン * 200人 = 3200KB

1ヶ月あたり 3200KB * 30日 = 96,000,000 = 96MB ほどになり、20GBを使い切るにも208.3ヶ月ほどかかるのでかなり余裕があります。

リクエスト数に関しても、書き込み回数が 20ターン * 30日 * 200人 = 12万回 として 読み込み回数が書き込み回数の倍で考えて24万回を想定します。

保存容量とリクエスト数でコスト算出したところ、保存料金は多くて数十円で、規模が小さいとどれも低額です。
そのため実装において管理リソースが増えず、全文検索に優れたElasticCloudの利用を決めました。

実装

バックエンド側で質問と回答を変換し、ElasticCloudで保存することを決めました。
実装ではLangChainを使用することで、インデックスの作成からメッセージの追加、会話履歴の取得までたった数行のコードで実現できます。

from langchain_elasticsearch import (
    ElasticsearchChatMessageHistory,
)

# クライアント兼index作成
history = ElasticsearchChatMessageHistory(
    es_url="http://localhost:9200", index="test-history", session_id="test-session"
)

history.add_user_message("こんにちは!")
history.add_ai_message("何か質問はありますか?")

ローカルで検証します。
docker-compose でElasticsearchとKibanaを立ち上げた後に、上記のコードを実行します。

version: '3'
services:
  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.16.1
    container_name: es01
    ports:
      - 9200:9200
    networks:
      - elastic
    mem_limit: 1G
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false

  kib01:
    image: docker.elastic.co/kibana/kibana:8.16.1
    container_name: kib01
    ports:
      - 5601:5601
    networks:
      - elastic
    depends_on:
      - es01
    environment:
      - ELASTICSEARCH_HOSTS=http://es01:9200

networks:
  elastic:
    driver: bridge

結果は画像の通りです。

あらかじめindexを作成せずともメッセージが保存できることが確認できます。
また、よくみると会話内容のうち非ASCIIの文字がエスケープされています。
ElasticsearchChatMessageHistoryのデフォルトオプションでは ensure_ascii がTrueになっているので、Falseにすると直ります。

history = ElasticsearchChatMessageHistory(
    es_url="http://localhost:9200",
    index="test-history",
    session_id="test-session",
    ensure_ascii=False, # <- ここ
)

日本語でデータが入っていることを確認できます。
ここでもう一つ気づくこととして、jsonがdumpされて格納されています6
history.messages などで会話履歴を取得可能ですが、メッセージの内容で全文検索したい時に不便です。

そこで、インデックスを作成しなおし、メッセージ追加時に使用されるメソッドを上書きしてdumpされないようにします。

PUT /test-history
{
  "mappings": {
    "properties": {
      "session_id": {
        "type": "keyword"
      },
      "created_at": {
        "type": "date"
      },
      "history": {
        "properties": {
          "data": {
            "properties": {
              "additional_kwargs": {
                "type": "object"
              },
              "content": {
                "type": "text"
              },
              "example": {
                "type": "boolean"
              },
              "id": {
                "type": "keyword"
              },
              "name": {
                "type": "keyword"
              },
              "response_metadata": {
                "type": "object"
              },
              "type": {
                "type": "keyword"
              }
            }
          },
          "type": {
            "type": "keyword"
          }
        }
      }
    }
  }
}
import logging
from time import time
from typing import Optional

from elasticsearch import ApiError, Elasticsearch
from langchain_core.messages import (
    BaseMessage,
    message_to_dict,
)
from langchain_elasticsearch import (
    ElasticsearchChatMessageHistory,
)

logger = logging.getLogger(__name__)

class CustomElasticsearchChatMessageHistory(ElasticsearchChatMessageHistory):
    def __init__(
        self,
        index: str,
        session_id: str,
        *,
        es_connection: Optional[Elasticsearch] = None,
        es_url: Optional[str] = None,
        es_cloud_id: Optional[str] = None,
        es_user: Optional[str] = None,
        es_api_key: Optional[str] = None,
        es_password: Optional[str] = None,
        ensure_ascii: Optional[bool] = True,
    ):
        super().__init__(
            index,
            session_id,
            es_connection=es_connection,
            es_url=es_url,
            es_cloud_id=es_cloud_id,
            es_user=es_user,
            es_api_key=es_api_key,
            es_password=es_password,
            ensure_ascii=ensure_ascii,
        )

    def add_message(self, message: BaseMessage) -> None:
        """Add a message to the chat session in Elasticsearch"""
        try:
            self.client.index(
                index=self.index,
                document={
                    "session_id": self.session_id,
                    "created_at": round(time() * 1000),
                    "history": message_to_dict(message),
                },
                refresh=True,
            )
        except ApiError as err:
            logger.error(f"Could not add message to Elasticsearch: {err}")
            raise err


history = ElasticsearchChatMessageHistory(
    es_url="http://localhost:9200",
    index="test-history",
    session_id="test-session",
)

history.add_user_message("こんにちは!")
history.add_ai_message("何か質問はありますか?")

実行結果

階層構造が維持され、メッセージに対して全文検索が行えるようになりました。

おわりに

ほぼ同時並行で行われていた移行作業があり、どれだけそこに影響を出さないようにできるか、管理に手間がかからないようにするかで悩みました。
そういった意味でLangChainのMessage histories7機能は今回のユースケースにピッタリでした。
幅広くDBなどに対応していますし、保存から取り出しまで少ないコードで実装できます。(json.dumpsへ対応が必要でしたが)
今後もプロダクトの成長を楽しみにしていてください。


  1. 5分でわかるプロダクト(トレーニングプラットフォーム)の紹介 - NFLabs. エンジニアブログ
  2. https://platform.openai.com/docs/api-reference/messages/listMessages
  3. アシスタントとユーザー間の会話セッションを管理するためのオブジェクトです。
  4. https://aws.amazon.com/jp/cdp/ai-chatapp/?p=cdp&z=4
  5. 一問一答を1ターンとしています。
  6. https://github.com/langchain-ai/langchain-elastic/blob/main/libs/elasticsearch/langchain_elasticsearch/_sync/chat_history.py#L96 https://github.com/langchain-ai/langchain-elastic/blob/main/libs/elasticsearch/langchain_elasticsearch/_sync/chat_history.py#L145
  7. https://python.langchain.com/docs/integrations/memory/