Skip to content

Commit

Permalink
Use consistent verbs, improve health check
Browse files Browse the repository at this point in the history
  • Loading branch information
drewhoskins-temporal committed Jul 8, 2024
1 parent ce4d384 commit 52429bd
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 79 deletions.
42 changes: 21 additions & 21 deletions tests/updates_and_signals/safe_message_handlers/workflow_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from temporalio.worker import Worker

from updates_and_signals.safe_message_handlers.activities import (
allocate_nodes_to_job,
deallocate_nodes_for_job,
assign_nodes_to_job,
unassign_nodes_for_job,
find_bad_nodes,
)
from updates_and_signals.safe_message_handlers.workflow import (
ClusterManagerAllocateNNodesToJobInput,
ClusterManagerAssignNodesToJobInput,
ClusterManagerDeleteJobInput,
ClusterManagerInput,
ClusterManagerWorkflow,
Expand All @@ -24,7 +24,7 @@ async def test_safe_message_handlers(client: Client):
client,
task_queue=task_queue,
workflows=[ClusterManagerWorkflow],
activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
):
cluster_manager_handle = await client.start_workflow(
ClusterManagerWorkflow.run,
Expand All @@ -38,9 +38,9 @@ async def test_safe_message_handlers(client: Client):
for i in range(6):
allocation_updates.append(
cluster_manager_handle.execute_update(
ClusterManagerWorkflow.allocate_n_nodes_to_job,
ClusterManagerAllocateNNodesToJobInput(
num_nodes=2, job_name=f"task-{i}"
ClusterManagerWorkflow.assign_nodes_to_job,
ClusterManagerAssignNodesToJobInput(
total_num_nodes=2, job_name=f"task-{i}"
),
)
)
Expand Down Expand Up @@ -72,7 +72,7 @@ async def test_update_idempotency(client: Client):
client,
task_queue=task_queue,
workflows=[ClusterManagerWorkflow],
activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
):
cluster_manager_handle = await client.start_workflow(
ClusterManagerWorkflow.run,
Expand All @@ -84,15 +84,15 @@ async def test_update_idempotency(client: Client):
await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)

result_1 = await cluster_manager_handle.execute_update(
ClusterManagerWorkflow.allocate_n_nodes_to_job,
ClusterManagerAllocateNNodesToJobInput(num_nodes=5, job_name="jobby-job"),
ClusterManagerWorkflow.assign_nodes_to_job,
ClusterManagerAssignNodesToJobInput(total_num_nodes=5, job_name="jobby-job"),
)
# Send the same update a second time to verify that the operation is idempotent.
result_2 = await cluster_manager_handle.execute_update(
ClusterManagerWorkflow.allocate_n_nodes_to_job,
ClusterManagerAllocateNNodesToJobInput(num_nodes=5, job_name="jobby-job"),
ClusterManagerWorkflow.assign_nodes_to_job,
ClusterManagerAssignNodesToJobInput(total_num_nodes=5, job_name="jobby-job"),
)
# the second call should not allocate more nodes (it may return fewer if the health check finds bad nodes
# the second call should not assign more nodes (it may return fewer if the health check finds bad nodes
# in between the two updates.)
assert result_1.nodes_assigned >= result_2.nodes_assigned

Expand All @@ -103,7 +103,7 @@ async def test_update_failure(client: Client):
client,
task_queue=task_queue,
workflows=[ClusterManagerWorkflow],
activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
):
cluster_manager_handle = await client.start_workflow(
ClusterManagerWorkflow.run,
Expand All @@ -115,20 +115,20 @@ async def test_update_failure(client: Client):
await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)

