Skip to content

Commit

Permalink
community[feat]: Add OpenSearch Chat Message History
Browse files Browse the repository at this point in the history
  • Loading branch information
leedotpang committed Sep 3, 2024
1 parent fa8402e commit 5040082
Show file tree
Hide file tree
Showing 3 changed files with 316 additions and 0 deletions.
162 changes: 162 additions & 0 deletions docs/docs/integrations/memory/opensearch_chat_message_history.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "683953b3",
"metadata": {
"id": "683953b3"
},
"source": [
"# OpenSearch\n",
"\n",
">[OpenSearch](https://opensearch.org) is a distributed, RESTful search and analytics engine, capable of performing both vector and lexical search, derived from Elasticsearch 7.10.2. \n",
"\n",
"This notebook shows how to use chat message history functionality with `OpenSearch`."
]
},
{
"cell_type": "markdown",
"id": "3c7720c3",
"metadata": {},
"source": [
"## Set up OpenSearch\n",
"\n",
"There are two main ways to set up an OpenSearch instance:\n",
"\n",
"1. **AWS.** AWS offers a managed OpenSearch service. You can test it with their [free credits](https://aws.amazon.com/opensearch-service/).\n",
"\n",
"2. **Local OpenSearch installation.** Get started with OpenSearch by running it locally. The easiest way is to use the official OpenSearch Docker image. See the [OpenSearch Docker documentation](https://opensearch.org/docs/latest/install-and-configure/install-opensearch/docker/) for more information."
]
},
{
"cell_type": "markdown",
"id": "cdf1d2b7",
"metadata": {},
"source": [
"## Install dependencies"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e5bbffe2",
"metadata": {},
"outputs": [],
"source": [
"%pip install --upgrade --quiet opensearch-py langchain langchain-community"
]
},
{
"cell_type": "markdown",
"id": "c46c216c",
"metadata": {},
"source": [
"## Authentication\n",
"\n",
"### Use the Username/password\n",
"\n",
"```python\n",
"opensearch_username = os.environ.get(\"OPENSEARCH_USERNAME\", \"opensearch\")\n",
"opensearch_password = os.environ.get(\"OPENSEARCH_PASSWORD\", \"change me...\")\n",
"\n",
"history = OpenSearchChatMessageHistory(\n",
" opensearch_url=opensearch_url,\n",
" opensearch_user=opensearch_username,\n",
" opensearch_password=opensearch_password,\n",
" index=\"test-history\",\n",
" session_id=\"test-session\"\n",
")\n",
"```\n",
"\n",
"```\n",
"NOTE: \n",
"If you want to instantiate the opensearch client separately, \n",
"pass it in with the `opensearch_connection` keyword argument.\n",
"```"
]
},
{
"cell_type": "markdown",
"id": "8be8fcc3",
"metadata": {},
"source": [
"## Initialize OpenSearch client and chat message history"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "8e2ee0fa",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"from langchain_community.chat_message_histories import OpenSearchChatMessageHistory\n",
"\n",
"opensearch_url = os.environ.get(\"OPENSEARCH_URL\", \"http://localhost:9200\")\n",
"history = OpenSearchChatMessageHistory(\n",
" opensearch_url=opensearch_url, index=\"test-history\", session_id=\"test-session\"\n",
")"
]
},
{
"cell_type": "markdown",
"id": "a63942e2",
"metadata": {},
"source": [
"## Use the chat message history"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "c1c7be79",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[HumanMessage(content='hi!'),\n",
" AIMessage(content='whats up?'),\n",
" HumanMessage(content='hi!'),\n",
" AIMessage(content='whats up?')]"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"history.add_user_message(\"hi!\")\n",
"history.add_ai_message(\"whats up?\")\n",
"history.messages"
]
}
],
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
from langchain_community.chat_message_histories.neo4j import (
Neo4jChatMessageHistory,
)
from langchain_community.chat_message_histories.opensearch import (
OpenSearchChatMessageHistory,
)
from langchain_community.chat_message_histories.postgres import (
PostgresChatMessageHistory,
)
Expand Down Expand Up @@ -101,6 +104,7 @@
"MomentoChatMessageHistory",
"MongoDBChatMessageHistory",
"Neo4jChatMessageHistory",
"OpenSearchChatMessageHistory",
"PostgresChatMessageHistory",
"RedisChatMessageHistory",
"RocksetChatMessageHistory",
Expand All @@ -127,6 +131,7 @@
"MomentoChatMessageHistory": "langchain_community.chat_message_histories.momento",
"MongoDBChatMessageHistory": "langchain_community.chat_message_histories.mongodb",
"Neo4jChatMessageHistory": "langchain_community.chat_message_histories.neo4j",
"OpenSearchChatMessageHistory": "langchain_community.chat_message_histories.opensearch",
"PostgresChatMessageHistory": "langchain_community.chat_message_histories.postgres",
"RedisChatMessageHistory": "langchain_community.chat_message_histories.redis",
"RocksetChatMessageHistory": "langchain_community.chat_message_histories.rocksetdb",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import json
import logging
from time import time
from typing import TYPE_CHECKING, List

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict

if TYPE_CHECKING:
from opensearchpy import OpenSearch

logger = logging.getLogger(__name__)

DEFAULT_INDEX_NAME = "chat-history"
IMPORT_OPENSEARCH_PY_ERROR = (
"Could not import OpenSearch. Please install it with `pip install opensearch-py`."
)


def _default_message_mapping() -> dict:
return {"mappings": {"properties": {"SessionId": {"type": "keyword"}}}}


class OpenSearchChatMessageHistory(BaseChatMessageHistory):
"""Chat message history that stores history in OpenSearch.
Args:
opensearch_url: connection string to connect to OpenSearch.
session_id: Arbitrary key that is used to store the messages
of a single chat session.
index: Name of the index to use.
"""

def __init__(
self,
opensearch_url: str = None,
session_id: str = None,
index: str = DEFAULT_INDEX_NAME,
**kwargs,
):
self.opensearch_url = opensearch_url
self.index = index
self.session_id = session_id

opensearch_connection = kwargs.get("opensearch_connection")

try:
from opensearchpy import OpenSearch
except ImportError as e:
raise ImportError(IMPORT_OPENSEARCH_PY_ERROR) from e

# Initialize OpenSearch client from passed client arg or connection info
if not opensearch_url and not opensearch_connection:
raise ValueError("OpenSearch connection or URL is required.")

try:
if opensearch_connection:
self.client = kwargs.get("opensearch_connection").options(
headers={"user-agent": self.get_user_agent()}
)
else:
self.client = OpenSearch(
[opensearch_url],
**kwargs,
)
except ValueError as e:
raise ValueError(
"Your OpenSearch client string is mis-formatted. Got error: {e}"
) from e

if self.client.indices.exists(index=index):
logger.debug(
"Chat history index %s already exists, skipping creation.", index
)
else:
logger.debug("Creating index %s for storing chat history.", index)

self.client.indices.create(
index=index,
body=_default_message_mapping(),
)

@staticmethod
def get_user_agent() -> str:
from langchain_community import __version__

return f"langchain-py-ms/{__version__}"

@property
def messages(self) -> List[BaseMessage]:
"""Retrieve the messages from OpenSearch"""
try:
from opensearchpy import RequestError

result = self.client.search(
index=self.index,
body={"query": {"match": {"SessionId": self.session_id}}},
)
except RequestError as err:
logger.error("Could not retrieve messages from OpenSearch: %s", err)
raise err

if result and len(result["hits"]["hits"]) > 0:
items = [
json.loads(document["_source"]["History"])
for document in result["hits"]["hits"]
]
else:
items = []

return messages_from_dict(items)

@messages.setter
def messages(self, messages: List[BaseMessage]) -> None:
raise NotImplementedError(
"Direct assignment to 'messages' is not allowed."
" Use the 'add_messages' instead."
)

def add_message(self, message: BaseMessage) -> None:
"""Add a message to the chat session in OpenSearch"""
try:
from opensearchpy import RequestError

self.client.index(
index=self.index,
body={
"SessionId": self.session_id,
"Created_At": round(time() * 1000),
"History": json.dumps(message_to_dict(message)),
},
refresh=True,
)
except RequestError as err:
logger.error("Could not add message to OpenSearch: %s", err)
raise err

def clear(self) -> None:
"""Clear session memory in OpenSearch"""
try:
from opensearchpy import RequestError

self.client.delete_by_query(
index=self.index,
body={"SessionId": self.session_id},
)
except RequestError as err:
logger.error("Could not clear session memory in OpenSearch: %s", err)
raise err

0 comments on commit 5040082

Please sign in to comment.