Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async persistance #488

Merged
merged 3 commits into from
Jan 4, 2025
Merged

Conversation

jernejfrank
Copy link
Contributor

@jernejfrank jernejfrank commented Dec 27, 2024

Addressing #484 to have an async persistence interface.

Changes

  • Adds base classes for saving/loading/running persistor in application via async
  • Adds async initializer/persister to ApplicationBuilder and lets us build async Application via .abuild()
  • Adds async validation to Application to warn about async hook being ignored in sync runs and raises error if you try to do sync run with an async Application.
  • Adds create_async_app in parallelism to create application via AsyncApplicationBuilder (in case of legacy use of sync initializer/persister reverts to old sync Application)
  • TBD: database specific implementations (only AsyncDevNullPersister for testing)

To think about: (1) fire-and-forget or (2) blocking/transactional options --
Async persistors are naturally (1) and sync persistors are (2). If we want to have both options for both cases, we need to:

  • effectively block the async save to turn (1)->(2) and
  • create an event loop / another tread and make sync save a coroutine executing there to go from (2) -> (1).
    My initial idea was to have that option as an attribute of the persistor class and handle it within PersisterHook/PersisterHookAsync, but still checking if there is a more natural place for this.

How I tested this

  • Unit and E2E

Notes

This part is very much for discussion if we want to have another AsyncApplicationBuilder or put this into the existing ApplicationBuilder. My arguments for having two:

  1. It is cleaner to separate async and sync (and maybe less error-prone to use the wrong one?).
  2. There are 3 methods we need to define async: with_state_persister, _load_from_persister, and build.
  3. Related to the above, if with_state_persister is async it gets a bit hairy how to do method chaining -- what I did is to overwrite the original method to just store the state_persister (similar to what initialize_from does) and then pushed all the logic into an async helper function __with_async_state_persister that gets awaited in build to manually chain coroutines.
  4. Having another builder class made it clearer also in the Application class to raise an error when run is used instead of arun.

Having said all of that, I also can push those methods into the existing builder with an abuild() to follow the pattern in the Application class where both sync and async are side-by-side.

Checklist

  • PR has an informative and human-readable title (this will be pulled into the release notes)
  • Changes are limited to a single goal (no scope creep)
  • Code passed the pre-commit check & code is left cleaner/nicer than when first encountered.
  • Any change in functionality is tested
  • [] New functions are documented (with a description, list of inputs, and expected output)
  • Placeholder code is flagged / future TODOs are captured in comments
  • Project documentation has been updated if adding/changing functionality.

Important

Add asynchronous persistence support with AsyncApplicationBuilder and related async classes and tests.

  • Behavior:
    • Introduces AsyncApplicationBuilder to support asynchronous state persistence.
    • Adds create_async_app in parallelism.py for async application creation.
    • Implements AsyncDevNullPersister for testing async persistence.
  • Persistence:
    • Adds AsyncBaseStateLoader and AsyncBaseStateSaver in persistence.py for async state operations.
    • Introduces PersisterHookAsync for async lifecycle hooks.
  • Testing:
    • Adds tests for async persistence in test_application.py, including test_async_save_and_load_from_persister_end_to_end.
  • Misc:
    • Updates imports in __init__.py and application.py to include async classes.

This description was created by Ellipsis for 9e233c8. It will automatically update as commits are pushed.

Copy link

@ellipsis-dev ellipsis-dev bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Looks good to me! Reviewed everything up to 9e233c8 in 1 minute and 20 seconds

More details
  • Looked at 803 lines of code in 5 files
  • Skipped 0 files when reviewing.
  • Skipped posting 4 drafted comments based on config settings.
1. burr/core/persistence.py:55
  • Draft comment:
    Ensure that the is_async method is overridden in subclasses of BaseStateLoader and BaseStateSaver to accurately reflect their async capabilities.
  • Reason this comment was not posted:
    Comment did not seem useful.
2. burr/core/persistence.py:129
  • Draft comment:
    Ensure that the is_async method is overridden in subclasses of BaseStateLoader and BaseStateSaver to accurately reflect their async capabilities.
  • Reason this comment was not posted:
    Marked as duplicate.
3. burr/core/persistence.py:174
  • Draft comment:
    Ensure that the is_async method is overridden in subclasses of BaseStateLoader and BaseStateSaver to accurately reflect their async capabilities.
  • Reason this comment was not posted:
    Marked as duplicate.
4. burr/core/persistence.py:86
  • Draft comment:
    Ensure that the is_async method is overridden in subclasses of BaseStateLoader and BaseStateSaver to accurately reflect their async capabilities.
  • Reason this comment was not posted:
    Marked as duplicate.

Workflow ID: wflow_QbxcMoFBccy3mX8X


You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet mode, and more.

Copy link
Contributor

@elijahbenizzy elijahbenizzy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lookking good -- some broad comments. I'm falling on the side of making it a single builder to force sharing of code , IMO it makes it a bit simpler + duplicates less logic. That said, i see both ways of doing it.

About this:

To think about: (1) fire-and-forget or (2) blocking/transactional options --
Async persistors are naturally (1) and sync persistors are (2). If we want to have both options for both cases, we need to:

