Add async mode for pgvector #32
Conversation
I am on holiday now. I will respond next week.
On Mon, Apr 29, 2024, 20:24, Eugene Yurtsev ***@***.***> wrote:
… ***@***.**** commented on this pull request.
------------------------------
In langchain_postgres/vectorstores.py
<#32 (comment)>
:
> """
def __init__(
self,
embeddings: Embeddings,
*,
- connection: Optional[Connection] = None,
+ connection: Union[None, DBConnection, Engine, AsyncEngine, str] = None,
Could we separate out the async and sync connections?
connection: Union[None, DBConnection, Engine, str] = None,
connection_async: Union[AsyncEngine, str] = None
And remove async_mode as a parameter
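A rough sketch of how that suggested constructor might look (illustrative only; the `DBConnection` alias defined in the PR is omitted here, and the final PR keeps a single `connection` parameter instead):

```python
from __future__ import annotations

from typing import Union

from langchain_core.embeddings import Embeddings
from sqlalchemy import Engine
from sqlalchemy.ext.asyncio import AsyncEngine


class PGVectorSignatureSketch:
    """Rough shape of the reviewer's suggestion, not the merged API."""

    def __init__(
        self,
        embeddings: Embeddings,
        *,
        connection: Union[None, Engine, str] = None,              # sync engine or URL
        connection_async: Union[None, AsyncEngine, str] = None,   # async engine or URL
        # no async_mode flag: the type of connection decides the mode
    ) -> None:
        self.embeddings = embeddings
        self.connection = connection
        self.connection_async = connection_async
```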
------------------------------
In langchain_postgres/vectorstores.py
<#32 (comment)>
:
>         collection = self.get_collection(session)
        if not collection:
            self.logger.warning("Collection not found")
            return
        session.delete(collection)
        session.commit()

+    async def adelete_collection(self) -> None:
+        assert self._async_engine, "This method must be called with async_mode"
+        await self.__apost_init__()  # Lazy async init
I would love to remove the post-init behavior from all methods, and
eventually from `__init__`. Instead we can just raise an exception here.
The reasons are:
1. Because of this pattern, one cannot manage the schema without
initializing the vector store.
2. This causes bad patterns in unit-testing code, making it awkward to
reset the state of the db.
3. User code becomes ignorant of migrations, which makes it difficult
to actually create migration logic on schema changes.
What do you think?
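A minimal sketch of the fail-fast alternative described above, under the assumption of a hypothetical explicit setup method (`acreate_tables`) and flag (`_schema_initialized`); the PR as submitted keeps the lazy `__apost_init__()` call shown in the diff instead:

```python
from typing import Optional

from sqlalchemy.ext.asyncio import AsyncEngine


class FailFastSketch:
    """Toy illustration of the reviewer's proposal: no lazy init, fail fast instead."""

    def __init__(self, async_engine: Optional[AsyncEngine] = None) -> None:
        self._async_engine = async_engine
        self._schema_initialized = False  # flipped only by an explicit setup step

    async def acreate_tables(self) -> None:
        # Explicit, user-driven schema creation / migration hook.
        ...  # run CREATE TABLE statements or migrations here
        self._schema_initialized = True

    async def adelete_collection(self) -> None:
        if self._async_engine is None:
            raise AssertionError("This method must be called with async_mode")
        if not self._schema_initialized:
            # Instead of `await self.__apost_init__()`, surface the problem to the caller.
            raise RuntimeError(
                "Schema not initialized: call `await store.acreate_tables()` "
                "or run your migrations first."
            )
        ...  # delete the collection inside a session
```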
…#20735) The `_amake_session()` method does not allow modifying the `self.session_factory` with anything other than an `async_sessionmaker`. This prohibits advanced uses of `index()`.

In a RAG architecture, it is necessary to import document chunks. To keep track of the links between chunks and documents, we can use the `index()` API. This API proposes to use an SQL-type record manager.

In a classic use case, using `SQLRecordManager` and a vector database, it is impossible to guarantee the consistency of the import. Indeed, if a crash occurs during the import (problem with the network, ...), there is an inconsistency between the SQL database and the vector database. With the [PR](langchain-ai/langchain-postgres#32) we are proposing for `langchain-postgres`, it is now possible to guarantee the consistency of the import of chunks into a vector database. It's possible only if the outer session is built with the connection.

```python
def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Update session_factories
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker

        with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)
```

The same thing is possible asynchronously, but a bug in `sql_record_manager.py` in `_amake_session()` must first be fixed.

```python
async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
    """Create a session and close it after use."""
    # FIXME: REMOVE the old check:
    # if not isinstance(self.session_factory, async_sessionmaker):
    if not isinstance(self.engine, AsyncEngine):
        raise AssertionError("This method is not supported for sync engines.")
    async with self.session_factory() as session:
        yield session
```

Then, it is possible to do the same thing asynchronously:

```python
async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task,
        )
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker

        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


asyncio.run(main())
```

---------

Signed-off-by: Rahul Tripathi <[email protected]>
Co-authored-by: Bagatur <[email protected]>
Co-authored-by: Sean <[email protected]>
Co-authored-by: JuHyung-Son <[email protected]>
Co-authored-by: Erick Friis <[email protected]>
Co-authored-by: YISH <[email protected]>
Co-authored-by: Bagatur <[email protected]>
Co-authored-by: Jason_Chen <[email protected]>
Co-authored-by: Joan Fontanals <[email protected]>
Co-authored-by: Pavlo Paliychuk <[email protected]>
Co-authored-by: fzowl <[email protected]>
Co-authored-by: samanhappy <[email protected]>
Co-authored-by: Lei Zhang <[email protected]>
Co-authored-by: Tomaz Bratanic <[email protected]>
Co-authored-by: merdan <[email protected]>
Co-authored-by: ccurme <[email protected]>
Co-authored-by: Andres Algaba <[email protected]>
Co-authored-by: davidefantiniIntel <[email protected]>
Co-authored-by: Jingpan Xiong <[email protected]>
Co-authored-by: kaka <[email protected]>
Co-authored-by: jingsi <[email protected]>
Co-authored-by: Eugene Yurtsev <[email protected]>
Co-authored-by: Rahul Triptahi <[email protected]>
Co-authored-by: Rahul Tripathi <[email protected]>
Co-authored-by: Shengsheng Huang <[email protected]>
Co-authored-by: Michael Schock <[email protected]>
Co-authored-by: Anish Chakraborty <[email protected]>
Co-authored-by: am-kinetica <[email protected]>
Co-authored-by: Dristy Srivastava <[email protected]>
Co-authored-by: Matt <[email protected]>
Co-authored-by: William FH <[email protected]>
The promise of the constructor, with the `create_extension` parameter, is to guarantee that the extension is added before the APIs are used.
Can you launch the workflow?
@eyurtsev, can you approve the workflow? Then I can check whether all the code passes the CI.
Hello @eyurtsev, I've aligned the code with your similar requests. Can you review this code, which allows me to definitively set up resilience with langchain?
I am waiting for this PR!! async langchain_postgres
All is aligned with SQL Docstore
@eyurtsev
It's all fixed.
done
…22065) # package community: Fix SQLChatMessageHistory

## Description

Here is a rewrite of `SQLChatMessageHistory` to properly implement the asynchronous approach. The code circumvents [issue 22021](#22021) by accepting a synchronous call to `def add_messages()` in an asynchronous scenario. This bypasses the bug.

For the same reasons as in [PR 32](langchain-ai/langchain-postgres#32) of `langchain-postgres`, we use a lazy strategy for table creation. Indeed, the promise of the constructor cannot be fulfilled without this: it is not possible to invoke a synchronous call in a constructor. We compensate for this by waiting for the next asynchronous method call to create the table.

The goal of the `PostgresChatMessageHistory` class (in `langchain-postgres`) is, among other things, to be able to recycle database connections. The implementation of that class is problematic, as we have demonstrated in [issue 22021](#22021). Our new implementation of `SQLChatMessageHistory` achieves this by using a singleton of type (`Async`)`Engine` for the database connection. The connection pool is managed by this singleton, and the code is then reentrant.

We also accept the type `str` (optionally complemented by `async_mode`; I know you don't like this much, but it's the only way to allow an asynchronous connection string). In order to unify the different classes handling database connections, we have renamed `connection_string` to `connection`, and `Session` to `session_maker`.

Now, a single transaction is used to add a list of messages. Thus, a crash during this write operation will not leave the database in an unstable state with a partially added message list. This makes the code resilient.

We believe that the `PostgresChatMessageHistory` class is no longer necessary and can be replaced by:

```
PostgresChatMessageHistory = SQLChatMessageHistory
```

This also fixes the bug.

## Issue

- [issue 22021](#22021)
  - Bug in _exit_history()
  - Bugs in PostgresChatMessageHistory and sync usage
  - Bugs in PostgresChatMessageHistory and async usage
- [issue 36](langchain-ai/langchain-postgres#36)

## Twitter handle: pprados

## Tests

- libs/community/tests/unit_tests/chat_message_histories/test_sql.py (add async test)

@baskaryan, @eyurtsev or @hwchase17, can you check this PR? And I've been waiting a long time for validation of other PRs. Can you take a look?

- [PR 32](langchain-ai/langchain-postgres#32)
- [PR 15575](#15575)
- [PR 13200](#13200)

---------

Co-authored-by: Eugene Yurtsev <[email protected]>
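To make the rewritten behavior concrete, here is a hedged usage sketch based only on the description above: it assumes the renamed `connection` parameter accepts an `AsyncEngine`, that tables are created lazily on the first async call, and that the standard `aadd_messages` / `aget_messages` history methods are used; the connection URL is a placeholder.

```python
import asyncio

from langchain_community.chat_message_histories import SQLChatMessageHistory
from langchain_core.messages import AIMessage, HumanMessage
from sqlalchemy.ext.asyncio import create_async_engine

# Placeholder URL; psycopg 3 supports async engines.
engine = create_async_engine("postgresql+psycopg://postgres:password_postgres@localhost:5432/")

history = SQLChatMessageHistory(
    session_id="user-42",
    connection=engine,  # renamed from `connection_string` in this rewrite
)


async def main() -> None:
    # Table creation is lazy: it happens on the first async call,
    # since the constructor cannot await anything.
    await history.aadd_messages(
        [HumanMessage(content="Hello"), AIMessage(content="Hi there")]
    )
    print(await history.aget_messages())


asyncio.run(main())
```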
Hello @eyurtsev

- package: langchain-community
- **Description**: Add SQL implementation for docstore. A new implementation, in line with my other PRs ([async PGVector](langchain-ai/langchain-postgres#32), [SQLChatMessageMemory](#22065))
- Twitter handle: pprados

---------

Signed-off-by: ChengZi <[email protected]>
Co-authored-by: Bagatur <[email protected]>
Co-authored-by: Piotr Mardziel <[email protected]>
Co-authored-by: ChengZi <[email protected]>
Co-authored-by: Eugene Yurtsev <[email protected]>
@eyurtsev
…elf query retriever (#22678)

The fact that we outsourced pgvector to another project has an unintended effect. The mapping dictionary found by `_get_builtin_translator()` cannot recognize the new version of pgvector because it comes from another package. `SelfQueryRetriever` no longer knows `PGVector`.

I propose to fix this by creating a global dictionary that can be populated by the various database implementations. Thus, importing `langchain_postgres` would allow the registration of the `PGVector` mapping. But for the moment I'm just adding a lazy import.

Furthermore, the implementation of `_get_builtin_translator()` reconstructs the `BUILTIN_TRANSLATORS` variable with each invocation, which is not very efficient. A global map would be an optimization.

- **Twitter handle:** pprados

@eyurtsev, can you review this PR? And unlock the PR [Add async mode for pgvector](langchain-ai/langchain-postgres#32) and the PR [community[minor]: Add SQL storage implementation](#22207)? Are you in favour of a global dictionary-based implementation of Translator?
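A rough sketch of the global-dictionary idea, for illustration only: the names `register_translator`, `get_builtin_translator`, and `Visitor` are hypothetical stand-ins, not the actual LangChain API, and the PR itself only adds a lazy import.

```python
from typing import Callable, Dict


class Visitor:
    """Placeholder for the self-query translator base class."""


# Global registry keyed by vector store class. A package such as langchain_postgres
# could call register_translator(PGVector, PGVectorTranslator) at import time.
_TRANSLATOR_REGISTRY: Dict[type, Callable[[], Visitor]] = {}


def register_translator(vectorstore_cls: type, factory: Callable[[], Visitor]) -> None:
    """Let external packages plug in their own translator for their vector store."""
    _TRANSLATOR_REGISTRY[vectorstore_cls] = factory


def get_builtin_translator(vectorstore: object) -> Visitor:
    # Registry is built once and looked up per call, instead of rebuilding
    # BUILTIN_TRANSLATORS on every invocation.
    for cls, factory in _TRANSLATOR_REGISTRY.items():
        if isinstance(vectorstore, cls):
            return factory()
    raise ValueError(f"No translator registered for {type(vectorstore).__name__}")
```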
I made the mistake of doing a rebase while a review was in progress. This seems to block the process: there's a "1 change requested" request that I can't validate. I propose another PR identical to the [previous one](#32).

This PR adds the **async** approach for pgvector.

Some remarks:
- We use `assert` to check invocations, and not `if`. Thus, in production, it is possible to remove these checks with `python -O ...`
- We propose a public `session_maker` attribute. This is very important for resilient scenarios.

In a RAG architecture, it is necessary to import document chunks. To keep track of the links between chunks and documents, we can use the [index()](https://python.langchain.com/docs/modules/data_connection/indexing/) API. This API proposes to use an SQL-type record manager.

In a classic use case, using `SQLRecordManager` and a vector database, it is impossible to guarantee the consistency of the import. Indeed, if a crash occurs during the import, there is an inconsistency between the SQL database and the vector database.

**PGVector is the solution to this problem.**

Indeed, it is possible to use a single database (and not a two-phase commit with two technologies, if they are even both compatible). But, for this, it is necessary to be able to combine the transactions between the use of `SQLRecordManager` and `PGVector` as a vector database. This is only possible if it is possible to intervene on the `session_maker`. This is why we propose to make this attribute public. By unifying the `session_maker` of `SQLRecordManager` and `PGVector`, it is possible to guarantee that all processes will be executed in a single transaction.

This is, moreover, the only solution we know of to guarantee the consistency of the import of chunks into a vector database. It's possible only if the outer session is built with the connection.

```python
def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Update session_factories
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker

        with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)
```

The same thing is possible asynchronously, but a bug in `sql_record_manager.py` in `_amake_session()` must first be fixed (see [PR](langchain-ai/langchain#20735)).

```python
async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
    """Create a session and close it after use."""
    # FIXME: REMOVE the old check:
    # if not isinstance(self.session_factory, async_sessionmaker):
    if not isinstance(self.engine, AsyncEngine):
        raise AssertionError("This method is not supported for sync engines.")
    async with self.session_factory() as session:
        yield session
```

Then, it is possible to do the same thing asynchronously:

```python
async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task,
        )
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker

        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


asyncio.run(main())
```

The promise of the constructor, with the `create_extension` parameter, is to guarantee that the extension is added before the APIs are used. Since this promise cannot be kept in an `async` scenario, there is an alternative:

- Remove this parameter, since the promise cannot be kept. Otherwise, an `async` method is needed to install the extension before the APIs are used, and to check that this method has been invoked at the start of each API.
- Use a lazy approach, as suggested, which simply respects the constructor's promise.
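For illustration, here is a toy version of the lazy approach (the second option above); the class name, attributes, and elided bodies are simplified stand-ins, not the PR's actual implementation.

```python
from sqlalchemy.ext.asyncio import AsyncEngine


class LazyInitSketch:
    """Toy version of the lazy async-init pattern (illustrative names only)."""

    def __init__(self, async_engine: AsyncEngine, create_extension: bool = True) -> None:
        # The constructor cannot await anything, so the real work is deferred.
        self._async_engine = async_engine
        self._create_extension = create_extension
        self._async_init_done = False

    async def __apost_init__(self) -> None:
        # Runs at most once, on the first async call, so the constructor's promise
        # (extension and tables exist before the APIs are used) is still honoured.
        if self._async_init_done:
            return
        if self._create_extension:
            ...  # e.g. CREATE EXTENSION IF NOT EXISTS vector
        ...  # create tables / collection
        self._async_init_done = True

    async def adelete_collection(self) -> None:
        assert self._async_engine, "This method must be called with async_mode"
        await self.__apost_init__()  # Lazy async init, as in the diff above
        ...  # delete the collection inside a session
```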
This PR adds the async approach for pgvector.

Some remarks:
- We use `assert` to check invocations, and not `if`. Thus, in production, it is possible to remove these checks with `python -O ...`
- We propose a public `session_maker` attribute. This is very important for resilient scenarios.

In a RAG architecture, it is necessary to import document chunks. To keep track of the links between chunks and documents, we can use the `index()` API. This API proposes to use an SQL-type record manager. In a classic use case, using `SQLRecordManager` and a vector database, it is impossible to guarantee the consistency of the import. Indeed, if a crash occurs during the import, there is an inconsistency between the SQL database and the vector database.

PGVector is the solution to this problem. Indeed, it is possible to use a single database (and not a two-phase commit with two technologies, if they are even both compatible). But, for this, it is necessary to be able to combine the transactions between the use of `SQLRecordManager` and `PGVector` as a vector database. This is only possible if it is possible to intervene on the `session_maker`. This is why we propose to make this attribute public. By unifying the `session_maker` of `SQLRecordManager` and `PGVector`, it is possible to guarantee that all processes will be executed in a single transaction. This is, moreover, the only solution we know of to guarantee the consistency of the import of chunks into a vector database. It's possible only if the outer session is built with the connection.

The same thing is possible asynchronously, but a bug in `sql_record_manager.py` in `_amake_session()` must first be fixed (see PR langchain-ai/langchain#20735). Then, it is possible to do the same thing asynchronously:
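For completeness, this is the asynchronous example from the comment above (imports are omitted, as in the original):

```python
async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task,
        )
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker

        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


asyncio.run(main())
```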