Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added FalkorDB Chat History Support i.e implemenation, tests, and example notebook #26657

Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e78508f
Added FalkorDB Chat History Support i.e implemenation, tests, and exa…
kingtroga Sep 19, 2024
dc918a9
Added review changes
kingtroga Oct 2, 2024
57f5814
added an index on the id property
kingtroga Oct 4, 2024
2eb1327
removed the redundant WHERE clauses
kingtroga Oct 7, 2024
b74d0d1
removed session_id
kingtroga Oct 10, 2024
b4d527e
Merge branch 'master' into falkordbchatmessagehistorycreation
efriis Nov 7, 2024
1b497f9
Merge branch 'master' into falkordbchatmessagehistorycreation
kingtroga Nov 9, 2024
a9831ae
Fixed the CI and CD errors
kingtroga Nov 9, 2024
e076684
Fix more errors
kingtroga Nov 9, 2024
2ce8425
added redis
kingtroga Nov 9, 2024
1c3eca8
trying to fix the import error
kingtroga Nov 9, 2024
c95ccdd
here we go again
kingtroga Nov 9, 2024
73edb06
last try before I go to bed
kingtroga Nov 9, 2024
ebc4732
FIX CI CD ERRORS AGAIN
kingtroga Nov 11, 2024
4d67430
😑
kingtroga Nov 11, 2024
8ac5d8b
Fix again
kingtroga Nov 11, 2024
c297bb7
Fix Cd libs/community/ make test errors
kingtroga Nov 11, 2024
e36031d
Merge branch 'master' into falkordbchatmessagehistorycreation
kingtroga Nov 17, 2024
102f826
last fix for tonight
kingtroga Nov 17, 2024
983dc71
Merge branch 'master' into falkordbchatmessagehistorycreation
kingtroga Nov 19, 2024
2a2d1fc
Fix Redis error
kingtroga Nov 19, 2024
78dbae6
Fixed
kingtroga Nov 24, 2024
c3b770d
Merge branch 'master' into falkordbchatmessagehistorycreation
kingtroga Nov 24, 2024
fc32743
Poetry Lock fix
kingtroga Nov 24, 2024
aae00ff
Poetry lock fix
kingtroga Nov 24, 2024
e2b08f9
Poetry lock fix
kingtroga Nov 24, 2024
d6f1b96
Merge branch 'master' into falkordbchatmessagehistorycreation
kingtroga Nov 27, 2024
012a350
Merge branch 'master' into falkordbchatmessagehistorycreation
kingtroga Nov 27, 2024
8fb4424
fix poetry lock
kingtroga Nov 27, 2024
b9b5208
Merge branch 'master' into falkordbchatmessagehistorycreation
kingtroga Nov 27, 2024
2d7e138
fix poetry lock
kingtroga Nov 27, 2024
6fdb10a
Merge branch 'master' into falkordbchatmessagehistorycreation
efriis Dec 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions docs/docs/integrations/memory/falkordb_chat_message_history.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# FalkorDB\n",
"\n",
"<a href='https://docs.falkordb.com/' target='_blank'>FalkorDB</a> is an open-source graph database management system, renowned for its efficient management of highly connected data. Unlike traditional databases that store data in tables, FalkorDB uses a graph structure with nodes, edges, and properties to represent and store data. This design allows for high-performance queries on complex data relationships.\n",
"\n",
"This notebook goes over how to use `FalkorDB` to store chat message history\n",
"\n",
"**NOTE**: You can use FalkorDB locally or use FalkorDB Cloud. <a href='https://docs.falkordb.com/' target='blank'>See installation instructions</a>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# For this example notebook we will be using FalkorDB locally\n",
"host = 'localhost'\n",
"port = 6379"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from langchain_community.chat_message_histories import FalkorDBChatMessageHistory\n",
"\n",
"history = FalkorDBChatMessageHistory(\n",
" host=host,\n",
" port=port,\n",
" session_id=\"session_id_1\"\n",
")\n",
"\n",
"history.add_user_message(\"hi!\")\n",
"\n",
"history.add_ai_message(\"whats up?\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"history.messages"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
217 changes: 217 additions & 0 deletions libs/community/langchain_community/chat_message_histories/falkordb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
from typing import List, Optional, Union
import os
import redis.exceptions

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
AIMessage,
BaseMessage,
HumanMessage,
)

from langchain_community.graphs import FalkorDBGraph


