From 4df348a1438fa73949c6bde51dd83de503493f83 Mon Sep 17 00:00:00 2001
From: Drew Hoskins <166441821+drewhoskins-temporal@users.noreply.github.com>
Date: Tue, 3 Sep 2024 21:28:33 -0700
Subject: [PATCH] Demonstrate workflow.all_handlers_finished (#139)

* Demonstrate workflow.all_handlers_finished

* Document all_handlers_finished in the README
---
 updates_and_signals/safe_message_handlers/README.md   | 6 +++---
 updates_and_signals/safe_message_handlers/workflow.py | 7 ++++---
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/updates_and_signals/safe_message_handlers/README.md b/updates_and_signals/safe_message_handlers/README.md
index 44bb9457..7d727af3 100644
--- a/updates_and_signals/safe_message_handlers/README.md
+++ b/updates_and_signals/safe_message_handlers/README.md
@@ -4,9 +4,9 @@ This sample shows off important techniques for handling signals and updates, aka
 
 * Here, using workflow.wait_condition, signal and update handlers will only operate when the workflow is within a certain state--between cluster_started and cluster_shutdown.
 * You can run start_workflow with an initializer signal that you want to run before anything else other than the workflow's constructor. This pattern is known as "signal-with-start."
-* Message handlers can block and their actions can be interleaved with one another and with the main workflow. This can easily cause bugs, so we use a lock to protect shared state from interleaved access.
-* Message handlers should also finish before the workflow run completes. One option is to use a lock.
-* An "Entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time. Just make sure message handlers have finished before doing so.
+* Message handlers can block and their actions can be interleaved with one another and with the main workflow. This can easily cause bugs, so you can use a lock to protect shared state from interleaved access.
+* An "Entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time.
+* Most people want their message handlers to finish before the workflow run completes or continues as new. Use `await workflow.wait_condition(lambda: workflow.all_handlers_finished())` to achieve this.
 * Message handlers can be made idempotent. See update `ClusterManager.assign_nodes_to_job`.
 
 To run, first see [README.md](../../README.md) for prerequisites.
diff --git a/updates_and_signals/safe_message_handlers/workflow.py b/updates_and_signals/safe_message_handlers/workflow.py
index b8230578..5d441637 100644
--- a/updates_and_signals/safe_message_handlers/workflow.py
+++ b/updates_and_signals/safe_message_handlers/workflow.py
@@ -212,9 +212,6 @@ def init(self, input: ClusterManagerInput) -> None:
         self.sleep_interval_seconds = 1
 
     def should_continue_as_new(self) -> bool:
-        # We don't want to continue-as-new if we're in the middle of an update
-        if self.nodes_lock.locked():
-            return False
         if workflow.info().is_continue_as_new_suggested():
             return True
         # This is just for ease-of-testing. In production, we trust temporal to tell us when to continue as new.
@@ -243,6 +240,8 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
             if self.state.cluster_shutdown:
                 break
             if self.should_continue_as_new():
+                # We don't want to leave any job assignment or deletion handlers half-finished when we continue as new.
+                await workflow.wait_condition(lambda: workflow.all_handlers_finished())
                 workflow.logger.info("Continuing as new")
                 workflow.continue_as_new(
                     ClusterManagerInput(
@@ -250,6 +249,8 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
                         test_continue_as_new=input.test_continue_as_new,
                     )
                 )
+        # Make sure we finish off handlers such as deleting jobs before we complete the workflow.
+        await workflow.wait_condition(lambda: workflow.all_handlers_finished())
         return ClusterManagerResult(
             len(self.get_assigned_nodes()),
             len(self.get_bad_nodes()),
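
To see the drain-before-exit pattern outside the ClusterManager sample, here is a minimal, self-contained sketch. The workflow and handler names (`MessageDrivenWorkflow`, `set_value`, `request_shutdown`) are hypothetical and not part of this patch; only the `workflow.all_handlers_finished()` / `workflow.wait_condition` usage is taken from it:

```python
# Hypothetical workflow, for illustration only; not part of this patch.
import asyncio
from typing import Optional

from temporalio import workflow


@workflow.defn
class MessageDrivenWorkflow:
    def __init__(self) -> None:
        self.value: Optional[str] = None
        self.shutdown_requested = False

    @workflow.signal
    def request_shutdown(self) -> None:
        self.shutdown_requested = True

    @workflow.update
    async def set_value(self, value: str) -> str:
        # The durable timer keeps this handler in flight across workflow
        # turns, which is exactly what all_handlers_finished() guards against.
        await asyncio.sleep(1)
        self.value = value
        return value

    @workflow.run
    async def run(self) -> Optional[str]:
        while True:
            await workflow.wait_condition(
                lambda: self.shutdown_requested
                or workflow.info().is_continue_as_new_suggested()
            )
            if self.shutdown_requested:
                break
            # As in the patch: don't leave any handler half-finished when this
            # run is replaced by a fresh one. (A real entity workflow would
            # also pass its state into continue_as_new, as the sample does.)
            await workflow.wait_condition(lambda: workflow.all_handlers_finished())
            workflow.continue_as_new()
        # And again before completing, so a late update isn't abandoned.
        await workflow.wait_condition(lambda: workflow.all_handlers_finished())
        return self.value
```

Note the design choice this patch makes: compared with the removed `nodes_lock.locked()` check in `should_continue_as_new`, waiting on `all_handlers_finished()` covers every in-flight signal and update handler, not just the ones that happen to take the lock.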
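
The README change also keeps the bullet recommending a lock for shared state. Here is a similarly hedged sketch of that technique, with hypothetical names again (`LockedSlotsWorkflow`, `claim_slot`); the sample's real counterpart is the lock guarding `ClusterManagerWorkflow`'s node state:

```python
# Hypothetical workflow, for illustration only; not part of this patch.
import asyncio
from typing import List

from temporalio import workflow


@workflow.defn
class LockedSlotsWorkflow:
    def __init__(self) -> None:
        self.lock = asyncio.Lock()  # serializes handlers that mutate self.slots
        self.slots: List[str] = []

    @workflow.update
    async def claim_slot(self, owner: str) -> int:
        # This handler awaits between reading and writing self.slots. Without
        # the lock, a concurrent claim_slot could interleave at that await and
        # both callers would observe (and claim) the same index.
        async with self.lock:
            index = len(self.slots)
            await asyncio.sleep(1)  # stand-in for a real activity call
            self.slots.append(owner)
            return index

    @workflow.run
    async def run(self) -> List[str]:
        # Wait for a couple of claims, then drain handlers before completing,
        # mirroring the patch above.
        await workflow.wait_condition(lambda: len(self.slots) >= 2)
        await workflow.wait_condition(lambda: workflow.all_handlers_finished())
        return self.slots
```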