Async db support #497
👍 Looks good to me! Reviewed everything up to 304eb21 in 37 seconds
More details
- Looked at 662 lines of code in 6 files
- Skipped 0 files when reviewing.
- Skipped posting 2 drafted comments based on config settings.
1. burr/integrations/persisters/b_redis.py:315
- Draft comment: The create_key method does not perform any asynchronous operations and can be converted to a regular method.
- Reason this comment was not posted: Confidence changes required: 50%. The create_key method in AsyncRedisBasePersister is unnecessarily defined as an async function. It does not perform any asynchronous operations and can be converted to a regular method.
2. burr/integrations/persisters/postgresql.py:362
- Draft comment: The create_key method does not perform any asynchronous operations and can be converted to a regular method.
- Reason this comment was not posted: Confidence changes required: 50%. The create_key method in AsyncPostgreSQLPersister is unnecessarily defined as an async function. It does not perform any asynchronous operations and can be converted to a regular method.
Workflow ID: wflow_m5wjdK3DlH4vtpsj
Some comments, will take a look again
class AsyncRedisBasePersister(persistence.BaseStatePersister):
    """Main class for Async Redis persister.

    Use this class if you want to directly control injecting the async Redis client.
What does this mean?
"status": data[b"status"].decode(),
}

async def create_key(self, app_id, partition_key, sequence_id):
Should this be async?
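A minimal sketch of the point being raised, assuming a key layout of partition_key:app_id:sequence_id (hypothetical; the real format in b_redis.py is not shown in this thread). Building a key is plain string formatting with no I/O, so it can be a regular method that async callers invoke without await:

```python
import asyncio


class AsyncPersisterSketch:
    """Hypothetical stand-in for an async persister; not Burr's actual class."""

    def __init__(self):
        self._store = {}  # stands in for an async Redis client

    def create_key(self, app_id: str, partition_key: str, sequence_id: int) -> str:
        # Pure string formatting: nothing to await, so a plain `def` is enough.
        # The key layout here is an assumption made for illustration.
        return f"{partition_key}:{app_id}:{sequence_id}"

    async def save(self, partition_key, app_id, sequence_id, state):
        key = self.create_key(app_id, partition_key, sequence_id)  # no await needed
        await asyncio.sleep(0)  # placeholder for the real awaited Redis call
        self._store[key] = state
        return key


saved_key = asyncio.run(AsyncPersisterSketch().save("pk", "app", 3, {"x": 1}))
```

Keeping the method synchronous also means the same implementation could be reused by the sync persister.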
Wonder if this should subclass the synchronous redis persister to share code -- there's a lot of duplication...
Alternatively, helper functions for the common operations
I went through both classes and there is not much shared code. The annoying thing is that most of the code follows the pattern: talk to db -- extract some useful data (e.g. sequence_id) -- talk to db again. So all the logic that appears duplicated is sandwiched between db calls that are sync or async depending on the class.
Got it, yeah, I think that's OK given sync versus async is generally tricky
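To illustrate the "sandwich" pattern discussed above, here is a rough sketch using in-memory dictionaries in place of a database. All names (next_sequence_id, SyncPersisterSketch, AsyncPersisterSketch) are hypothetical. Only the small pure step in the middle can be shared; the calls on either side differ by sync versus async, which is why the two classes end up looking duplicated:

```python
import asyncio
from typing import Optional


def next_sequence_id(last: Optional[int]) -> int:
    """Pure logic that both variants can share (hypothetical helper)."""
    return 0 if last is None else last + 1


class SyncPersisterSketch:
    def __init__(self):
        self._db = {}  # app_id -> {sequence_id: state}

    def save(self, app_id, state):
        last = max(self._db.get(app_id, {}), default=None)  # talk to db
        seq = next_sequence_id(last)                        # shared pure logic
        self._db.setdefault(app_id, {})[seq] = state        # talk to db again
        return seq


class AsyncPersisterSketch:
    def __init__(self):
        self._db = {}

    async def save(self, app_id, state):
        await asyncio.sleep(0)  # placeholder for an awaited read
        last = max(self._db.get(app_id, {}), default=None)
        seq = next_sequence_id(last)                        # same shared logic
        await asyncio.sleep(0)  # placeholder for an awaited write
        self._db.setdefault(app_id, {})[seq] = state
        return seq


sync_p = SyncPersisterSketch()
sync_ids = [sync_p.save("app", {"n": 1}), sync_p.save("app", {"n": 2})]


async def _demo():
    p = AsyncPersisterSketch()
    return [await p.save("app", {"n": 1}), await p.save("app", {"n": 2})]


async_ids = asyncio.run(_demo())
```

The shared helper is tiny compared to the surrounding I/O, which matches the conclusion that the duplication is acceptable.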
@@ -1,6 +1,7 @@
from burr.integrations import base

try:
    import asyncpg
Ditto with the above -- I think this should probably be a separate one so we don't force people who just need sync to also have async libraries
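One way to picture the suggestion: each module guards only the import it actually needs, so sync users never hit the async dependency. The sketch below uses a hypothetical load_optional helper (not part of Burr) and the standard-library json module as a stand-in so it runs anywhere; the extra name in the message is also an assumption:

```python
import importlib


def load_optional(module_name: str, extra: str):
    """Import an optional dependency or fail with an install hint (hypothetical helper)."""
    try:
        return importlib.import_module(module_name)
    except ImportError as e:
        raise ImportError(
            f"'{module_name}' is required for this persister. "
            f"Install the '{extra}' extra to use it."
        ) from e


# A sync-only module would guard just its own driver, and a separate async
# module would guard asyncpg, so neither forces the other's dependency.
json_module = load_optional("json", "none")  # stdlib stand-in, always importable
```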
if self._initialized:
    return True

async with self.connection.transaction():
Does this need to be a transaction? It's a single query, and it's purely a read (so it doesn't/can't be rolled back)
Hmm, also feels like we should be able to set self._initialized to be true here if _initialized is true...
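A sketch of how the two comments above could look together, under some assumptions: FakeConnection is a stand-in that only mimics the fetchval coroutine of an asyncpg connection, the SQL is an illustrative existence check rather than the query used in the PR, and caching a positive result is one reading of the second comment. The single read runs without a transaction block:

```python
import asyncio


class FakeConnection:
    """Stand-in for an asyncpg connection; only mimics `fetchval`."""

    def __init__(self, table_exists: bool):
        self.table_exists = table_exists
        self.queries = 0

    async def fetchval(self, query, *args):
        self.queries += 1
        return self.table_exists


class InitCheckSketch:
    def __init__(self, connection, table_name="burr_state"):
        self.connection = connection
        self.table_name = table_name
        self._initialized = False

    async def is_initialized(self) -> bool:
        if self._initialized:
            return True
        # A single read-only query: nothing to roll back, so no transaction block.
        exists = await self.connection.fetchval(
            "SELECT EXISTS (SELECT 1 FROM information_schema.tables"
            " WHERE table_name = $1)",
            self.table_name,
        )
        # Remember a positive answer so later calls skip the round trip.
        self._initialized = bool(exists)
        return self._initialized


async def _demo():
    conn = FakeConnection(table_exists=True)
    persister = InitCheckSketch(conn)
    first = await persister.is_initialized()
    second = await persister.is_initialized()
    return first, second, conn.queries


demo_result = asyncio.run(_demo())
```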
partition_key = self.PARTITION_KEY_DEFAULT
logger.debug("Loading %s, %s, %s", partition_key, app_id, sequence_id)

async with self.connection.transaction():
ditto with transaction -- not needed I think
status,
)

async with self.connection.transaction():
Also should not need to be a transaction
9bce9c2 to 63c306c
I have increased the scope to include all existing persister implementations to make some consistency additions. There are 3 points to highlight:
I opted to name the modules after the underlying dependency library. I am not a fan of that naming, but this gives us more flexibility down the line in case we implement the same persister with multiple different libraries since we can then keep the class name -- For example:
I think this looks good -- it's consistent, which is nice. Will think through the naming but naming it based on the libraries/plugins makes sense to me! @skrawcz thoughts?
@jernejfrank -- maybe add a note in the references on the naming scheme/rules, right next to the table.
def __getstate__(self) -> dict:
    state = self.__dict__.copy()
    if not hasattr(self.connection, "connection_pool"):
        logger.warning("Redis connection is not serializable.")
So I don't think we want a warning here -- this is specifically what we need to recreate in __setstate__. I.e., the reason we have this is so we can serialize a Redis connection and recreate it when needed.
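A rough sketch of the serialize-and-recreate idea, assuming the client can be rebuilt from its connection parameters. FakeClient and PicklablePersisterSketch are hypothetical stand-ins, not redis-py or Burr classes. The state keeps only plain data, and the client is rebuilt on restore:

```python
import pickle


class FakeClient:
    """Stand-in for a Redis client; a live client holds sockets and cannot be pickled."""

    def __init__(self, **connection_kwargs):
        self.connection_kwargs = connection_kwargs
        self._socket = lambda: None  # lambdas cannot be pickled, like a live socket


class PicklablePersisterSketch:
    def __init__(self, connection, namespace=""):
        self.connection = connection
        self.namespace = namespace

    def __getstate__(self) -> dict:
        state = self.__dict__.copy()
        # Keep only what is needed to rebuild the client, then drop the client.
        state["connection_params"] = dict(self.connection.connection_kwargs)
        del state["connection"]
        return state

    def __setstate__(self, state: dict):
        params = state.pop("connection_params")
        self.__dict__.update(state)
        self.connection = FakeClient(**params)  # recreate the connection


original = PicklablePersisterSketch(
    FakeClient(host="localhost", port=6379), namespace="demo"
)
state = original.__getstate__()
payload = pickle.dumps(state)  # works because the state holds only plain data

# Mirror what unpickling does: create a bare instance, then restore its state.
restored = PicklablePersisterSketch.__new__(PicklablePersisterSketch)
restored.__setstate__(pickle.loads(payload))
```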
burr/core/persistence.py
Outdated
@classmethod
def from_config(cls, config: dict) -> "SQLitePersister":
    """Creates a new instance of the SQLitePersister from a configuration dictionary."""
    return cls.from_values(
What's the use of this? Also should be able to just use ** -- the defaults work.
Some DB persisters had from_config and from_values, others had only from_values, and some (like SQLite) did not have either. I implemented both from_config and from_values so that all persisters can be initialized the same way. It may be useful to put this on the abstract class level to enforce it when implementing new persisters?
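A sketch of how both ideas in this exchange might combine: an abstract base (hypothetical, named ConfigurablePersister here) that requires from_values, with a shared from_config that simply unpacks the dictionary using ** so the defaults on from_values apply. SQLiteSketch and its parameters are illustrative only, not the signatures from the PR:

```python
import abc
import sqlite3


class ConfigurablePersister(abc.ABC):
    """Hypothetical base class that makes every persister offer both constructors."""

    @classmethod
    @abc.abstractmethod
    def from_values(cls, **kwargs):
        ...

    @classmethod
    def from_config(cls, config: dict):
        # Unpacking the dict lets the defaults declared on from_values apply
        # to any key the config leaves out.
        return cls.from_values(**config)


class SQLiteSketch(ConfigurablePersister):
    def __init__(self, connection, table_name):
        self.connection = connection
        self.table_name = table_name

    @classmethod
    def from_values(cls, db_path=":memory:", table_name="burr_state"):
        return cls(sqlite3.connect(db_path), table_name)


persister = SQLiteSketch.from_config({"table_name": "custom"})
default_persister = SQLiteSketch.from_config({})
persister.connection.close()
default_persister.connection.close()
```

A subclass that forgets from_values cannot be instantiated, which is the enforcement the comment asks about.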
burr/core/persistence.py
Outdated
    db_path, **connect_kwargs if connect_kwargs is not None else {}
)

# Here for backwards compatibility
explain a little more
burr/core/persistence.py
Outdated
def __del__(self):
    # This should be deprecated
explain why
@@ -45,114 +25,22 @@ def from_values(
    mongo_client_kwargs: dict = None,
) -> "MongoDBBasePersister":
    """Initializes the MongoDBBasePersister class."""
    logger.warning(
We should be able to put this logger at the import level I think. Just at the top of the file?
63c306c to 8c834af
@skrawcz any thoughts on naming for integrations?
burr/core/persistence.py
Outdated
@@ -304,6 +338,7 @@ def __init__(
    table_name: str = "burr_state",
    serde_kwargs: dict = None,
    connect_kwargs: dict = None,
    connection=None,
type-hint?
Async implementation to persist the state to a PostgreSQL db. In addition, adds methods to the sync PostgreSQL persister to make it consistent. Migrates postgresql to the psycopg2 module for consistent naming.
Implements the async functions from redis-py.asyncio for state persistence. Adds methods to the sync Redis persister for consistency.
The methods added make it consistent with the implementations of other db persisters.
Migrates the MongoDBPersister to b_pymongo.py for a consistent naming convention.
b2660e1 to 024625f
61821b7 to c666cba
Some libraries are only supported for Python <= 3.12. This sets the Python version to 3.12 for the docs and validate_examples hooks so that Ray gets installed.
c666cba to 32d3eb4
Adding async support for Postgres and Redis state persisters. This expands on #488 and addresses #484 when using Postgres or Redis.
Changes
asyncpg
.redis.asyncio
How I tested this
Notes
asyncpg uses a coroutine to open/close the connection. In this case, pickling/unpickling the state needs a workaround (not yet implemented), since __getstate__ and __setstate__ do not have async support.
Checklist
Important
Add asynchronous support for Redis and PostgreSQL state persisters using redis.asyncio and asyncpg.
- AsyncRedisBasePersister using redis.asyncio for async Redis operations in b_redis.py.
- AsyncPostgreSQLPersister using asyncpg for async PostgreSQL operations in postgresql.py.
- test_b_redis.py for AsyncRedisBasePersister.
- test_postgresql.py for AsyncPostgreSQLPersister.
- persister.rst to include AsyncRedisBasePersister and AsyncPostgreSQLPersister.
- asyncpg to pyproject.toml under postgresql dependencies.
This description was created for 304eb21. It will automatically update as commits are pushed.