await cluster_manager_handle.execute_update(
ClusterManagerWorkflow.allocate_n_nodes_to_job,
ClusterManagerAllocateNNodesToJobInput(num_nodes=24, job_name="big-task"),
ClusterManagerWorkflow.assign_nodes_to_job,
ClusterManagerAssignNodesToJobInput(total_num_nodes=24, job_name="big-task"),
)
try:
# Try to allocate too many nodes
# Try to assign too many nodes
await cluster_manager_handle.execute_update(
ClusterManagerWorkflow.allocate_n_nodes_to_job,
ClusterManagerAllocateNNodesToJobInput(
num_nodes=3, job_name="little-task"
ClusterManagerWorkflow.assign_nodes_to_job,
ClusterManagerAssignNodesToJobInput(
total_num_nodes=3, job_name="little-task"
),
)
except WorkflowUpdateFailedError as e:
assert isinstance(e.cause, ApplicationError)
assert e.cause.message == "Cannot allocate 3 nodes; have only 1 available"
assert e.cause.message == "Cannot assign 3 nodes; have only 1 available"
finally:
await cluster_manager_handle.signal(ClusterManagerWorkflow.shutdown_cluster)
result = await cluster_manager_handle.result()
Expand Down
2 changes: 1 addition & 1 deletion updates_and_signals/safe_message_handlers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This sample shows off important techniques for handling signals and updates, aka
* Message handlers can block and their actions can be interleaved with one another and with the main workflow. This can easily cause bugs, so we use a lock to protect shared state from interleaved access.
* Message handlers should also finish before the workflow run completes. One option is to use a lock.
* An "Entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time. Just make sure message handlers have finished before doing so.
* Message handlers can be made idempotent. See update `ClusterManager.allocate_n_nodes_to_job`.
* Message handlers can be made idempotent. See the update handler `ClusterManagerWorkflow.assign_nodes_to_job`.

To run, first see [README.md](../../README.md) for prerequisites.

Expand Down
8 changes: 4 additions & 4 deletions updates_and_signals/safe_message_handlers/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,25 @@


@dataclass
class AllocateNodesToJobInput:
class AssignNodesToJobInput:
nodes: List[str]
job_name: str


@activity.defn
async def allocate_nodes_to_job(input: AllocateNodesToJobInput) -> None:
async def assign_nodes_to_job(input: AssignNodesToJobInput) -> None:
print(f"Assigning nodes {input.nodes} to job {input.job_name}")
await asyncio.sleep(0.1)


@dataclass
class DeallocateNodesForJobInput:
class UnassignNodesForJobInput:
nodes: List[str]
job_name: str


@activity.defn
async def deallocate_nodes_for_job(input: DeallocateNodesForJobInput) -> None:
async def unassign_nodes_for_job(input: UnassignNodesForJobInput) -> None:
print(f"Deallocating nodes {input.nodes} from job {input.job_name}")
await asyncio.sleep(0.1)

Expand Down
10 changes: 5 additions & 5 deletions updates_and_signals/safe_message_handlers/starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import uuid
from typing import Optional

from temporalio import client, common
from temporalio import common
from temporalio.client import Client, WorkflowHandle

from updates_and_signals.safe_message_handlers.workflow import (
ClusterManagerAllocateNNodesToJobInput,
ClusterManagerAssignNodesToJobInput,
ClusterManagerDeleteJobInput,
ClusterManagerInput,
ClusterManagerWorkflow,
Expand All @@ -23,9 +23,9 @@ async def do_cluster_lifecycle(wf: WorkflowHandle, delay_seconds: Optional[int]
for i in range(6):
allocation_updates.append(
wf.execute_update(
ClusterManagerWorkflow.allocate_n_nodes_to_job,
ClusterManagerAllocateNNodesToJobInput(
num_nodes=2, job_name=f"task-{i}"
ClusterManagerWorkflow.assign_nodes_to_job,
ClusterManagerAssignNodesToJobInput(
total_num_nodes=2, job_name=f"task-{i}"
),
)
)
Expand Down
9 changes: 4 additions & 5 deletions updates_and_signals/safe_message_handlers/worker.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import asyncio
import logging

from temporalio import activity, common, workflow
from temporalio.client import Client, WorkflowHandle
from temporalio.client import Client
from temporalio.worker import Worker

from updates_and_signals.safe_message_handlers.workflow import (
ClusterManagerWorkflow,
allocate_nodes_to_job,
deallocate_nodes_for_job,
assign_nodes_to_job,
unassign_nodes_for_job,
find_bad_nodes,
)

Expand All @@ -23,7 +22,7 @@ async def main():
client,
task_queue="safe-message-handlers-task-queue",
workflows=[ClusterManagerWorkflow],
activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
):
# Wait until interrupted
logging.info("ClusterManagerWorkflow worker started, ctrl+c to exit")
Expand Down
90 changes: 47 additions & 43 deletions updates_and_signals/safe_message_handlers/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
from temporalio.exceptions import ApplicationError

from updates_and_signals.safe_message_handlers.activities import (
AllocateNodesToJobInput,
DeallocateNodesForJobInput,
AssignNodesToJobInput,
UnassignNodesForJobInput,
FindBadNodesInput,
allocate_nodes_to_job,
deallocate_nodes_for_job,
assign_nodes_to_job,
unassign_nodes_for_job,
find_bad_nodes,
)

Expand All @@ -25,7 +25,7 @@ class ClusterManagerState:
cluster_started: bool = False
cluster_shutdown: bool = False
nodes: Dict[str, Optional[str]] = dataclasses.field(default_factory=dict)
jobs_added: Set[str] = dataclasses.field(default_factory=set)
jobs_assigned: Set[str] = dataclasses.field(default_factory=set)


@dataclass
Expand All @@ -42,8 +42,9 @@ class ClusterManagerResult:
# Be in the habit of storing message inputs and outputs in serializable structures.
# This makes it easier to add more over time in a backward-compatible way.
@dataclass
class ClusterManagerAllocateNNodesToJobInput:
num_nodes: int
class ClusterManagerAssignNodesToJobInput:
# If larger or smaller than previous amounts, will resize the job.
total_num_nodes: int
job_name: str


Expand All @@ -52,10 +53,10 @@ class ClusterManagerDeleteJobInput:
job_name: str

@dataclass
class ClusterManagerAllocateNNodesToJobResult:
class ClusterManagerAssignNodesToJobResult:
nodes_assigned: Set[str]

# ClusterManagerWorkflow keeps track of the allocations of a cluster of nodes.
# ClusterManagerWorkflow keeps track of the assignments of a cluster of nodes.
# Via signals, the cluster can be started and shutdown.
# Via updates, clients can also assign jobs to nodes and delete jobs.
# These updates must run atomically.
Expand Down Expand Up @@ -84,52 +85,52 @@ async def shutdown_cluster(self) -> None:
# before sending work to those nodes.
# Returns the set of node names that were assigned to the job.
@workflow.update
async def allocate_n_nodes_to_job(
self, input: ClusterManagerAllocateNNodesToJobInput
) -> ClusterManagerAllocateNNodesToJobResult:
async def assign_nodes_to_job(
self, input: ClusterManagerAssignNodesToJobInput
) -> ClusterManagerAssignNodesToJobResult:
await workflow.wait_condition(lambda: self.state.cluster_started)
if self.state.cluster_shutdown:
# If you want the client to receive a failure, either add an update validator and raise the
# exception from there, or raise an ApplicationError. Other exceptions in the main handler
# will cause the workflow to keep retrying and leave it stuck.
raise ApplicationError(
"Cannot allocate nodes to a job: Cluster is already shut down"
"Cannot assign nodes to a job: Cluster is already shut down"
)

async with self.nodes_lock:
# Idempotency guard.
if input.job_name in self.state.jobs_added:
return ClusterManagerAllocateNNodesToJobResult(
if input.job_name in self.state.jobs_assigned:
return ClusterManagerAssignNodesToJobResult(
self.get_assigned_nodes(job_name=input.job_name))
unassigned_nodes = self.get_unassigned_nodes()
if len(unassigned_nodes) < input.num_nodes:
if len(unassigned_nodes) < input.total_num_nodes:
# If you want the client to receive a failure, either add an update validator and raise the
# exception from there, or raise an ApplicationError. Other exceptions in the main handler
# will cause the workflow to keep retrying and leave it stuck.
raise ApplicationError(
f"Cannot allocate {input.num_nodes} nodes; have only {len(unassigned_nodes)} available"
f"Cannot assign {input.total_num_nodes} nodes; have only {len(unassigned_nodes)} available"
)
nodes_to_assign = unassigned_nodes[: input.num_nodes]
nodes_to_assign = unassigned_nodes[: input.total_num_nodes]
# This await would be dangerous without nodes_lock because it yields control and allows interleaving
# with delete_job and perform_health_checks, which both touch self.state.nodes.
await self._allocate_nodes_to_job(nodes_to_assign, input.job_name)
return ClusterManagerAllocateNNodesToJobResult(
await self._assign_nodes_to_job(nodes_to_assign, input.job_name)
return ClusterManagerAssignNodesToJobResult(
nodes_assigned=self.get_assigned_nodes(job_name=input.job_name))

async def _allocate_nodes_to_job(
async def _assign_nodes_to_job(
self, assigned_nodes: List[str], job_name: str
) -> None:
await workflow.execute_activity(
allocate_nodes_to_job,
AllocateNodesToJobInput(nodes=assigned_nodes, job_name=job_name),
assign_nodes_to_job,
AssignNodesToJobInput(nodes=assigned_nodes, job_name=job_name),
start_to_close_timeout=timedelta(seconds=10),
)
for node in assigned_nodes:
self.state.nodes[node] = job_name
self.state.jobs_added.add(job_name)
self.state.jobs_assigned.add(job_name)

# Even though it returns nothing, this is an update because the client may want to track it, for example
# to wait for nodes to be deallocated before reassigning them.
# to wait for nodes to be unassigned before reassigning them.
@workflow.update
async def delete_job(self, input: ClusterManagerDeleteJobInput) -> None:
await workflow.wait_condition(lambda: self.state.cluster_started)
Expand All @@ -140,20 +141,20 @@ async def delete_job(self, input: ClusterManagerDeleteJobInput) -> None:
raise ApplicationError("Cannot delete a job: Cluster is already shut down")

async with self.nodes_lock:
nodes_to_free = [
nodes_to_unassign = [
k for k, v in self.state.nodes.items() if v == input.job_name
]
# This await would be dangerous without nodes_lock because it yields control and allows interleaving
# with allocate_n_nodes_to_job and perform_health_checks, which all touch self.state.nodes.
await self._deallocate_nodes_for_job(nodes_to_free, input.job_name)
# with assign_nodes_to_job and perform_health_checks, which both touch self.state.nodes.
await self._unassign_nodes_for_job(nodes_to_unassign, input.job_name)

async def _deallocate_nodes_for_job(self, nodes_to_free: List[str], job_name: str):
async def _unassign_nodes_for_job(self, nodes_to_unassign: List[str], job_name: str):
await workflow.execute_activity(
deallocate_nodes_for_job,
DeallocateNodesForJobInput(nodes=nodes_to_free, job_name=job_name),
unassign_nodes_for_job,
UnassignNodesForJobInput(nodes=nodes_to_unassign, job_name=job_name),
start_to_close_timeout=timedelta(seconds=10),
)
for node in nodes_to_free:
for node in nodes_to_unassign:
self.state.nodes[node] = None

def get_unassigned_nodes(self) -> List[str]:
Expand All @@ -173,17 +174,20 @@ def get_assigned_nodes(self, *, job_name: Optional[str] = None) -> Set[str]:
async def perform_health_checks(self) -> None:
async with self.nodes_lock:
assigned_nodes = self.get_assigned_nodes()
# This await would be dangerous without nodes_lock because it yields control and allows interleaving
# with allocate_n_nodes_to_job and delete_job, which both touch self.state.nodes.
bad_nodes = await workflow.execute_activity(
find_bad_nodes,
FindBadNodesInput(nodes_to_check=assigned_nodes),
start_to_close_timeout=timedelta(seconds=10),
# This health check is optional, and our lock would block the whole workflow if we let it retry forever.
retry_policy=RetryPolicy(maximum_attempts=1),
)
for node in bad_nodes:
self.state.nodes[node] = "BAD!"
try:
# This await would be dangerous without nodes_lock because it yields control and allows interleaving
# with assign_nodes_to_job and delete_job, which both touch self.state.nodes.
bad_nodes = await workflow.execute_activity(
find_bad_nodes,
FindBadNodesInput(nodes_to_check=assigned_nodes),
start_to_close_timeout=timedelta(seconds=10),
# This health check is optional, and our lock would block the whole workflow if we let it retry forever.
retry_policy=RetryPolicy(maximum_attempts=1),
)
for node in bad_nodes:
self.state.nodes[node] = "BAD!"
except Exception as e:
workflow.logger.warn(f"Health check failed with error {type(e).__name__}:{e}")

# The cluster manager is a long-running "entity" workflow so we need to periodically checkpoint its state and
# continue-as-new.
Expand Down

0 comments on commit 52429bd

Please sign in to comment.