Demonstrate workflow.all_handlers_finished #139

Merged · 2 commits · Sep 4, 2024
6 changes: 3 additions & 3 deletions updates_and_signals/safe_message_handlers/README.md
@@ -4,9 +4,9 @@ This sample shows off important techniques for handling signals and updates, aka

 * Here, using workflow.wait_condition, signal and update handlers will only operate when the workflow is within a certain state--between cluster_started and cluster_shutdown.
 * You can run start_workflow with an initializer signal that you want to run before anything else other than the workflow's constructor. This pattern is known as "signal-with-start."
-* Message handlers can block and their actions can be interleaved with one another and with the main workflow. This can easily cause bugs, so we use a lock to protect shared state from interleaved access.
-* Message handlers should also finish before the workflow run completes. One option is to use a lock.
-* An "Entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time. Just make sure message handlers have finished before doing so.
+* Message handlers can block and their actions can be interleaved with one another and with the main workflow. This can easily cause bugs, so you can use a lock to protect shared state from interleaved access.
+* An "Entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time.
+* Most people want their message handlers to finish before the workflow run completes or continues as new. Use `await workflow.wait_condition(lambda: workflow.all_handlers_finished())` to achieve this.
 * Message handlers can be made idempotent. See update `ClusterManager.assign_nodes_to_job`.

To run, first see [README.md](../../README.md) for prerequisites.
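The "signal-with-start" pattern called out in the README's second bullet is driven from the client side. A minimal sketch of what that looks like with the Temporal Python client (the workflow ID and task queue name below are made up; it assumes the `start_cluster` signal this sample defines):

```python
from temporalio.client import Client

from updates_and_signals.safe_message_handlers.workflow import (
    ClusterManagerInput,
    ClusterManagerWorkflow,
)


async def main() -> None:
    client = await Client.connect("localhost:7233")
    # Atomically start the workflow and deliver an initializer signal;
    # the signal handler runs before any other handler or timer, but
    # after the workflow's constructor.
    await client.start_workflow(
        ClusterManagerWorkflow.run,
        ClusterManagerInput(),
        id="cluster-manager",  # hypothetical workflow ID
        task_queue="safe-message-handlers-task-queue",  # hypothetical task queue
        start_signal="start_cluster",
    )
```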
7 changes: 4 additions & 3 deletions updates_and_signals/safe_message_handlers/workflow.py
@@ -212,9 +212,6 @@ def init(self, input: ClusterManagerInput) -> None:
         self.sleep_interval_seconds = 1

     def should_continue_as_new(self) -> bool:
-        # We don't want to continue-as-new if we're in the middle of an update
-        if self.nodes_lock.locked():
-            return False
         if workflow.info().is_continue_as_new_suggested():
             return True
         # This is just for ease-of-testing. In production, we trust temporal to tell us when to continue as new.
@@ -243,13 +240,17 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
             if self.state.cluster_shutdown:
                 break
             if self.should_continue_as_new():
+                # We don't want to leave any job assignment or deletion handlers half-finished when we continue as new.
+                await workflow.wait_condition(lambda: workflow.all_handlers_finished())
                 workflow.logger.info("Continuing as new")
                 workflow.continue_as_new(
                     ClusterManagerInput(
                         state=self.state,
                         test_continue_as_new=input.test_continue_as_new,
                     )
                 )
+        # Make sure we finish off handlers such as deleting jobs before we complete the workflow.
+        await workflow.wait_condition(lambda: workflow.all_handlers_finished())
         return ClusterManagerResult(
             len(self.get_assigned_nodes()),
             len(self.get_bad_nodes()),
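Putting the two hunks together: the PR swaps the old `nodes_lock.locked()` guard in `should_continue_as_new` for explicit waits on `workflow.all_handlers_finished()` at both exit points (continue-as-new and normal completion). A self-contained sketch of the same two safety techniques, using entirely hypothetical names rather than the sample's real ones:

```python
import asyncio

from temporalio import workflow


@workflow.defn
class SketchWorkflow:
    def __init__(self) -> None:
        self.lock = asyncio.Lock()  # serializes handler bodies
        self.counter = 0
        self.done = False

    @workflow.signal
    def finish(self) -> None:
        self.done = True

    @workflow.update
    async def add(self, n: int) -> int:
        # Without the lock, another add() could interleave at the sleep and
        # clobber this read-modify-write of shared workflow state.
        async with self.lock:
            before = self.counter
            await asyncio.sleep(1)  # stands in for an activity call
            self.counter = before + n
            return self.counter

    @workflow.run
    async def run(self) -> int:
        await workflow.wait_condition(lambda: self.done)
        # Don't complete while an add() handler is still mid-flight.
        await workflow.wait_condition(lambda: workflow.all_handlers_finished())
        return self.counter
```

Without the final `wait_condition`, the workflow could complete while `add()` is still suspended at its timer, leaving that caller's update unfinished; waiting on `all_handlers_finished()` closes that gap.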