
Workflow Update and Signal handlers concurrency sample #123

Merged

Changes from 13 commits (30 commits total)
* 5d0307d Atomic message handlers sample (drewhoskins, Jun 19, 2024)
* bca534a Remove resize jobs to reduce code size (drewhoskins, Jun 19, 2024)
* 8b0a6ed Misc polish (drewhoskins, Jun 19, 2024)
* fb7b32f Add test (drewhoskins, Jun 19, 2024)
* 42d1f12 Format code (drewhoskins, Jun 19, 2024)
* c96f06d Continue as new (drewhoskins, Jun 20, 2024)
* 6944099 Formatting (drewhoskins, Jun 20, 2024)
* ec1fb89 Feedback, readme, restructure files and directories (drewhoskins, Jun 22, 2024)
* dd58c64 Format (drewhoskins, Jun 22, 2024)
* 37e56ed More feedback. Add test-continue-as-new flag. (drewhoskins, Jun 24, 2024)
* a1506b1 Feedback; throw ApplicationFailures from update handlers (drewhoskins, Jun 24, 2024)
* 2cad3dd Formatting (drewhoskins, Jun 24, 2024)
* d5db7d7 __init__.py (drewhoskins, Jun 24, 2024)
* f39841c Fix lint issues (drewhoskins, Jun 24, 2024)
* 344d694 Dan Feedback (drewhoskins, Jun 25, 2024)
* fc74a69 More typehints (drewhoskins, Jun 25, 2024)
* 0b84c25 s/atomic/safe/ (drewhoskins, Jun 25, 2024)
* c8e9075 Fix and demo idempotency (drewhoskins, Jun 26, 2024)
* 4fc6dac Compatibility with 3.8 (drewhoskins, Jun 26, 2024)
* 3ba8882 More feedback (drewhoskins, Jun 27, 2024)
* f47369e Re-add tests (drewhoskins, Jun 27, 2024)
* 5dc6185 Fix flaky test (drewhoskins, Jun 27, 2024)
* 5b45b21 Improve update and tests (drewhoskins-temporal, Jul 8, 2024)
* ce4d384 Ruff linting (drewhoskins-temporal, Jul 8, 2024)
* 52429bd Use consistent verbs, improve health check (drewhoskins-temporal, Jul 8, 2024)
* 74867f1 poe format (drewhoskins-temporal, Jul 8, 2024)
* c6bdd12 Minor sample improvements (drewhoskins-temporal, Jul 8, 2024)
* 62f24a2 Skip update tests under Java test server (dandavison, Jul 22, 2024)
* d933042 Merge pull request #1 from dandavison/drewhoskins_concurrency_sample-dan (drewhoskins-temporal, Jul 24, 2024)
* 31e2d59 Merge branch 'main' into drewhoskins_concurrency_sample (drewhoskins-temporal, Jul 24, 2024)
1 change: 1 addition & 0 deletions README.md
@@ -52,6 +52,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
<!-- Keep this list in alphabetical order -->
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
* [atomic_message_handlers](updates_and_signals/atomic_message_handlers/) - Safely handling updates and signals.
@dandavison (Contributor), Jun 25, 2024:
I think the name of the sample should be changed to something like safe_message_handling. It's not about atomicity -- the sample doesn't demonstrate rolling back of incomplete side effects. Rather it's about maintaining strict isolation between handler executions, via serialization of handler executions. In any case, we don't want users to think this is showing a specialized form of message handling that they can ignore; we want them to consider whether they need this for any workflow with message handlers.

Contributor (Author):
Good idea. "Safe" feels much more like something I'm supposed to read.

Member:
Not the biggest fan of "safe" vs "atomic" since the latter is more discoverable/descriptive when looking at the list of samples, but I don't have a strong opinion here.

Contributor:
@cretz we could choose a word other than "safe", but I argued above that "atomic" isn't the right word.

@cretz (Member), Jun 25, 2024:
I don't think "atomic" relates to rollback at all. Atomic just means one at a time or uninterruptible, as opposed to "transactional". But many can also see it as meaning "quick" or "all or none", but I don't see it that way when I see it used. I think atomic is an ok word, but again I don't have a strong opinion. Also "safe" has a lot of meanings for Temporal workflow code. Many users will be ok w/ their handlers running concurrently and will still be "safe". Maybe "serial" or something, unsure.

Contributor:
Yes, to be honest I'm not in love with "safe" and implying that any other usage style is not safe.

Hm, an atomic operation is one that either completes in its entirety or behaves as if it never started, and can't be seen in an intermediate state. So, if the operation has multiple stages with side effects, that would require some notion of rollback. It's usually synonymous with "transactional". I agree it's closely related to the idea of serializing executions so that they occur one at a time, since that's one way of ensuring that one execution can't see in-progress state of another, but using "atomic" would imply that message handling that does multiple writes can rollback incomplete changes. I think here we're talking about "serialized message processing" or "preventing corruption of shared state by message handlers".

Contributor (Author):
There's more here than just concurrency, such as dangling handlers. Sticking with safe.
I think I'm going to touch on idempotency as well in my next push, though we should probably also add a more focused idempotency sample.

Contributor (Author):
> Yes, to be honest I'm not in love with "safe" and implying that any other usage style is not safe.

I don't think it implies that. "Robust" is an alternate word.

@drewhoskins-temporal (Contributor, Author), Jun 26, 2024:
Update: added idempotency. I didn't use the built-in update ID, since it wasn't necessary here. Maybe that can be our separate idempotency sample.
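As a sketch of the idempotency idea mentioned here, keyed on a caller-supplied name rather than the built-in update ID: the names (`ClusterState`, `allocate`) are hypothetical illustrations, not the sample's actual API.

```python
class ClusterState:
    """Hypothetical sketch of an idempotent allocation handler."""

    def __init__(self, capacity: int = 10) -> None:
        self.available = capacity
        self.jobs = {}  # task_name -> nodes allocated

    def allocate(self, task_name: str, num_nodes: int) -> int:
        # Idempotency keyed on the task name: a retried or duplicated
        # request returns the original allocation instead of allocating twice.
        if task_name in self.jobs:
            return self.jobs[task_name]
        if num_nodes > self.available:
            raise ValueError(
                f"Cannot allocate {num_nodes} nodes; have only {self.available} available"
            )
        self.available -= num_nodes
        self.jobs[task_name] = num_nodes
        return num_nodes


state = ClusterState()
state.allocate("task-1", 3)
state.allocate("task-1", 3)  # duplicate delivery: no double allocation
print(state.available)  # 7, not 4
```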

* [bedrock](bedrock) - Orchestrate a chatbot with Amazon Bedrock.
* [cloud_export_to_parquet](cloud_export_to_parquet) - Set up schedule workflow to process exported files on an hourly basis
* [context_propagation](context_propagation) - Context propagation through workflows/activities via interceptor.
74 changes: 74 additions & 0 deletions tests/updates_and_signals/atomic_message_handlers_test.py
@@ -0,0 +1,74 @@
import uuid

from temporalio.client import Client, WorkflowUpdateFailedError
from temporalio.worker import Worker

from updates_and_signals.atomic_message_handlers.activities import (
allocate_nodes_to_job,
deallocate_nodes_for_job,
find_bad_nodes,
)
from updates_and_signals.atomic_message_handlers.starter import do_cluster_lifecycle
from updates_and_signals.atomic_message_handlers.workflow import (
ClusterManagerAllocateNNodesToJobInput,
ClusterManagerInput,
ClusterManagerWorkflow,
)


async def test_atomic_message_handlers(client: Client):
task_queue = f"tq-{uuid.uuid4()}"
async with Worker(
client,
task_queue=task_queue,
workflows=[ClusterManagerWorkflow],
activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
):
cluster_manager_handle = await client.start_workflow(
ClusterManagerWorkflow.run,
ClusterManagerInput(),
id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
task_queue=task_queue,
)
await do_cluster_lifecycle(cluster_manager_handle, delay_seconds=1)
result = await cluster_manager_handle.result()
assert result.max_assigned_nodes == 12
assert result.num_currently_assigned_nodes == 0


async def test_update_failure(client: Client):
task_queue = f"tq-{uuid.uuid4()}"
async with Worker(
client,
task_queue=task_queue,
workflows=[ClusterManagerWorkflow],
activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
):
cluster_manager_handle = await client.start_workflow(
ClusterManagerWorkflow.run,
ClusterManagerInput(),
id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
task_queue=task_queue,
)

await cluster_manager_handle.signal(ClusterManagerWorkflow.start_cluster)

await cluster_manager_handle.execute_update(
ClusterManagerWorkflow.allocate_n_nodes_to_job,
            ClusterManagerAllocateNNodesToJobInput(num_nodes=24, task_name="big-task"),
)
try:
# Try to allocate too many nodes
await cluster_manager_handle.execute_update(
ClusterManagerWorkflow.allocate_n_nodes_to_job,
                ClusterManagerAllocateNNodesToJobInput(
                    num_nodes=3, task_name="little-task"
                ),
)
except WorkflowUpdateFailedError as e:
assert e.cause.message == "Cannot allocate 3 nodes; have only 1 available"
finally:
await cluster_manager_handle.signal(ClusterManagerWorkflow.shutdown_cluster)
result = await cluster_manager_handle.result()
assert result.num_currently_assigned_nodes == 24
20 changes: 20 additions & 0 deletions updates_and_signals/atomic_message_handlers/README.md
Member:

I think this can just be at a top-level directory of atomic_message_handlers, no need to nest an extra directory deep

Contributor (Author):

🤔 I wanted people to see updates and signals for discoverability, and we're planning at least one more updates sample.

Member:

We haven't usually grouped by those top-level features before but more by what the sample does. So we don't have interceptors/context_propagation and interceptors/sentry, just two top-level separate samples that use the same Temporal features. We just need to determine whether we want this type of grouping now and maybe apply it generally. I know our other samples repositories have also tried to avoid nesting most samples.

Member:

The primary README at the root of this repo should be updated to reference this sample

@@ -0,0 +1,20 @@
# Atomic message handlers

This sample shows off important techniques for handling signals and updates, aka messages. In particular, it illustrates how message handlers can interleave and how you can manage that.

* Here, using `workflow.wait_condition`, signal and update handlers only operate while the workflow is in a certain state: between `cluster_started` and `cluster_shutdown`.
* You can call `start_workflow` with an initial signal that runs before anything else except the workflow's constructor. This pattern is known as "signal-with-start."
* Message handlers can block, and their actions can interleave with one another and with the main workflow. This can easily cause bugs, so we use a lock to protect shared state from interleaved access.
* Message handlers should also finish before the workflow run completes. One way to ensure this is to have them hold the same lock.
* An "entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow execution. You can check `workflow.info().is_continue_as_new_suggested()` to see when it's time. Just make sure message handlers have finished before doing so.
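The first and third points can be sketched in plain asyncio (gate handlers on workflow state, then serialize them with a lock). In a real Temporal workflow the gate would be `await workflow.wait_condition(...)`; all names here are illustrative, not the sample's actual classes.

```python
import asyncio


class ClusterManager:
    """Illustrative sketch only. In a Temporal workflow the gate below would
    be `await workflow.wait_condition(lambda: self.started)` and the lock an
    asyncio.Lock held inside the @workflow.defn class."""

    def __init__(self) -> None:
        self.started = asyncio.Event()
        self.lock = asyncio.Lock()  # serializes handler bodies
        self.assigned = {}

    def start_cluster(self) -> None:
        # Plays the role of the start_cluster signal handler.
        self.started.set()

    async def allocate(self, task_name: str, num_nodes: int) -> int:
        # Plays the role of an update handler.
        await self.started.wait()   # gate on workflow state
        async with self.lock:       # protect shared state across awaits
            self.assigned[task_name] = num_nodes
            await asyncio.sleep(0)  # stands in for an awaited activity
            return num_nodes


async def demo():
    mgr = ClusterManager()
    pending = asyncio.gather(
        mgr.allocate("task-1", 2), mgr.allocate("task-2", 2)
    )
    await asyncio.sleep(0)  # let both handlers reach the gate and block
    mgr.start_cluster()     # handlers proceed only after this
    results = await pending
    return mgr, results


mgr, results = asyncio.run(demo())
print(results, mgr.assigned)
```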

To run, first see [README.md](../../README.md) for prerequisites.

Then, from this directory, run the worker in one terminal and the starter in another:

```bash
poetry run python worker.py
poetry run python starter.py
```

This will start a worker to run your workflow and activities, then start a ClusterManagerWorkflow and put it through its paces.
Empty file.
45 changes: 45 additions & 0 deletions updates_and_signals/atomic_message_handlers/activities.py
@@ -0,0 +1,45 @@
import asyncio
from dataclasses import dataclass
from typing import List

from temporalio import activity


@dataclass(kw_only=True)
Member:
Suggested change: replace `@dataclass(kw_only=True)` with `@dataclass`.

Probably not needed, but no big deal

Contributor (Author):
I prefer named arguments in general for 2+ parameters. Cuts down on callsite bugs and makes them clearer.

Member:
You can still use named arguments. We use them in lots of places, but since we're the only users of them we don't need to set this setting to force us to use them. Also, we have a CI check for our samples in 3.8 and I don't think this came about until 3.10 (we can look into relaxing our CI version constraints though).

Contributor (Author):
Ah, too bad. I'd rather people who pattern-match off of this sample be directed toward best practices. Will remove for now. I wonder if we have stats on python versions people actually use in the wild?

class AllocateNodesToJobInput:
nodes: List[str]
task_name: str


@activity.defn
async def allocate_nodes_to_job(input: AllocateNodesToJobInput):
print(f"Assigning nodes {input.nodes} to job {input.task_name}")
await asyncio.sleep(0.1)


@dataclass(kw_only=True)
class DeallocateNodesForJobInput:
nodes: List[str]
task_name: str


@activity.defn
async def deallocate_nodes_for_job(input: DeallocateNodesForJobInput):
print(f"Deallocating nodes {input.nodes} from job {input.task_name}")
await asyncio.sleep(0.1)


@dataclass(kw_only=True)
class FindBadNodesInput:
nodes_to_check: List[str]


@activity.defn
async def find_bad_nodes(input: FindBadNodesInput) -> List[str]:
await asyncio.sleep(0.1)
bad_nodes = [n for n in input.nodes_to_check if int(n) % 5 == 0]
if bad_nodes:
print(f"Found bad nodes: {bad_nodes}")
else:
print("No new bad nodes found.")
return bad_nodes
80 changes: 80 additions & 0 deletions updates_and_signals/atomic_message_handlers/starter.py
@@ -0,0 +1,80 @@
import argparse
import asyncio
import logging
import uuid
from typing import Optional

from temporalio import client, common
from temporalio.client import Client, WorkflowHandle

from updates_and_signals.atomic_message_handlers.workflow import (
ClusterManagerAllocateNNodesToJobInput,
ClusterManagerDeleteJobInput,
ClusterManagerInput,
ClusterManagerWorkflow,
)


async def do_cluster_lifecycle(wf: WorkflowHandle, delay_seconds: Optional[int] = None):

await wf.signal(ClusterManagerWorkflow.start_cluster)

allocation_updates = []
for i in range(6):
allocation_updates.append(
wf.execute_update(
ClusterManagerWorkflow.allocate_n_nodes_to_job,
ClusterManagerAllocateNNodesToJobInput(
num_nodes=2, task_name=f"task-{i}"
),
)
)
await asyncio.gather(*allocation_updates)

if delay_seconds:
await asyncio.sleep(delay_seconds)

deletion_updates = []
for i in range(6):
deletion_updates.append(
wf.execute_update(
ClusterManagerWorkflow.delete_job,
ClusterManagerDeleteJobInput(task_name=f"task-{i}"),
)
)
await asyncio.gather(*deletion_updates)

await wf.signal(ClusterManagerWorkflow.shutdown_cluster)
Member:
Arguably shutdown could be an update that returns what the workflow returns instead of making it a two-step process (but this is fine too)

Contributor (Author):

Cool idea, and would show off the power of update. Ran out of time this A.M, though.



async def main(should_test_continue_as_new: bool):
# Connect to Temporal
client = await Client.connect("localhost:7233")

cluster_manager_handle = await client.start_workflow(
ClusterManagerWorkflow.run,
ClusterManagerInput(test_continue_as_new=should_test_continue_as_new),
id=f"ClusterManagerWorkflow-{uuid.uuid4()}",
task_queue="atomic-message-handlers-task-queue",
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
)
delay_seconds = 10 if should_test_continue_as_new else 1
await do_cluster_lifecycle(cluster_manager_handle, delay_seconds=delay_seconds)
result = await cluster_manager_handle.result()
    print(
        f"Cluster shut down successfully. It peaked at {result.max_assigned_nodes} assigned nodes."
        f" It had {result.num_currently_assigned_nodes} nodes assigned at the end."
    )


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser(description="Atomic message handlers")
parser.add_argument(
"--test-continue-as-new",
help="Make the ClusterManagerWorkflow continue as new before shutting down",
action="store_true",
default=False,
)
args = parser.parse_args()
asyncio.run(main(args.test_continue_as_new))
41 changes: 41 additions & 0 deletions updates_and_signals/atomic_message_handlers/worker.py
@@ -0,0 +1,41 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from updates_and_signals.atomic_message_handlers.workflow import (
ClusterManagerWorkflow,
allocate_nodes_to_job,
deallocate_nodes_for_job,
find_bad_nodes,
)

interrupt_event = asyncio.Event()


async def main():
# Connect client
client = await Client.connect("localhost:7233")

async with Worker(
client,
task_queue="atomic-message-handlers-task-queue",
workflows=[ClusterManagerWorkflow],
activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
):
# Wait until interrupted
logging.info("ClusterManagerWorkflow worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())