class FalkorDBChatMessageHistory(BaseChatMessageHistory):
"""Chat message history stored in a Falkor database.

This class handles storing and retrieving chat messages in a FalkorDB database.
It creates a session and stores messages in a message chain, maintaining a link
between subsequent messages.

Args:
session_id (Union[str, int]): The session ID for storing and retrieving messages
also the name of the database.
username (Optional[str]): Username for authenticating with FalkorDB.
password (Optional[str]): Password for authenticating with FalkorDB.
host (str): Host where the FalkorDB is running. Defaults to 'localhost'.
port (int): Port number where the FalkorDB is running. Defaults to 6379.
node_label (str): Label for the session node in the graph. Defaults to "Session".
window (int): The number of messages to retrieve when querying the history. Defaults to 3.
ssl (bool): Whether to use SSL for connecting to the database. Defaults to False.
graph (Optional[FalkorDBGraph]): Optionally provide an existing FalkorDBGraph object for connecting.

Example:
.. code-block:: python
from langchain_community.chat_message_histories import FalkorDBChatMessageHistory

history = FalkorDBChatMessageHistory(
session_id="1234",
host="localhost",
port=6379,
)
history.add_message(HumanMessage(content="Hello!"))
"""

def __init__(
self,
session_id: Union[str, int],
username: Optional[str] = None,
password: Optional[str] = None,
host: str = "localhost",
port: int = 6379,
node_label: str = "Session",
window: int = 3,
ssl: bool = False,
*,
graph: Optional[FalkorDBGraph] = None,
) -> None:
"""Initialize the FalkorDBChatMessageHistory class with the session and connection details."""
try:
import falkordb
except ImportError:
raise ImportError(
"Could not import falkordb python package."
"Please install it with `pip install falkordb`."
)

if not session_id:
raise ValueError("Please ensure that the session_id parameter is provided.")

if graph:
self._database = graph._graph
self._driver = graph._driver
else:
self._host = host
self._port = port
self._username = username or os.environ.get("FALKORDB_USERNAME")
self._password = password or os.environ.get("FALKORDB_PASSWORD")
self._ssl = ssl

try:
self._driver = falkordb.FalkorDB(
host=self._host,
port=self._port,
username=self._username,
password=self._password,
ssl=self._ssl,
)
except redis.exceptions.ConnectionError:
raise ValueError(
"Could not connect to FalkorDB database. "
"Please ensure that the host and port are correct."
)
except redis.exceptions.AuthenticationError:
raise ValueError(
"Could not connect to FalkorDB database. "
"Please ensure that the username and password are correct."
)

self._database = self._driver.select_graph(session_id)
self._session_id = session_id
self._node_label = node_label
self._window = window

self._database.query(
f"MERGE (s:{self._node_label} {{id:$session_id}})",
{"session_id": self._session_id},
)

try:
self._database.create_node_vector_index(
f"{self._node_label}", "id", dim=5, similarity_function="cosine"
)
except Exception as e:
if "already indexed" in e:
raise ValueError(f"{self._node_label} has already been indexed")

def _process_records(self, records: list) -> list:
"""Process the records from FalkorDB and convert them into BaseMessage objects.

Args:
records (list): The raw records fetched from the FalkorDB query.

Returns:
list: A list of `BaseMessage` objects.
"""
messages = []

for record in records:
content = record[0].get("data", {}).get("content", "")
message_type = record[0].get("type", "").lower()

if message_type == "human":
messages.append(
HumanMessage(
content=content, additional_kwargs={}, response_metadata={}
)
)
elif message_type == "ai":
messages.append(
AIMessage(
content=content, additional_kwargs={}, response_metadata={}
)
)
else:
print(f"Unknown message type: {message_type}")

return messages

@property
def messages(self) -> List[BaseMessage]:
"""Retrieve the messages from FalkorDB for the session.

Returns:
List[BaseMessage]: A list of messages in the current session.
"""
query = (
f"MATCH (s:{self._node_label})-[:LAST_MESSAGE]->(last_message) "
"MATCH p=(last_message)<-[:NEXT*0.."
f"{self._window*2}]-() WITH p, length(p) AS length "
"ORDER BY length DESC LIMIT 1 UNWIND reverse(nodes(p)) AS node "
"RETURN {data:{content: node.content}, type:node.type} AS result"
)

records = self._database.query(
query, {"session_id": self._session_id}
kingtroga marked this conversation as resolved.
Show resolved Hide resolved
).result_set

messages = self._process_records(records)
return messages

@messages.setter
def messages(self, messages: List[BaseMessage]) -> None:
"""Block direct assignment to 'messages' to prevent misuse."""
raise NotImplementedError(
"Direct assignment to 'messages' is not allowed."
" Use the 'add_message' method instead."
)

