Add async mode for pgvector (v2) (#64)
I made the mistake of rebasing while a review was in progress, which seems to have blocked the process: there is a "1 change requested" review that I cannot resolve. So I am opening another PR identical to the [previous one](#32).

This PR adds the **async** approach for pgvector.

Some remarks:
- We use `assert` rather than `if` to check invocations; these checks can therefore be removed in production with `python -O ...` (see the sketch after this list).
- We expose a public `session_maker` attribute. This is very important for resilient scenarios.
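
As a minimal illustration of the `assert` convention (the class and method below are hypothetical, not the PR's code):

```python
class Example:
    """Hypothetical sketch of assert-based invocation checks."""

    def add_embeddings(self, texts: list[str], embeddings: list[list[float]]) -> None:
        # Stripped from the bytecode when running with `python -O`,
        # so production pays no cost for the check.
        assert len(texts) == len(embeddings), "texts and embeddings must be aligned"
        ...  # actual insertion logic
```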

In a RAG architecture, it is necessary to import document chunks.

To keep track of the links between chunks and documents, we can use the
[index()](https://python.langchain.com/docs/modules/data_connection/indexing/) API,
which relies on an SQL-backed record manager.

In the classic setup, with `SQLRecordManager` and a separate vector database, it is
impossible to guarantee the consistency of the import: if a crash occurs mid-import,
the SQL database and the vector database end up out of sync (see the sketch below).
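
A simplified view of the failure mode, one batch of documents with two independent systems (the function below is illustrative, not the actual internals of `index()`):

```python
def naive_import(vector_store, record_manager, docs, uids) -> None:
    """Illustrative: each call commits on its own, with no shared transaction."""
    vector_store.add_documents(docs, ids=uids)  # committed in the vector database
    # A crash at this point leaves vectors without matching records:
    # on a re-run, the record manager cannot tell the chunks were half-imported.
    record_manager.update(uids)                 # committed in the SQL database
```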

**PGVector is the solution to this problem.**

Indeed, it becomes possible to use a single database, rather than a two-phase commit
across two technologies (assuming both even support it). For this, however, the
transactions of `SQLRecordManager` and of `PGVector` (used as the vector database)
must be combined.

This is only possible if we can intervene on the `session_maker`.

This is why we propose to make this attribute public. By unifying the `session_maker`
of `SQLRecordManager` and `PGVector`, it is possible to guarantee that all processing
is executed in a single transaction.

This is, moreover, the only solution we know of to guarantee the consistency of the
chunk import into a vector database. It works only if the outer session is built on
the connection itself.

```python
from langchain.indexes import SQLRecordManager, index
from langchain_community.document_loaders import CSVLoader
from langchain_core.embeddings import FakeEmbeddings
from langchain_core.vectorstores import VectorStore
from langchain_postgres import PGVector
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker


def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings(size=1536)  # any Embeddings implementation works
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )

    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Unify the session factories of both components.
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


main()
```
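
The `index()` call returns the counts of processed chunks (`num_added`, `num_updated`, `num_skipped`, `num_deleted`). To see the consistency guarantee in action, here is a sketch reusing the objects from the example above: a simulated crash rolls back the record-manager rows and the vectors together.

```python
# Sketch: a failure inside the shared transaction leaves both stores untouched.
with engine.connect() as connection:
    session_maker = scoped_session(sessionmaker(bind=connection))
    record_manager.session_factory = session_maker
    pgvector.session_maker = session_maker
    try:
        with connection.begin():
            index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            raise RuntimeError("simulated crash before commit")
    except RuntimeError:
        pass  # nothing was committed: SQL records and vectors stay in sync
```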
The same thing is possible asynchronously, but a bug in `sql_record_manager.py`, in
`_amake_session()`, must first be fixed (see [PR](langchain-ai/langchain#20735)):

```python
    async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Create a session and close it after use."""

        # The old check, `isinstance(self.session_factory, async_sessionmaker)`,
        # breaks when the session factory is overridden; check the engine instead.
        if not isinstance(self.engine, AsyncEngine):
            raise AssertionError("This method is not supported for sync engines.")

        async with self.session_factory() as session:
            yield session
```
With that fix in place, the same import can be done asynchronously:

```python
import asyncio
from asyncio import current_task

from langchain.indexes import SQLRecordManager, aindex
from langchain_community.document_loaders import CSVLoader
from langchain_core.embeddings import FakeEmbeddings
from langchain_core.vectorstores import VectorStore
from langchain_postgres import PGVector
from sqlalchemy.ext.asyncio import (
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)


async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings(size=1536)  # any Embeddings implementation works
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task,
        )
        # NOTE: Unify the session factories of both components.
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


asyncio.run(main())
```

The constructor's promise, carried by the `create_extension` parameter, is to
guarantee that the extension is installed before the APIs are used. Since this
promise cannot be kept in an `async` scenario, there are two alternatives:

- Remove the parameter, since the promise cannot be kept. An `async` method would
then be needed to install the extension before the APIs are used, plus a check at
the start of each API that this method has been invoked.
- Use a lazy approach, as suggested, which simply keeps the constructor's promise
(see the sketch after this list).
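
A minimal sketch of the lazy approach, under stated assumptions: the class, attribute, and method names below are hypothetical, not the PR's actual implementation.

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine


class LazyPGVector:
    """Hypothetical sketch: defer CREATE EXTENSION until the first async call."""

    def __init__(self, engine: AsyncEngine, create_extension: bool = True) -> None:
        self._engine = engine
        # If the caller opted out, behave as if the extension were already there.
        self._extension_ready = not create_extension

    async def _aensure_extension(self) -> None:
        # Invoked at the start of every async API, which keeps the
        # constructor's promise without doing I/O in __init__.
        if not self._extension_ready:
            async with self._engine.begin() as conn:
                await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
            self._extension_ready = True

    async def aadd_texts(self, texts: list[str]) -> None:
        await self._aensure_extension()
        ...  # actual insertion logic would follow
```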
pprados authored Jun 10, 2024 · commit a7d345b (1 parent: 4acf5bd) · 3 changed files: 1,328 additions, 86 deletions