Skip to content

Commit

Permalink
Demonstrate workflow.all_handlers_finished (#139)
Browse files Browse the repository at this point in the history
* Demonstrate workflow.all_handlers_finished

* Document all_handlers_finished in the README
  • Loading branch information
drewhoskins-temporal authored Sep 4, 2024
1 parent ccf0945 commit 4df348a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
6 changes: 3 additions & 3 deletions updates_and_signals/safe_message_handlers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ This sample shows off important techniques for handling signals and updates, aka

* Here, using workflow.wait_condition, signal and update handlers will only operate when the workflow is within a certain state--between cluster_started and cluster_shutdown.
* You can run start_workflow with an initializer signal that you want to run before anything else other than the workflow's constructor. This pattern is known as "signal-with-start."
* Message handlers can block and their actions can be interleaved with one another and with the main workflow. This can easily cause bugs, so we use a lock to protect shared state from interleaved access.
* Message handlers should also finish before the workflow run completes. One option is to use a lock.
* An "Entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time. Just make sure message handlers have finished before doing so.
* Message handlers can block and their actions can be interleaved with one another and with the main workflow. This can easily cause bugs, so you can use a lock to protect shared state from interleaved access.
* An "Entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time.
* Most people want their message handlers to finish before the workflow run completes or continues as new. Use `await workflow.wait_condition(lambda: workflow.all_handlers_finished())` to achieve this.
* Message handlers can be made idempotent. See update `ClusterManager.assign_nodes_to_job`.

To run, first see [README.md](../../README.md) for prerequisites.
Expand Down
7 changes: 4 additions & 3 deletions updates_and_signals/safe_message_handlers/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,6 @@ def init(self, input: ClusterManagerInput) -> None:
self.sleep_interval_seconds = 1

def should_continue_as_new(self) -> bool:
# We don't want to continue-as-new if we're in the middle of an update
if self.nodes_lock.locked():
return False
if workflow.info().is_continue_as_new_suggested():
return True
# This is just for ease-of-testing. In production, we trust temporal to tell us when to continue as new.
Expand Down Expand Up @@ -243,13 +240,17 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
if self.state.cluster_shutdown:
break
if self.should_continue_as_new():
# We don't want to leave any job assignment or deletion handlers half-finished when we continue as new.
await workflow.wait_condition(lambda: workflow.all_handlers_finished())
workflow.logger.info("Continuing as new")
workflow.continue_as_new(
ClusterManagerInput(
state=self.state,
test_continue_as_new=input.test_continue_as_new,
)
)
# Make sure we finish off handlers such as deleting jobs before we complete the workflow.
await workflow.wait_condition(lambda: workflow.all_handlers_finished())
return ClusterManagerResult(
len(self.get_assigned_nodes()),
len(self.get_bad_nodes()),
Expand Down

0 comments on commit 4df348a

Please sign in to comment.