Skip to content

Commit

Permalink
Use workflow.init, refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Dec 7, 2024
1 parent 0763309 commit 950681e
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions message_passing/safe_message_handlers/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,17 @@ class ClusterManagerAssignNodesToJobResult:
# These updates must run atomically.
@workflow.defn
class ClusterManagerWorkflow:
def __init__(self) -> None:
self.state = ClusterManagerState()
@workflow.init
def __init__(self, input: ClusterManagerInput) -> None:
if input.state:
self.state = input.state
else:
self.state = ClusterManagerState()

if input.test_continue_as_new:
self.max_history_length = 120
self.sleep_interval_seconds = 1

# Protects workflow state from interleaved access
self.nodes_lock = asyncio.Lock()
self.max_history_length: Optional[int] = None
Expand Down Expand Up @@ -202,29 +211,8 @@ async def perform_health_checks(self) -> None:
f"Health check failed with error {type(e).__name__}:{e}"
)

# The cluster manager is a long-running "entity" workflow so we need to periodically checkpoint its state and
# continue-as-new.
def init(self, input: ClusterManagerInput) -> None:
if input.state:
self.state = input.state
if input.test_continue_as_new:
self.max_history_length = 120
self.sleep_interval_seconds = 1

def should_continue_as_new(self) -> bool:
if workflow.info().is_continue_as_new_suggested():
return True
# This is just for ease-of-testing. In production, we trust temporal to tell us when to continue as new.
if (
self.max_history_length
and workflow.info().get_current_history_length() > self.max_history_length
):
return True
return False

@workflow.run
async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
self.init(input)
await workflow.wait_condition(lambda: self.state.cluster_started)
# Perform health checks at intervals.
while True:
Expand All @@ -239,6 +227,8 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
pass
if self.state.cluster_shutdown:
break
# The cluster manager is a long-running "entity" workflow so we need to periodically checkpoint its state and
# continue-as-new.
if self.should_continue_as_new():
# We don't want to leave any job assignment or deletion handlers half-finished when we continue as new.
await workflow.wait_condition(lambda: workflow.all_handlers_finished())
Expand All @@ -255,3 +245,14 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
len(self.get_assigned_nodes()),
len(self.get_bad_nodes()),
)

def should_continue_as_new(self) -> bool:
if workflow.info().is_continue_as_new_suggested():
return True
# This is just for ease-of-testing. In production, we trust temporal to tell us when to continue as new.
if (
self.max_history_length
and workflow.info().get_current_history_length() > self.max_history_length
):
return True
return False

0 comments on commit 950681e

Please sign in to comment.