def add_message(self, message: BaseMessage) -> None:
"""Append a message to the session in FalkorDB.

Args:
message (BaseMessage): The message object to add to the session.
"""
create_query = (
f"MATCH (s:{self._node_label}) WHERE s.id = $session_id "
kingtroga marked this conversation as resolved.
Show resolved Hide resolved
"CREATE (new:Message {type: $type, content: $content}) "
"WITH s, new "
"OPTIONAL MATCH (s)-[lm:LAST_MESSAGE]->(last_message:Message) "
"FOREACH (_ IN CASE WHEN last_message IS NULL THEN [] ELSE [1] END | "
" MERGE (last_message)-[:NEXT]->(new)) "
"MERGE (s)-[:LAST_MESSAGE]->(new) "
)

self._database.query(
create_query,
{
"type": message.type,
"content": message.content,
"session_id": self._session_id,
kingtroga marked this conversation as resolved.
Show resolved Hide resolved
},
)

def clear(self) -> None:
"""Clear all messages from the session in FalkorDB.

Deletes all messages linked to the session and resets the message history.

Raises:
ValueError: If there is an issue with the query or the session.
"""
query = (
f"MATCH (s:{self._node_label})-[:LAST_MESSAGE|NEXT*0..]->(m:Message) "
"WITH m DELETE m"
)
self._database.query(query, {"session_id": self._session_id})
kingtroga marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
Integration tests for FalkorDB Chat History/Memory functionality.

Note:
These tests are conducted using a local FalkorDB instance but can also
be run against a Cloud FalkorDB instance. Ensure that appropriate host,port
cusername, and password configurations are set up
kingtroga marked this conversation as resolved.
Show resolved Hide resolved
before running the tests.

Test Cases:
1. test_add_messages: Test basic functionality of adding and retrieving
chat messages from FalkorDB.
2. test_add_messages_graph_object: Test chat message functionality
when passing the FalkorDB driver through a graph object.

"""
from langchain_core.messages import AIMessage, HumanMessage

from langchain_community.chat_message_histories.falkordb import FalkorDBChatMessageHistory
from langchain_community.graphs import FalkorDBGraph

def test_add_messages() -> None:
"""Basic testing: add messages to the FalkorDBChatMessageHistory."""
message_store = FalkorDBChatMessageHistory("500daysofSadiya")
message_store.clear()
assert len(message_store.messages) == 0
message_store.add_user_message("Hello! Language Chain!")
message_store.add_ai_message("Hi Guys!")

# create another message store to check if the messages are stored correctly
message_store_another = FalkorDBChatMessageHistory("Shebrokemyheart")
message_store_another.clear()
assert len(message_store_another.messages) == 0
message_store_another.add_user_message("Hello! Bot!")
message_store_another.add_ai_message("Hi there!")
message_store_another.add_user_message("How's this pr going?")

# Now check if the messages are stored in the database correctly
assert len(message_store.messages) == 2
assert isinstance(message_store.messages[0], HumanMessage)
assert isinstance(message_store.messages[1], AIMessage)
assert message_store.messages[0].content == "Hello! Language Chain!"
assert message_store.messages[1].content == "Hi Guys!"

assert len(message_store_another.messages) == 3
assert isinstance(message_store_another.messages[0], HumanMessage)
assert isinstance(message_store_another.messages[1], AIMessage)
assert isinstance(message_store_another.messages[2], HumanMessage)
assert message_store_another.messages[0].content == "Hello! Bot!"
assert message_store_another.messages[1].content == "Hi there!"
assert message_store_another.messages[2].content == "How's this pr going?"

# Now clear the first history
message_store.clear()
assert len(message_store.messages) == 0
assert len(message_store_another.messages) == 3
message_store_another.clear()
assert len(message_store.messages) == 0
assert len(message_store_another.messages) == 0


def test_add_messages_graph_object() -> None:
"""Basic testing: Passing driver through graph object."""
graph = FalkorDBGraph("NeverGonnaLetYouDownNevergonnagiveyouup")
message_store = FalkorDBChatMessageHistory(
"Gonnahavetoteachmehowtoloveyouagain", graph=graph
)
message_store.clear()
assert len(message_store.messages) == 0
message_store.add_user_message("Hello! Language Chain!")
message_store.add_ai_message("Hi Guys!")
# Now check if the messages are stored in the database correctly
assert len(message_store.messages) == 2