From f17c6122265da416c4f39d221f25d0e532c8648c Mon Sep 17 00:00:00 2001 From: leedotpang Date: Fri, 30 Aug 2024 15:56:16 -0400 Subject: [PATCH 1/5] community[feat]: Add OpenSearch Chat Message History --- .../opensearch_chat_message_history.ipynb | 162 ++++++++++++++++++ .../chat_message_histories/__init__.py | 5 + .../chat_message_histories/opensearch.py | 149 ++++++++++++++++ 3 files changed, 316 insertions(+) create mode 100644 docs/docs/integrations/memory/opensearch_chat_message_history.ipynb create mode 100644 libs/community/langchain_community/chat_message_histories/opensearch.py diff --git a/docs/docs/integrations/memory/opensearch_chat_message_history.ipynb b/docs/docs/integrations/memory/opensearch_chat_message_history.ipynb new file mode 100644 index 0000000000000..c76dc40813c8a --- /dev/null +++ b/docs/docs/integrations/memory/opensearch_chat_message_history.ipynb @@ -0,0 +1,162 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "683953b3", + "metadata": { + "id": "683953b3" + }, + "source": [ + "# OpenSearch\n", + "\n", + ">[OpenSearch](https://opensearch.org) is a distributed, RESTful search and analytics engine, capable of performing both vector and lexical search, derived from Elasticsearch 7.10.2. \n", + "\n", + "This notebook shows how to use chat message history functionality with `OpenSearch`." + ] + }, + { + "cell_type": "markdown", + "id": "3c7720c3", + "metadata": {}, + "source": [ + "## Set up OpenSearch\n", + "\n", + "There are two main ways to set up an OpenSearch instance:\n", + "\n", + "1. **AWS.** AWS offers a managed OpenSearch service. You can test it with their [free credits](https://aws.amazon.com/opensearch-service/).\n", + "\n", + "2. **Local OpenSearch installation.** Get started with OpenSearch by running it locally. The easiest way is to use the official OpenSearch Docker image. 
See the [OpenSearch Docker documentation](https://opensearch.org/docs/latest/install-and-configure/install-opensearch/docker/) for more information." + ] + }, + { + "cell_type": "markdown", + "id": "cdf1d2b7", + "metadata": {}, + "source": [ + "## Install dependencies" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e5bbffe2", + "metadata": {}, + "outputs": [], + "source": [ + "%pip install --upgrade --quiet opensearch-py langchain langchain-community" + ] + }, + { + "cell_type": "markdown", + "id": "c46c216c", + "metadata": {}, + "source": [ + "## Authentication\n", + "\n", + "### Use a username and password\n", + "\n", + "```python\n", + "opensearch_username = os.environ.get(\"OPENSEARCH_USERNAME\", \"opensearch\")\n", + "opensearch_password = os.environ.get(\"OPENSEARCH_PASSWORD\", \"change me...\")\n", + "\n", + "history = OpenSearchChatMessageHistory(\n", + " opensearch_url=opensearch_url,\n", + " # extra keyword arguments are forwarded to the opensearch-py client\n", + " http_auth=(opensearch_username, opensearch_password),\n", + " index=\"test-history\",\n", + " session_id=\"test-session\"\n", + ")\n", + "```\n", + "\n", + "```\n", + "NOTE: \n", + "If you want to instantiate the opensearch client separately, \n", + "pass it in with the `opensearch_connection` keyword argument.\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "8be8fcc3", + "metadata": {}, + "source": [ + "## Initialize OpenSearch client and chat message history" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "8e2ee0fa", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "from langchain_community.chat_message_histories import OpenSearchChatMessageHistory\n", + "\n", + "opensearch_url = os.environ.get(\"OPENSEARCH_URL\", \"http://localhost:9200\")\n", + "history = OpenSearchChatMessageHistory(\n", + " opensearch_url=opensearch_url, index=\"test-history\", session_id=\"test-session\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "a63942e2", + 
"metadata": {}, + "source": [ + "## Use the chat message history" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "c1c7be79", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[HumanMessage(content='hi!'),\n", + " AIMessage(content='whats up?'),\n", + " HumanMessage(content='hi!'),\n", + " AIMessage(content='whats up?')]" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "history.add_user_message(\"hi!\")\n", + "history.add_ai_message(\"whats up?\")\n", + "history.messages" + ] + } + ], + "metadata": { + "colab": { + "provenance": [] + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/libs/community/langchain_community/chat_message_histories/__init__.py b/libs/community/langchain_community/chat_message_histories/__init__.py index fc20cacacceab..0e59bcbabc1df 100644 --- a/libs/community/langchain_community/chat_message_histories/__init__.py +++ b/libs/community/langchain_community/chat_message_histories/__init__.py @@ -55,6 +55,9 @@ from langchain_community.chat_message_histories.neo4j import ( Neo4jChatMessageHistory, ) + from langchain_community.chat_message_histories.opensearch import ( + OpenSearchChatMessageHistory, + ) from langchain_community.chat_message_histories.postgres import ( PostgresChatMessageHistory, ) @@ -101,6 +104,7 @@ "MomentoChatMessageHistory", "MongoDBChatMessageHistory", "Neo4jChatMessageHistory", + "OpenSearchChatMessageHistory", "PostgresChatMessageHistory", "RedisChatMessageHistory", "RocksetChatMessageHistory", @@ -127,6 +131,7 @@ "MomentoChatMessageHistory": 
"langchain_community.chat_message_histories.momento", "MongoDBChatMessageHistory": "langchain_community.chat_message_histories.mongodb", "Neo4jChatMessageHistory": "langchain_community.chat_message_histories.neo4j", + "OpenSearchChatMessageHistory": "langchain_community.chat_message_histories.opensearch", "PostgresChatMessageHistory": "langchain_community.chat_message_histories.postgres", "RedisChatMessageHistory": "langchain_community.chat_message_histories.redis", "RocksetChatMessageHistory": "langchain_community.chat_message_histories.rocksetdb", diff --git a/libs/community/langchain_community/chat_message_histories/opensearch.py b/libs/community/langchain_community/chat_message_histories/opensearch.py new file mode 100644 index 0000000000000..8a48d82a466c8 --- /dev/null +++ b/libs/community/langchain_community/chat_message_histories/opensearch.py @@ -0,0 +1,149 @@ +import json +import logging +from time import time +from typing import TYPE_CHECKING, List + +from langchain_core.chat_history import BaseChatMessageHistory +from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict + +if TYPE_CHECKING: + from opensearchpy import OpenSearch + +logger = logging.getLogger(__name__) + +DEFAULT_INDEX_NAME = "chat-history" +IMPORT_OPENSEARCH_PY_ERROR = ( + "Could not import OpenSearch. Please install it with `pip install opensearch-py`." +) + + +def _default_message_mapping() -> dict: + return {"mappings": {"properties": {"SessionId": {"type": "keyword"}}}} + + +class OpenSearchChatMessageHistory(BaseChatMessageHistory): + """Chat message history that stores history in OpenSearch. + + Args: + opensearch_url: connection string to connect to OpenSearch. + session_id: Arbitrary key that is used to store the messages + of a single chat session. + index: Name of the index to use. 
+ """ + + def __init__( + self, + opensearch_url: str = None, + session_id: str = None, + index: str = DEFAULT_INDEX_NAME, + **kwargs, + ): + self.opensearch_url = opensearch_url + self.index = index + self.session_id = session_id + + opensearch_connection = kwargs.get("opensearch_connection") + + try: + from opensearchpy import OpenSearch + except ImportError as e: + raise ImportError(IMPORT_OPENSEARCH_PY_ERROR) from e + + # Initialize OpenSearch client from passed client arg or connection info + if not opensearch_url and not opensearch_connection: + raise ValueError("OpenSearch connection or URL is required.") + + try: + if opensearch_connection: + self.client = kwargs.get("opensearch_connection").options( + headers={"user-agent": self.get_user_agent()} + ) + else: + self.client = OpenSearch( + [opensearch_url], + **kwargs, + ) + except ValueError as e: + raise ValueError( + f"Your OpenSearch client string is mis-formatted. Got error: {e}" + ) from e + + if self.client.indices.exists(index=index): + logger.debug( + "Chat history index %s already exists, skipping creation.", index + ) + else: + logger.debug("Creating index %s for storing chat history.", index) + + self.client.indices.create( + index=index, + body=_default_message_mapping(), + ) + + @staticmethod + def get_user_agent() -> str: + from langchain_community import __version__ + + return f"langchain-py-ms/{__version__}" + + @property + def messages(self) -> List[BaseMessage]: + """Retrieve the messages from OpenSearch""" + try: + from opensearchpy import RequestError + + result = self.client.search( + index=self.index, + body={"query": {"match": {"SessionId": self.session_id}}}, + ) + except RequestError as err: + logger.error("Could not retrieve messages from OpenSearch: %s", err) + raise err + + if result and len(result["hits"]["hits"]) > 0: + items = [ + json.loads(document["_source"]["History"]) + for document in result["hits"]["hits"] + ] + else: + items = [] + + return messages_from_dict(items) + + 
@messages.setter + def messages(self, messages: List[BaseMessage]) -> None: + raise NotImplementedError( + "Direct assignment to 'messages' is not allowed." + " Use the 'add_messages' instead." + ) + + def add_message(self, message: BaseMessage) -> None: + """Add a message to the chat session in OpenSearch""" + try: + from opensearchpy import RequestError + + self.client.index( + index=self.index, + body={ + "SessionId": self.session_id, + "Created_At": round(time() * 1000), + "History": json.dumps(message_to_dict(message)), + }, + refresh=True, + ) + except RequestError as err: + logger.error("Could not add message to OpenSearch: %s", err) + raise err + + def clear(self) -> None: + """Clear session memory in OpenSearch""" + try: + from opensearchpy import RequestError + + self.client.delete_by_query( + index=self.index, + body={"SessionId": self.session_id}, + ) + except RequestError as err: + logger.error("Could not clear session memory in OpenSearch: %s", err) + raise err From 16d6ac6143d09a90ec628a06216a6c873832592f Mon Sep 17 00:00:00 2001 From: leedotpang Date: Tue, 3 Sep 2024 12:20:30 -0400 Subject: [PATCH 2/5] Fixed linting and tests --- .../langchain_community/chat_message_histories/__init__.py | 2 +- .../langchain_community/chat_message_histories/opensearch.py | 5 +---- .../tests/unit_tests/chat_message_histories/test_imports.py | 1 + 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/libs/community/langchain_community/chat_message_histories/__init__.py b/libs/community/langchain_community/chat_message_histories/__init__.py index 0e59bcbabc1df..f7823efbb13bf 100644 --- a/libs/community/langchain_community/chat_message_histories/__init__.py +++ b/libs/community/langchain_community/chat_message_histories/__init__.py @@ -131,7 +131,7 @@ "MomentoChatMessageHistory": "langchain_community.chat_message_histories.momento", "MongoDBChatMessageHistory": "langchain_community.chat_message_histories.mongodb", "Neo4jChatMessageHistory": 
"langchain_community.chat_message_histories.neo4j", - "OpenSearchChatMessageHistory": "langchain_community.chat_message_histories.opensearch", + "OpenSearchChatMessageHistory": "langchain_community.chat_message_histories.opensearch", # noqa: E501 "PostgresChatMessageHistory": "langchain_community.chat_message_histories.postgres", "RedisChatMessageHistory": "langchain_community.chat_message_histories.redis", "RocksetChatMessageHistory": "langchain_community.chat_message_histories.rocksetdb", diff --git a/libs/community/langchain_community/chat_message_histories/opensearch.py b/libs/community/langchain_community/chat_message_histories/opensearch.py index 8a48d82a466c8..6a2e564d13410 100644 --- a/libs/community/langchain_community/chat_message_histories/opensearch.py +++ b/libs/community/langchain_community/chat_message_histories/opensearch.py @@ -1,14 +1,11 @@ import json import logging from time import time -from typing import TYPE_CHECKING, List +from typing import List from langchain_core.chat_history import BaseChatMessageHistory from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict -if TYPE_CHECKING: - from opensearchpy import OpenSearch - logger = logging.getLogger(__name__) DEFAULT_INDEX_NAME = "chat-history" diff --git a/libs/community/tests/unit_tests/chat_message_histories/test_imports.py b/libs/community/tests/unit_tests/chat_message_histories/test_imports.py index 4c14a0efd9a56..3175258d41f93 100644 --- a/libs/community/tests/unit_tests/chat_message_histories/test_imports.py +++ b/libs/community/tests/unit_tests/chat_message_histories/test_imports.py @@ -12,6 +12,7 @@ "MomentoChatMessageHistory", "MongoDBChatMessageHistory", "Neo4jChatMessageHistory", + "OpenSearchChatMessageHistory", "PostgresChatMessageHistory", "RedisChatMessageHistory", "RocksetChatMessageHistory", From 67ea7ef4987ef8c22c25ca9e4e416aab257d16c5 Mon Sep 17 00:00:00 2001 From: leedotpang Date: Tue, 3 Sep 2024 12:32:46 -0400 Subject: [PATCH 3/5] Fixed clear 
method and notebook output --- .../memory/opensearch_chat_message_history.ipynb | 13 +++++++++---- .../chat_message_histories/opensearch.py | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/docs/docs/integrations/memory/opensearch_chat_message_history.ipynb b/docs/docs/integrations/memory/opensearch_chat_message_history.ipynb index c76dc40813c8a..71b0855fa67a7 100644 --- a/docs/docs/integrations/memory/opensearch_chat_message_history.ipynb +++ b/docs/docs/integrations/memory/opensearch_chat_message_history.ipynb @@ -117,10 +117,7 @@ { "data": { "text/plain": [ - "[HumanMessage(content='hi!'),\n", - " AIMessage(content='whats up?'),\n", - " HumanMessage(content='hi!'),\n", - " AIMessage(content='whats up?')]" + "[HumanMessage(content='hi!'), AIMessage(content='whats up?')]" ] }, "execution_count": 4, @@ -133,6 +130,14 @@ "history.add_ai_message(\"whats up?\")\n", "history.messages" ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2e4710c0", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/libs/community/langchain_community/chat_message_histories/opensearch.py b/libs/community/langchain_community/chat_message_histories/opensearch.py index 6a2e564d13410..8653e7987af01 100644 --- a/libs/community/langchain_community/chat_message_histories/opensearch.py +++ b/libs/community/langchain_community/chat_message_histories/opensearch.py @@ -139,7 +139,7 @@ def clear(self) -> None: self.client.delete_by_query( index=self.index, - body={"SessionId": self.session_id}, + body={"query": {"term": {"SessionId": self.session_id}}}, ) except RequestError as err: logger.error("Could not clear session memory in OpenSearch: %s", err) From 44e77c6527cf50cf81f84f3ebd2ebdade15f7abc Mon Sep 17 00:00:00 2001 From: leedotpang Date: Tue, 3 Sep 2024 12:42:37 -0400 Subject: [PATCH 4/5] More linting --- .../chat_message_histories/opensearch.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git 
a/libs/community/langchain_community/chat_message_histories/opensearch.py b/libs/community/langchain_community/chat_message_histories/opensearch.py index 8653e7987af01..a501964476177 100644 --- a/libs/community/langchain_community/chat_message_histories/opensearch.py +++ b/libs/community/langchain_community/chat_message_histories/opensearch.py @@ -1,7 +1,7 @@ import json import logging from time import time -from typing import List +from typing import Any, List from langchain_core.chat_history import BaseChatMessageHistory from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict @@ -30,16 +30,16 @@ class OpenSearchChatMessageHistory(BaseChatMessageHistory): def __init__( self, - opensearch_url: str = None, - session_id: str = None, + opensearch_url: str, + session_id: str, index: str = DEFAULT_INDEX_NAME, - **kwargs, + **kwargs: Any, ): self.opensearch_url = opensearch_url self.index = index self.session_id = session_id - opensearch_connection = kwargs.get("opensearch_connection") + opensearch_connection: "OpenSearch" | None = kwargs.get("opensearch_connection") try: from opensearchpy import OpenSearch @@ -52,7 +52,7 @@ def __init__( try: if opensearch_connection: - self.client = kwargs.get("opensearch_connection").options( + self.client = opensearch_connection.options( headers={"user-agent": self.get_user_agent()} ) else: From 96098d539fca60eefe84b02e2d4da3d2b591eebb Mon Sep 17 00:00:00 2001 From: leedotpang Date: Tue, 3 Sep 2024 16:13:51 -0400 Subject: [PATCH 5/5] Fix Union for 3.8 --- .../chat_message_histories/opensearch.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/libs/community/langchain_community/chat_message_histories/opensearch.py b/libs/community/langchain_community/chat_message_histories/opensearch.py index a501964476177..19c063662c955 100644 --- a/libs/community/langchain_community/chat_message_histories/opensearch.py +++ b/libs/community/langchain_community/chat_message_histories/opensearch.py 
@@ -1,7 +1,7 @@ import json import logging from time import time -from typing import Any, List +from typing import Any, List, Union from langchain_core.chat_history import BaseChatMessageHistory from langchain_core.messages import BaseMessage, message_to_dict, messages_from_dict @@ -39,7 +39,9 @@ def __init__( self.index = index self.session_id = session_id - opensearch_connection: "OpenSearch" | None = kwargs.get("opensearch_connection") + opensearch_connection: Union["OpenSearch", None] = kwargs.get( + "opensearch_connection" + ) try: from opensearchpy import OpenSearch