effectively block the async save to turn (1)->(2) and
create an event loop / another tread and make sync save a coroutine executing there to go from (2) -> (1).
My initial idea was to have that option as an attribute of the persistor class and handle it within PersisterHook/PersisterHookAsync, but still checking if there is a more natural place for this.

To clarify -- async persisters are not naturally fire-and-forget -- they'll await completion of the task, which blocks the event loop. asyncio.ensure_future (I think that's the right one) is a way to make it fire/forget. You're right about the thread -- we'll need to tie into some sort of execution. This is also why we can probably push this up in the stack -- E.G. we don't use the result of these for the next execution, so we can put it in a queue.

We would have to worry about how to ensure order, especially in persisters (there's probably an async queue pattern that works -- should be pretty easy, but definitely more complicated). This also gets to some interesting db/distributed systems problems -- I'm not convinced there aren't other ugly pitfalls here.

So I think we should keep it transactional as it is now...

@@ -1174,6 +1184,22 @@ def iterate(
:return: Each iteration returns the result of running `step`. This generator also returns a tuple of
[action, result, current state]
"""
# This is a gentle warning for existing users
if self._adapter_set.async_hooks:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is good to raise -- only in .iterate though (unless I'm missing something?). Maybe take it and refactor out to a specific checking function, then put in the non-iterate functions as well? stream_result, step, run? There's some nuance about things that get called in both, so worth looking out for that.

)

# Seems fair to raise this if everything is async but the app execution
if self._adapter_set.async_hooks and isinstance(self._builder, AsyncApplicationBuilder):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see -- this makes sense. E.G. why would you ever call ApplicationBuilder for async stuff.

I'm thinking that we want to centralize validation:

  1. If any action is non-async -- we should maybe error out
  2. The other async stuff should call this validation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we should error out when the application is built with .abuild(), has async hooks, and we run it with sync methods, e.g. .run() since this is very easy to miss that the async hooks just don't get called.

Vice versa, if we have an async application with .arun() and sync hooks, I wouldn't error it out since we cannot guarantee that we every adapter has async support / user might be ok for some things to be blocking. Maybe a warning here is more appropriate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree -- this is backwards compatible. The second case is odd, actually, as you've pointed out. To make it more concrete, not every synchronous hook is incompatible in an async mode. Take, for example, the hook that prints "doing X step" before every step -- this is just going to be a sync hook, not an async hook, but it should work in an async setting. No need to build an async version of something like that, when it never has an await.

I think we shold maintain the design where .arun() allows sync + async hooks, .run() only allows sync hooks (and should break otherwise, althugh that's an edge case...). Also, think this simplifies?

self.state_persister = persister # track for later
return self

async def __with_async_state_persister(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we just want to push this into build for both sync/async?

results=result, final_state=state
out = (
action,
StreamingResultContainer.pass_through(results=result, final_state=state),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting -- is there a reason it changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, might have been my local settings..

@@ -2249,7 +2279,7 @@ def with_tracker(

def initialize_from(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I like with_state_persister logic moving to build -- it mirrors this. Might be worth doing for sync side as well. Can always leave as a TODO. But it should simplify logic?

pass

@telemetry.capture_function_usage
async def build(self) -> Application[StateType]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this has duplicated logic from the synchronous one. I wonder if this is a reason to combine.

  • _with_state_persister, _load_from_persister are helper functions for each
  • _build_common() or something has shared logic. Everything after 2654
  • build() has synchronous logic -- calls _build_common()
  • abuild() has async logic -- also calls _build_common

self.state_persister = persister # track for later
return self

async def __with_async_state_persister(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit -- naming conventions:

  • no underscore -- public method
  • one underscore -- private method
  • two underscores -- name mangling, private for subclasses
  • two underscores before/after -- system-level concerns

This should have a single underscore

burr/core/persistence.py Show resolved Hide resolved
Copy link
Contributor

@elijahbenizzy elijahbenizzy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks really good -- as noted in a DM I think we have to align on behavior. E.G. when using .build() versus .abuild(), sync versus async persister, amethod versus method -- which configurations break/log warning/work? Cartesian product/table is a clean way to think of it.

self.state = State()

if self.state_persister:
await self._set_async_state_persister() # this is used to save the state during application run
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking that .abuild() should allow a sync persister but log a warning. But maybe not? Seems fair to force it here...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scratch that, yes, that case should not work

@jernejfrank
Copy link
Contributor Author

Looks really good -- as noted in a DM I think we have to align on behavior. E.G. when using .build() versus .abuild(), sync versus async persister, amethod versus method -- which configurations break/log warning/work? Cartesian product/table is a clean way to think of it.

screenshot_2024-12-29_at_17 32 31

This is a current working proposition of which cases should allow what. I think we can replace persister with hooks and put it in the builder docs as well to have clarity on how to use .abuild().

@elijahbenizzy
Copy link
Contributor

Looks really good -- as noted in a DM I think we have to align on behavior. E.G. when using .build() versus .abuild(), sync versus async persister, amethod versus method -- which configurations break/log warning/work? Cartesian product/table is a clean way to think of it.

screenshot_2024-12-29_at_17 32 31

This is a current working proposition of which cases should allow what. I think we can replace persister with hooks and put it in the builder docs as well to have clarity on how to use .abuild().

Looks really good -- as noted in a DM I think we have to align on behavior. E.G. when using .build() versus .abuild(), sync versus async persister, amethod versus method -- which configurations break/log warning/work? Cartesian product/table is a clean way to think of it.

screenshot_2024-12-29_at_17 32 31

This is a current working proposition of which cases should allow what. I think we can replace persister with hooks and put it in the builder docs as well to have clarity on how to use .abuild().

OK, yeah, I think this is the right way to do it! Nice work enumerating it, you have me convinced. The only case (and I'm not sure how this works in the one above) is when you have a synchronous hook in an async .abuild() method. Not a persister (so not fully covered by the above), but I think it should work. That said, we should have appropriate documentation about it.

Copy link
Contributor

@elijahbenizzy elijahbenizzy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good! Some small points:

  1. I think we should document this clearly. Ideally is a sync versus async page in concepts, but this could just be part of the docstring
  2. A bit of a strange case in parallelism, where we call the new function by default. Might be worth adding in a parameter to bypass validation. Or just breaking -- it's an edge case.

Otherwise, let's ship soon! Nice work!

)

# Seems fair to raise this if everything is async but the app execution is sync
if self._adapter_set.async_hooks and self._builder.is_async:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confused -- this is erroring out if the builder is async and the adapter has async hooks? .is_async?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! So I am calling this in the sync methods and it covers cases 3 and 7 in the above table. Thinking back, maybe it's better to be strict from the get go and only check _builder.is_async and error out.

Previously, if there were no async hooks present (i.e. no async persisters, etc.) it doesn't really matter how the app was built or is run -- since only sync hooks in the app. But might as well just say if you build it async, use it async.

@@ -2484,3 +2589,51 @@ def build(self) -> Application[StateType]:
state_persister=self.state_persister,
state_initializer=self.state_initializer,
)

@telemetry.capture_function_usage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice this function is extremely clean

self.state = State()

if self.state_persister:
await self._set_async_state_persister() # this is used to save the state during application run
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scratch that, yes, that case should not work

async def abuild(self) -> Application[StateType]:
"""Builds the application.

This function is a bit messy as we iron out the exact logic and rigor we want around things.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer messy, fix this. And make it + build() clear about the parameters (good place to have this table?).

status: Literal["completed", "failed"],
**kwargs,
):
# print("I saved something.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

else:
builder = builder.with_entrypoint(self.graph.entrypoint).with_state(self.state)

return await builder.abuild()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, complexity, this has the off-chance of making something backwards incompatible. That said, I think that's OK TBH. In this case it's an odd one:

  1. Someone is using the async parallelism piece
  2. Someone is not using the async persister (which they won't be, it isn't there)
  3. It breaks on reloading

I think very few people are doing this, but it might be worth having a parameter in .abuild() to bypass validation, we could do that here...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add the parameter, but not sure if it is that helpful. If they are using sync persisters they will have to go through the sync .build() method anyway since .abuild() expects async persisters.

This is what I capture now parallelism .arun(): we check if we have sync persisters and then use .build() to have that backward compatibility, otherwise everything async and ok.

@jernejfrank
Copy link
Contributor Author

jernejfrank commented Dec 30, 2024

OK, yeah, I think this is the right way to do it! Nice work enumerating it, you have me convinced. The only case (and I'm not sure how this works in the one above) is when you have a synchronous hook in an async .abuild() method. Not a persister (so not fully covered by the above), but I think it should work. That said, we should have appropriate documentation about it.

This works as before. You run both sync and async hooks --> sync ones are blocking. Also, if you use sync persister with .build() it will still let you run async (this is backwards compatibility). But if you use the new .abuild() method, it complains in there about using sync persisters and later when you have the application it complains that you are trying to run it sync.

Will document!

@jernejfrank jernejfrank force-pushed the async_persistance branch 2 times, most recently from 5008a12 to b7ba29f Compare December 30, 2024 13:13
Copy link
Contributor

@elijahbenizzy elijahbenizzy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! Really big improvements. Left some small comments but it's there.

# this application is meant to be run in async mode.
if self._builder and self._builder.is_async:
raise ValueError(
"The application was build with async hooks "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this say "it was built with abuild" -- think that's clearer/more actionable

Sync vs Async Applications
===========================

TL;DR
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Link to the arun, aiterate, etc... methods. Good to tie it back into the actions.

Adding async support for persistance and refactoring builder:
- classes for building async persistance adapters / hooks
- builder extended to include async initializer/persister, async build
- builder refactored
- application added async validation, warning/error when async hooks not
  invoked
- automatic built for app in parallelism made backward compatible
Prototyping and testing async persisters:
- Add AsyncDevNull and AsyncInMemory persisters for tests
- Added support for async sqlite persister
- Test Async persister interface, async builder, async application
Docs explaining allowed cases and deprecation warnings.
@elijahbenizzy elijahbenizzy merged commit c36877d into DAGWorks-Inc:main Jan 4, 2025
10 of 11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants