
Add async mode for pgvector (v2) #64

Merged · 20 commits · Jun 10, 2024
Conversation

@pprados (Contributor) commented on Jun 10, 2024

I made the mistake of rebasing while a review was in progress, which seems to have blocked the process: there is a "1 change requested" review that I cannot resolve. So I am opening a new PR identical to the previous one.

This PR adds async support for pgvector.

Some remarks:

  • We use assert rather than if to validate invocations, so in production these
    checks can be removed with python -O (a short sketch follows this list).
  • We propose a public session_maker attribute. This is essential for resilient
    scenarios.
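
A minimal sketch of the assert-based invocation checks (class and method names here are illustrative, not the PR's actual code):

# Illustrative sketch: invocation checks written as asserts.
# Under `python -O`, __debug__ is False and asserts are compiled out,
# so production runs pay no cost for these checks.

class Store:
    def __init__(self, async_mode: bool = False) -> None:
        self.async_mode = async_mode

    def add_texts(self, texts: list[str]) -> None:
        # assert, not `if ... raise`: removable in optimized mode.
        assert not self.async_mode, "Use the async API with an async engine."
        print(f"added {len(texts)} texts")

Store().add_texts(["hello"])   # OK
# python -O script.py          # the assert above disappears entirely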

In a RAG architecture, document chunks must be imported into the vector store.

To keep track of the links between chunks and their source documents, we can use the
index() API, which relies on an SQL-based record manager.

In the classic use case, with SQLRecordManager plus a separate vector database, it is
impossible to guarantee the consistency of the import.

If a crash occurs during the import, the SQL database and the vector database end up
inconsistent.

PGVector solves this problem.

With PGVector, a single database can hold both the record manager's state and the
vectors, so no two-phase commit across two technologies is needed. For this to work,
however, the transactions of SQLRecordManager and PGVector must be combined, which
requires being able to intervene on the session_maker.

This is why we propose making this attribute public. By sharing one session_maker
between SQLRecordManager and PGVector, we can guarantee that all operations execute
in a single transaction.

This is, moreover, the only solution we know of to guarantee a consistent import of
chunks into a vector database, and it works only if the outer session is built from
the connection:

from langchain.indexes import SQLRecordManager, index
from langchain_community.document_loaders import CSVLoader
from langchain_core.embeddings import FakeEmbeddings
from langchain_core.vectorstores import VectorStore
from langchain_postgres import PGVector
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings(size=1536)  # placeholder; use a real Embeddings in practice
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )

    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        # Share one session_maker, bound to this connection, between both components.
        session_maker = scoped_session(sessionmaker(bind=connection))
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():  # single outer transaction for the whole import
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)
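
With this setup, the record manager's bookkeeping and the vector writes share the single transaction opened by connection.begin(): if the process crashes mid-import, both sides are rolled back together, so the two stores cannot diverge.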

The same thing is possible asynchronously, but a bug in sql_record_manager.py's
_amake_session() must first be fixed (see PR ):

    async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Create a session and close it after use."""

        # The former check, `isinstance(self.session_factory, async_sessionmaker)`,
        # is removed: once session_factory is overridden (as below), it is no
        # longer an async_sessionmaker, so the engine type is checked instead.
        if not isinstance(self.engine, AsyncEngine):
            raise AssertionError("This method is not supported for sync engines.")

        async with self.session_factory() as session:
            yield session

Then, it is possible to do the same thing asynchronously:

import asyncio
from asyncio import current_task

from langchain.indexes import SQLRecordManager, aindex
from langchain_community.document_loaders import CSVLoader
from langchain_core.embeddings import FakeEmbeddings
from langchain_core.vectorstores import VectorStore
from langchain_postgres import PGVector
from sqlalchemy.ext.asyncio import (
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)

async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings(size=1536)  # placeholder; use a real Embeddings in practice
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        # One async session_maker, scoped to the current task, bound to this connection.
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task,
        )
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():  # single outer transaction for the whole import
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)

asyncio.run(main())
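
Binding the async_sessionmaker to the single connection and scoping sessions per task with scopefunc=current_task means every session that aindex opens internally joins the same outer transaction, which is what makes the import atomic.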

The constructor's create_extension parameter promises that the pgvector extension is installed before the APIs are used. Since this promise cannot be kept synchronously in an async scenario, there are two alternatives:

  • Remove the parameter, since the promise cannot be kept. An async method would then be needed to install the extension before the APIs are used, plus a check at the start of each API that this method has been invoked.
  • Use a lazy approach, as proposed here, which simply honors the constructor's promise (sketched below).
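
A hedged sketch of the lazy approach (class and method names are illustrative, not PGVector's actual internals):

# Lazy-initialization sketch: the one-time setup the constructor cannot
# await is performed at the start of every async API instead.
import asyncio

class LazyExtensionStore:
    def __init__(self, create_extension: bool = True) -> None:
        self._create_extension = create_extension
        self._initialized = False
        self._lock = asyncio.Lock()

    async def _ensure_ready(self) -> None:
        # Idempotent and guarded by a lock, so concurrent first calls are safe.
        async with self._lock:
            if not self._initialized:
                if self._create_extension:
                    await self._install_extension()
                self._initialized = True

    async def _install_extension(self) -> None:
        # Placeholder for the real DDL, e.g. CREATE EXTENSION IF NOT EXISTS vector.
        print("installing pgvector extension")

    async def asimilarity_search(self, query: str) -> list[str]:
        await self._ensure_ready()  # the constructor's promise, kept lazily
        return []

asyncio.run(LazyExtensionStore().asimilarity_search("hi"))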

@pprados changed the title from Pprados/async2 to Add async mode for pgvector (v2) on Jun 10, 2024
@eyurtsev merged commit a7d345b into langchain-ai:main on Jun 10, 2024
6 checks passed