Skip to content

Commit

Permalink
community[patch]: Make sql record manager fully compatible with async (
Browse files Browse the repository at this point in the history
…#20735)

The `_amake_session()` method does not allow modifying the
`self.session_factory` with
anything other than `async_sessionmaker`. This prohibits advanced uses
of `index()`.

In a RAG architecture, it is necessary to import document chunks.
To keep track of the links between chunks and documents, we can use the
`index()` API.
This API proposes to use an SQL-type record manager.

In a classic use case, using `SQLRecordManager` and a vector database,
it is impossible
to guarantee the consistency of the import. Indeed, if a crash occurs
during the import
(problem with the network, ...)
there is an inconsistency between the SQL database and the vector
database.

With the
[PR](langchain-ai/langchain-postgres#32) we are
proposing for `langchain-postgres`,
it is now possible to guarantee the consistency of the import of chunks
into
a vector database.  It's possible only if the outer session is built
with the connection.

```python
def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector:VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )

    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Update session_factories
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():
            loader = CSVLoader(
                    "data/faq/faq.csv",
                    source_column="source",
                    autodetect_encoding=True,
                )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)
```
The same thing is possible asynchronously, but a bug in
`sql_record_manager.py`
in `_amake_session()` must first be fixed.

```python
    async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Create a session and close it after use."""

        # FIXME: REMOVE if not isinstance(self.session_factory, async_sessionmaker):~~
        if not isinstance(self.engine, AsyncEngine):
            raise AssertionError("This method is not supported for sync engines.")

        async with self.session_factory() as session:
            yield session
``` 

Then, it is possible to do the same thing asynchronously:

```python
async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector:VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task)
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


asyncio.run(main())
```

---------

Signed-off-by: Rahul Tripathi <[email protected]>
Co-authored-by: Bagatur <[email protected]>
Co-authored-by: Sean <[email protected]>
Co-authored-by: JuHyung-Son <[email protected]>
Co-authored-by: Erick Friis <[email protected]>
Co-authored-by: YISH <[email protected]>
Co-authored-by: Bagatur <[email protected]>
Co-authored-by: Jason_Chen <[email protected]>
Co-authored-by: Joan Fontanals <[email protected]>
Co-authored-by: Pavlo Paliychuk <[email protected]>
Co-authored-by: fzowl <[email protected]>
Co-authored-by: samanhappy <[email protected]>
Co-authored-by: Lei Zhang <[email protected]>
Co-authored-by: Tomaz Bratanic <[email protected]>
Co-authored-by: merdan <[email protected]>
Co-authored-by: ccurme <[email protected]>
Co-authored-by: Andres Algaba <[email protected]>
Co-authored-by: davidefantiniIntel <[email protected]>
Co-authored-by: Jingpan Xiong <[email protected]>
Co-authored-by: kaka <[email protected]>
Co-authored-by: jingsi <[email protected]>
Co-authored-by: Eugene Yurtsev <[email protected]>
Co-authored-by: Rahul Triptahi <[email protected]>
Co-authored-by: Rahul Tripathi <[email protected]>
Co-authored-by: Shengsheng Huang <[email protected]>
Co-authored-by: Michael Schock <[email protected]>
Co-authored-by: Anish Chakraborty <[email protected]>
Co-authored-by: am-kinetica <[email protected]>
Co-authored-by: Dristy Srivastava <[email protected]>
Co-authored-by: Matt <[email protected]>
Co-authored-by: William FH <[email protected]>
  • Loading branch information
1 parent 17e42bb commit 7be6822
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions libs/community/langchain_community/indexes/_sql_record_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@
import contextlib
import decimal
import uuid
from typing import Any, AsyncGenerator, Dict, Generator, List, Optional, Sequence, Union
from typing import (
Any,
AsyncGenerator,
Dict,
Generator,
List,
Optional,
Sequence,
Union,
cast,
)

from sqlalchemy import (
URL,
Expand Down Expand Up @@ -175,10 +185,10 @@ def _make_session(self) -> Generator[Session, None, None]:
async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
"""Create a session and close it after use."""

if not isinstance(self.session_factory, async_sessionmaker):
if not isinstance(self.engine, AsyncEngine):
raise AssertionError("This method is not supported for sync engines.")

async with self.session_factory() as session:
async with cast(AsyncSession, self.session_factory()) as session:
yield session

def get_time(self) -> float:
Expand Down

0 comments on commit 7be6822

Please sign in to comment.