Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

community[feat]: Add OpenSearch Chat Message History #25898

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions docs/docs/integrations/memory/opensearch_chat_message_history.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "683953b3",
"metadata": {
"id": "683953b3"
},
"source": [
"# OpenSearch\n",
"\n",
">[OpenSearch](https://opensearch.org) is a distributed, RESTful search and analytics engine, capable of performing both vector and lexical search, derived from Elasticsearch 7.10.2. \n",
"\n",
"This notebook shows how to use chat message history functionality with `OpenSearch`."
]
},
{
"cell_type": "markdown",
"id": "3c7720c3",
"metadata": {},
"source": [
"## Set up OpenSearch\n",
"\n",
"There are two main ways to set up an OpenSearch instance:\n",
"\n",
"1. **AWS.** AWS offers a managed OpenSearch service. You can test it with their [free credits](https://aws.amazon.com/opensearch-service/).\n",
"\n",
"2. **Local OpenSearch installation.** Get started with OpenSearch by running it locally. The easiest way is to use the official OpenSearch Docker image. See the [OpenSearch Docker documentation](https://opensearch.org/docs/latest/install-and-configure/install-opensearch/docker/) for more information."
]
},
{
"cell_type": "markdown",
"id": "cdf1d2b7",
"metadata": {},
"source": [
"## Install dependencies"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e5bbffe2",
"metadata": {},
"outputs": [],
"source": [
"%pip install --upgrade --quiet opensearch-py langchain langchain-community"
]
},
{
"cell_type": "markdown",
"id": "c46c216c",
"metadata": {},
"source": [
"## Authentication\n",
"\n",
"### Use the Username/password\n",
"\n",
"```python\n",
"opensearch_username = os.environ.get(\"OPENSEARCH_USERNAME\", \"opensearch\")\n",
"opensearch_password = os.environ.get(\"OPENSEARCH_PASSWORD\", \"change me...\")\n",
"\n",
"history = OpenSearchChatMessageHistory(\n",
" opensearch_url=opensearch_url,\n",
" opensearch_user=opensearch_username,\n",
" opensearch_password=opensearch_password,\n",
" index=\"test-history\",\n",
" session_id=\"test-session\"\n",
")\n",
"```\n",
"\n",
"```\n",
"NOTE: \n",
"If you want to instantiate the opensearch client separately, \n",
"pass it in with the `opensearch_connection` keyword argument.\n",
"```"
]
},
{
"cell_type": "markdown",
"id": "8be8fcc3",
"metadata": {},
"source": [
"## Initialize OpenSearch client and chat message history"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "8e2ee0fa",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"from langchain_community.chat_message_histories import OpenSearchChatMessageHistory\n",
"\n",
"opensearch_url = os.environ.get(\"OPENSEARCH_URL\", \"http://localhost:9200\")\n",
"history = OpenSearchChatMessageHistory(\n",
" opensearch_url=opensearch_url, index=\"test-history\", session_id=\"test-session\"\n",
")"
]
},
{
"cell_type": "markdown",
"id": "a63942e2",
"metadata": {},
"source": [
"## Use the chat message history"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "c1c7be79",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[HumanMessage(content='hi!'), AIMessage(content='whats up?')]"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"history.add_user_message(\"hi!\")\n",
"history.add_ai_message(\"whats up?\")\n",
"history.messages"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2e4710c0",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
from langchain_community.chat_message_histories.neo4j import (
Neo4jChatMessageHistory,
)
from langchain_community.chat_message_histories.opensearch import (
OpenSearchChatMessageHistory,
)
from langchain_community.chat_message_histories.postgres import (
PostgresChatMessageHistory,
)
Expand Down Expand Up @@ -101,6 +104,7 @@
"MomentoChatMessageHistory",
"MongoDBChatMessageHistory",
"Neo4jChatMessageHistory",
"OpenSearchChatMessageHistory",
"PostgresChatMessageHistory",
"RedisChatMessageHistory",
"RocksetChatMessageHistory",
Expand All @@ -127,6 +131,7 @@
"MomentoChatMessageHistory": "langchain_community.chat_message_histories.momento",
"MongoDBChatMessageHistory": "langchain_community.chat_message_histories.mongodb",
"Neo4jChatMessageHistory": "langchain_community.chat_message_histories.neo4j",
"OpenSearchChatMessageHistory": "langchain_community.chat_message_histories.opensearch", # noqa: E501
"PostgresChatMessageHistory": "langchain_community.chat_message_histories.postgres",
"RedisChatMessageHistory": "langchain_community.chat_message_histories.redis",
"RocksetChatMessageHistory": "langchain_community.chat_message_histories.rocksetdb",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import json
import logging
from time import time
from typing import Any, List, Union

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict

logger = logging.getLogger(__name__)

DEFAULT_INDEX_NAME = "chat-history"
IMPORT_OPENSEARCH_PY_ERROR = (
"Could not import OpenSearch. Please install it with `pip install opensearch-py`."
)


def _default_message_mapping() -> dict:
return {"mappings": {"properties": {"SessionId": {"type": "keyword"}}}}


class OpenSearchChatMessageHistory(BaseChatMessageHistory):
"""Chat message history that stores history in OpenSearch.

Args:
opensearch_url: connection string to connect to OpenSearch.
session_id: Arbitrary key that is used to store the messages
of a single chat session.
index: Name of the index to use.
"""

def __init__(
self,
opensearch_url: str,
session_id: str,
index: str = DEFAULT_INDEX_NAME,
**kwargs: Any,
):
self.opensearch_url = opensearch_url
self.index = index
self.session_id = session_id

opensearch_connection: Union["OpenSearch", None] = kwargs.get(
"opensearch_connection"
)

try:
from opensearchpy import OpenSearch
except ImportError as e:
raise ImportError(IMPORT_OPENSEARCH_PY_ERROR) from e

# Initialize OpenSearch client from passed client arg or connection info
if not opensearch_url and not opensearch_connection:
raise ValueError("OpenSearch connection or URL is required.")

try:
if opensearch_connection:
self.client = opensearch_connection.options(
headers={"user-agent": self.get_user_agent()}
)
else:
self.client = OpenSearch(
[opensearch_url],
**kwargs,
)
except ValueError as e:
raise ValueError(
"Your OpenSearch client string is mis-formatted. Got error: {e}"
) from e

if self.client.indices.exists(index=index):
logger.debug(
"Chat history index %s already exists, skipping creation.", index
)
else:
logger.debug("Creating index %s for storing chat history.", index)

self.client.indices.create(
index=index,
body=_default_message_mapping(),
)

@staticmethod
def get_user_agent() -> str:
from langchain_community import __version__

return f"langchain-py-ms/{__version__}"

@property
def messages(self) -> List[BaseMessage]:
"""Retrieve the messages from OpenSearch"""
try:
from opensearchpy import RequestError

result = self.client.search(
index=self.index,
body={"query": {"match": {"SessionId": self.session_id}}},
)
except RequestError as err:
logger.error("Could not retrieve messages from OpenSearch: %s", err)
raise err

if result and len(result["hits"]["hits"]) > 0:
items = [
json.loads(document["_source"]["History"])
for document in result["hits"]["hits"]
]
else:
items = []

return messages_from_dict(items)

@messages.setter
def messages(self, messages: List[BaseMessage]) -> None:
raise NotImplementedError(
"Direct assignment to 'messages' is not allowed."
" Use the 'add_messages' instead."
)

def add_message(self, message: BaseMessage) -> None:
"""Add a message to the chat session in OpenSearch"""
try:
from opensearchpy import RequestError

self.client.index(
index=self.index,
body={
"SessionId": self.session_id,
"Created_At": round(time() * 1000),
"History": json.dumps(message_to_dict(message)),
},
refresh=True,
)
except RequestError as err:
logger.error("Could not add message to OpenSearch: %s", err)
raise err

def clear(self) -> None:
"""Clear session memory in OpenSearch"""
try:
from opensearchpy import RequestError

self.client.delete_by_query(
index=self.index,
body={"query": {"term": {"SessionId": self.session_id}}},
)
except RequestError as err:
logger.error("Could not clear session memory in OpenSearch: %s", err)
raise err
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"MomentoChatMessageHistory",
"MongoDBChatMessageHistory",
"Neo4jChatMessageHistory",
"OpenSearchChatMessageHistory",
"PostgresChatMessageHistory",
"RedisChatMessageHistory",
"RocksetChatMessageHistory",
Expand Down
Loading