diff --git a/docs/docs/integrations/memory/opensearch_chat_message_history.ipynb b/docs/docs/integrations/memory/opensearch_chat_message_history.ipynb new file mode 100644 index 0000000000000..71b0855fa67a7 --- /dev/null +++ b/docs/docs/integrations/memory/opensearch_chat_message_history.ipynb @@ -0,0 +1,167 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "683953b3", + "metadata": { + "id": "683953b3" + }, + "source": [ + "# OpenSearch\n", + "\n", + ">[OpenSearch](https://opensearch.org) is a distributed, RESTful search and analytics engine, capable of performing both vector and lexical search, derived from Elasticsearch 7.10.2. \n", + "\n", + "This notebook shows how to use chat message history functionality with `OpenSearch`." + ] + }, + { + "cell_type": "markdown", + "id": "3c7720c3", + "metadata": {}, + "source": [ + "## Set up OpenSearch\n", + "\n", + "There are two main ways to set up an OpenSearch instance:\n", + "\n", + "1. **AWS.** AWS offers a managed OpenSearch service. You can test it with their [free credits](https://aws.amazon.com/opensearch-service/).\n", + "\n", + "2. **Local OpenSearch installation.** Get started with OpenSearch by running it locally. The easiest way is to use the official OpenSearch Docker image. See the [OpenSearch Docker documentation](https://opensearch.org/docs/latest/install-and-configure/install-opensearch/docker/) for more information." + ] + }, + { + "cell_type": "markdown", + "id": "cdf1d2b7", + "metadata": {}, + "source": [ + "## Install dependencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e5bbffe2", + "metadata": {}, + "outputs": [], + "source": [ + "%pip install --upgrade --quiet opensearch-py langchain langchain-community" + ] + }, + { + "cell_type": "markdown", + "id": "c46c216c", + "metadata": {}, + "source": [ + "## Authentication\n", + "\n", + "### Use the Username/password\n", + "\n", + "```python\n", + "opensearch_username = os.environ.get(\"OPENSEARCH_USERNAME\", \"opensearch\")\n", + "opensearch_password = os.environ.get(\"OPENSEARCH_PASSWORD\", \"change me...\")\n", + "\n", + "history = OpenSearchChatMessageHistory(\n", + " opensearch_url=opensearch_url,\n", + " opensearch_user=opensearch_username,\n", + " opensearch_password=opensearch_password,\n", + " index=\"test-history\",\n", + " session_id=\"test-session\"\n", + ")\n", + "```\n", + "\n", + "```\n", + "NOTE: \n", + "If you want to instantiate the opensearch client separately, \n", + "pass it in with the `opensearch_connection` keyword argument.\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "8be8fcc3", + "metadata": {}, + "source": [ + "## Initialize OpenSearch client and chat message history" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "8e2ee0fa", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "from langchain_community.chat_message_histories import OpenSearchChatMessageHistory\n", + "\n", + "opensearch_url = os.environ.get(\"OPENSEARCH_URL\", \"http://localhost:9200\")\n", + "history = OpenSearchChatMessageHistory(\n", + " opensearch_url=opensearch_url, index=\"test-history\", session_id=\"test-session\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "a63942e2", + "metadata": {}, + "source": [ + "## Use the chat message history" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "c1c7be79", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[HumanMessage(content='hi!'), AIMessage(content='whats up?')]" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "history.add_user_message(\"hi!\")\n", + "history.add_ai_message(\"whats up?\")\n", + "history.messages" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2e4710c0", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "colab": { + "provenance": [] + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/libs/community/langchain_community/chat_message_histories/__init__.py b/libs/community/langchain_community/chat_message_histories/__init__.py index fc20cacacceab..f7823efbb13bf 100644 --- a/libs/community/langchain_community/chat_message_histories/__init__.py +++ b/libs/community/langchain_community/chat_message_histories/__init__.py @@ -55,6 +55,9 @@ from langchain_community.chat_message_histories.neo4j import ( Neo4jChatMessageHistory, ) + from langchain_community.chat_message_histories.opensearch import ( + OpenSearchChatMessageHistory, + ) from langchain_community.chat_message_histories.postgres import ( PostgresChatMessageHistory, ) @@ -101,6 +104,7 @@ "MomentoChatMessageHistory", "MongoDBChatMessageHistory", "Neo4jChatMessageHistory", + "OpenSearchChatMessageHistory", "PostgresChatMessageHistory", "RedisChatMessageHistory", "RocksetChatMessageHistory", @@ -127,6 +131,7 @@ "MomentoChatMessageHistory": "langchain_community.chat_message_histories.momento", "MongoDBChatMessageHistory": "langchain_community.chat_message_histories.mongodb", "Neo4jChatMessageHistory": "langchain_community.chat_message_histories.neo4j", + "OpenSearchChatMessageHistory": "langchain_community.chat_message_histories.opensearch", # noqa: E501 "PostgresChatMessageHistory": "langchain_community.chat_message_histories.postgres", "RedisChatMessageHistory": "langchain_community.chat_message_histories.redis", "RocksetChatMessageHistory": "langchain_community.chat_message_histories.rocksetdb", diff --git a/libs/community/langchain_community/chat_message_histories/opensearch.py b/libs/community/langchain_community/chat_message_histories/opensearch.py new file mode 100644 index 0000000000000..19c063662c955 --- /dev/null +++ b/libs/community/langchain_community/chat_message_histories/opensearch.py @@ -0,0 +1,148 @@ +import json +import logging +from time import time +from typing import Any, List, Union + +from langchain_core.chat_history import BaseChatMessageHistory +from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict + +logger = logging.getLogger(__name__) + +DEFAULT_INDEX_NAME = "chat-history" +IMPORT_OPENSEARCH_PY_ERROR = ( + "Could not import OpenSearch. Please install it with `pip install opensearch-py`." +) + + +def _default_message_mapping() -> dict: + return {"mappings": {"properties": {"SessionId": {"type": "keyword"}}}} + + +class OpenSearchChatMessageHistory(BaseChatMessageHistory): + """Chat message history that stores history in OpenSearch. + + Args: + opensearch_url: connection string to connect to OpenSearch. + session_id: Arbitrary key that is used to store the messages + of a single chat session. + index: Name of the index to use. + """ + + def __init__( + self, + opensearch_url: str, + session_id: str, + index: str = DEFAULT_INDEX_NAME, + **kwargs: Any, + ): + self.opensearch_url = opensearch_url + self.index = index + self.session_id = session_id + + opensearch_connection: Union["OpenSearch", None] = kwargs.get( + "opensearch_connection" + ) + + try: + from opensearchpy import OpenSearch + except ImportError as e: + raise ImportError(IMPORT_OPENSEARCH_PY_ERROR) from e + + # Initialize OpenSearch client from passed client arg or connection info + if not opensearch_url and not opensearch_connection: + raise ValueError("OpenSearch connection or URL is required.") + + try: + if opensearch_connection: + self.client = opensearch_connection.options( + headers={"user-agent": self.get_user_agent()} + ) + else: + self.client = OpenSearch( + [opensearch_url], + **kwargs, + ) + except ValueError as e: + raise ValueError( + "Your OpenSearch client string is mis-formatted. Got error: {e}" + ) from e + + if self.client.indices.exists(index=index): + logger.debug( + "Chat history index %s already exists, skipping creation.", index + ) + else: + logger.debug("Creating index %s for storing chat history.", index) + + self.client.indices.create( + index=index, + body=_default_message_mapping(), + ) + + @staticmethod + def get_user_agent() -> str: + from langchain_community import __version__ + + return f"langchain-py-ms/{__version__}" + + @property + def messages(self) -> List[BaseMessage]: + """Retrieve the messages from OpenSearch""" + try: + from opensearchpy import RequestError + + result = self.client.search( + index=self.index, + body={"query": {"match": {"SessionId": self.session_id}}}, + ) + except RequestError as err: + logger.error("Could not retrieve messages from OpenSearch: %s", err) + raise err + + if result and len(result["hits"]["hits"]) > 0: + items = [ + json.loads(document["_source"]["History"]) + for document in result["hits"]["hits"] + ] + else: + items = [] + + return messages_from_dict(items) + + @messages.setter + def messages(self, messages: List[BaseMessage]) -> None: + raise NotImplementedError( + "Direct assignment to 'messages' is not allowed." + " Use the 'add_messages' instead." + ) + + def add_message(self, message: BaseMessage) -> None: + """Add a message to the chat session in OpenSearch""" + try: + from opensearchpy import RequestError + + self.client.index( + index=self.index, + body={ + "SessionId": self.session_id, + "Created_At": round(time() * 1000), + "History": json.dumps(message_to_dict(message)), + }, + refresh=True, + ) + except RequestError as err: + logger.error("Could not add message to OpenSearch: %s", err) + raise err + + def clear(self) -> None: + """Clear session memory in OpenSearch""" + try: + from opensearchpy import RequestError + + self.client.delete_by_query( + index=self.index, + body={"query": {"term": {"SessionId": self.session_id}}}, + ) + except RequestError as err: + logger.error("Could not clear session memory in OpenSearch: %s", err) + raise err diff --git a/libs/community/tests/unit_tests/chat_message_histories/test_imports.py b/libs/community/tests/unit_tests/chat_message_histories/test_imports.py index 4c14a0efd9a56..3175258d41f93 100644 --- a/libs/community/tests/unit_tests/chat_message_histories/test_imports.py +++ b/libs/community/tests/unit_tests/chat_message_histories/test_imports.py @@ -12,6 +12,7 @@ "MomentoChatMessageHistory", "MongoDBChatMessageHistory", "Neo4jChatMessageHistory", + "OpenSearchChatMessageHistory", "PostgresChatMessageHistory", "RedisChatMessageHistory", "RocksetChatMessageHistory",