Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Asynchronous Connection Pooling Support to PostgresChatMessageHistory #130

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

shamspias
Copy link

This PR adds support for asynchronous connection pooling in the PostgresChatMessageHistory class, addressing issues #122 and #129

Changes Made:

  • Modified PostgresChatMessageHistory:
    • Added a conn_pool parameter to accept an AsyncConnectionPool instance.
    • Adjusted the __init__ method to include conn_pool and reordered parameters for improved usability.
    • Ensured session_id and table_name can be passed as keyword arguments.
  • Updated Asynchronous Methods:
    • Modified aget_messages and aadd_messages to utilize the connection pool when provided.
    • Maintained existing functionality for async_connection to ensure backward compatibility.
  • Added Unit Tests:
    • Introduced test_async_chat_history_with_pool in tests/unit_tests/test_chat_histories.py to verify the new functionality.
  • Updated Documentation:
    • Revised the README to include examples of using PostgresChatMessageHistory with an asynchronous connection pool.
    • Adjusted usage examples to reflect the updated parameter order and new conn_pool parameter.

Example Usage:

import uuid
import asyncio

from langchain_core.messages import SystemMessage, AIMessage, HumanMessage
from langchain_postgres import PostgresChatMessageHistory
from psycopg_pool import AsyncConnectionPool

async def main():
    # Database connection string
    conn_info = "postgresql://user:password@host:port/dbname"  # Replace with your connection info

    # Initialize the connection pool
    pool = AsyncConnectionPool(conninfo=conn_info)

    try:
        # Create the table schema (only needs to be done once)
        async with pool.connection() as async_connection:
            table_name = "chat_history"
            await PostgresChatMessageHistory.adrop_table(async_connection, table_name)
            await PostgresChatMessageHistory.acreate_tables(async_connection, table_name)

        session_id = str(uuid.uuid4())

        # Initialize the chat history manager with the connection pool
        chat_history = PostgresChatMessageHistory(
            session_id=session_id,
            table_name=table_name,
            conn_pool=pool
        )

        # Add messages to the chat history asynchronously
        await chat_history.aadd_messages([
            SystemMessage(content="System message"),
            AIMessage(content="AI response"),
            HumanMessage(content="Human message"),
        ])

        # Retrieve messages from the chat history
        messages = await chat_history.aget_messages()
        print(messages)
    finally:
        # Close the connection pool
        await pool.close()

# Run the async main function
asyncio.run(main())

Testing:

  • Added a new test test_async_chat_history_with_pool in tests/unit_tests/test_chat_histories.py:

    async def test_async_chat_history_with_pool() -> None:
        """Test the async chat history using a connection pool."""
        from psycopg_pool import AsyncConnectionPool
        from tests.utils import DSN
    
        # Initialize the connection pool
        pool = AsyncConnectionPool(conninfo=DSN)
        try:
            table_name = "chat_history"
            session_id = str(uuid.uuid4())
    
            # Create tables using a connection from the pool
            async with pool.connection() as async_connection:
                await PostgresChatMessageHistory.adrop_table(async_connection, table_name)
                await PostgresChatMessageHistory.acreate_tables(async_connection, table_name)
    
            # Create PostgresChatMessageHistory with conn_pool
            chat_history = PostgresChatMessageHistory(
                session_id=session_id,
                table_name=table_name,
                conn_pool=pool,
            )
    
            # Ensure the chat history is empty
            messages = await chat_history.aget_messages()
            assert messages == []
    
            # Add messages to the chat history
            await chat_history.aadd_messages(
                [
                    SystemMessage(content="System message"),
                    AIMessage(content="AI response"),
                    HumanMessage(content="Human message"),
                ]
            )
    
            # Retrieve messages from the chat history
            messages = await chat_history.aget_messages()
            assert len(messages) == 3
            assert messages == [
                SystemMessage(content="System message"),
                AIMessage(content="AI response"),
                HumanMessage(content="Human message"),
            ]
    
            # Clear the chat history
            await chat_history.aclear()
            messages = await chat_history.aget_messages()
            assert messages == []
        finally:
            # Close the connection pool
            await pool.close()
  • Ensured all existing tests pass, maintaining backward compatibility.

Documentation:

  • README Updates:
    • Adjusted parameter usage in examples to match the updated __init__ method.
    • Added a new section demonstrating asynchronous usage with connection pooling.

Notes:

  • Backward Compatibility:
    • Existing code using sync_connection or async_connection continues to work without modifications.
  • Benefits:
    • Improves efficiency by reusing database connections through a connection pool.
    • Enhances resource management in asynchronous applications.

Related Issues:

@JoshOlam
Copy link

@efriis - Could you help review and merge this PR? I am building something that needs this fix ASAP, please.

Thanks.

@syahrulhamdani
Copy link

Hi - any updates so far for this fix?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants