diff --git a/README.md b/README.md index cffcaeeb..63486fca 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,6 @@ Some examples require extra dependencies. See each sample's directory for specif while running. * [hello_signal](hello/hello_signal.py) - Send signals to a workflow. -* [activity_sticky_queue](activity_sticky_queues) - Uses unique task queues to ensure activities run on specific workers. * [activity_worker](activity_worker) - Use Python activities from a workflow in another language. * [custom_converter](custom_converter) - Use a custom payload converter to handle custom types. * [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity. @@ -65,6 +64,7 @@ Some examples require extra dependencies. See each sample's directory for specif * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models. * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule. * [sentry](sentry) - Report errors to Sentry. +* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers. * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code. ## Test diff --git a/activity_sticky_queues/README.md b/activity_sticky_queues/README.md deleted file mode 100644 index 1c44539f..00000000 --- a/activity_sticky_queues/README.md +++ /dev/null @@ -1,51 +0,0 @@ -# Sticky Activity Queues - -This sample is a Python implementation of the [TypeScript "Sticky Workers" example](https://github.com/temporalio/samples-typescript/tree/main/activities-sticky-queues), full credit for the design to the authors of that sample. A [sticky execution](https://docs.temporal.io/tasks#sticky-execution) is a job distribution design pattern where all workflow computational tasks are executed on a single worker. 
In the Go and Java SDKs this is explicitly supported via the Session option, but in other SDKs a different approach is required. - -Typical use cases for sticky executions include tasks where interaction with a filesystem is required, such as data processing or interacting with legacy access structures. This example will write text files to folders corresponding to each worker, located in the `demo_fs` folder. In production, these folders would typically be independent machines in a worker cluster. - -This strategy is: - -- Create a `get_available_task_queue` activity that generates a unique task queue name, `unique_worker_task_queue`. -- For activities intended to be "sticky", only register them in one Worker, and have that be the only Worker listening on that `unique_worker_task_queue`. This will be run on a series of `FileProcessing` workflows. -- Execute workflows from the Client like normal. Check the Temporal Web UI to confirm tasks were staying with their respective worker. - -It doesn't matter where the `get_available_task_queue` activity is run, so it can be "non sticky" as per Temporal default behavior. In this demo, `unique_worker_task_queue` is simply a `uuid` initialized in the Worker, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045). - -Activities have been artificially slowed with `time.sleep(3)` to simulate slow activities. - -### Running This Sample - -To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the -worker: - - poetry run python worker.py - -This will start the worker. 
Then, in another terminal, run the following to execute the workflow: - - poetry run python starter.py - -#### Example output: - -```bash -(temporalio-samples-py3.10) user@machine:~/samples-python/activities_sticky_queues$ poetry run python starter.py -Output checksums: -49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4 -49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4 -49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4 -49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4 -49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4 -49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4 -49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4 -49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4 -49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4 -49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4 -``` - -
-Checking the history to see where activities are run -All activities for the one workflow are running against the same task queue, which corresponds to unique workers: - -![image](./static/all-activitites-on-same-task-queue.png) - -
diff --git a/tests/activity_sticky_queues/activity_sticky_queues_activity_test.py b/tests/activity_sticky_queues/activity_sticky_queues_activity_test.py index 8a0889a7..2525ba69 100644 --- a/tests/activity_sticky_queues/activity_sticky_queues_activity_test.py +++ b/tests/activity_sticky_queues/activity_sticky_queues_activity_test.py @@ -1,7 +1,7 @@ from pathlib import Path from unittest import mock -from activity_sticky_queues import tasks +from worker_specific_task_queues import tasks RETURNED_PATH = "valid/path" tasks._get_delay_secs = mock.MagicMock(return_value=0.0001) diff --git a/tests/activity_sticky_queues/activity_sticky_worker_workflow_test.py b/tests/activity_sticky_queues/activity_sticky_worker_workflow_test.py index 58ba824c..fa72e18b 100644 --- a/tests/activity_sticky_queues/activity_sticky_worker_workflow_test.py +++ b/tests/activity_sticky_queues/activity_sticky_worker_workflow_test.py @@ -9,7 +9,7 @@ from temporalio.testing import WorkflowEnvironment from temporalio.worker import Worker -from activity_sticky_queues import tasks +from worker_specific_task_queues import tasks CHECKSUM = "a checksum" RETURNED_PATH = "valid/path" diff --git a/worker_specific_task_queues/README.md b/worker_specific_task_queues/README.md new file mode 100644 index 00000000..8e8e577a --- /dev/null +++ b/worker_specific_task_queues/README.md @@ -0,0 +1,56 @@ +# Worker-Specific Task Queues + +Use a unique Task Queue for each Worker in order to have certain Activities run on a specific Worker. In the Go SDK, this is explicitly supported via the Session option, but in other SDKs a different approach is required. + +Typical use cases include tasks where interaction with a filesystem is required, such as data processing or interacting with legacy access structures. This example will write text files to folders corresponding to each worker, located in the `demo_fs` folder. In production, these folders would typically be independent machines in a worker cluster. 
+
+The strategy is:
+
+- Each Worker process runs two `Worker`s:
+  - One `Worker` listens on the `worker_specific_task_queue-distribution-queue` Task Queue.
+  - Another `Worker` listens on a uniquely generated Task Queue.
+- The Workflow and the first Activity are run on `worker_specific_task_queue-distribution-queue`.
+- The first Activity returns one of the uniquely generated Task Queues (that only one Worker is listening on—i.e. the **Worker-specific Task Queue**).
+- The rest of the Activities do the file processing and are run on the Worker-specific Task Queue.
+
+Check the Temporal Web UI to confirm that tasks stay with their respective worker.
+
+It doesn't matter where the `get_available_task_queue` activity is run, so it can be executed on the shared Task Queue. In this demo, `unique_worker_task_queue` is simply a `uuid` initialized in the Worker, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045).
+
+Activities have been artificially slowed with `time.sleep(3)` to simulate doing more work.
+
+### Running This Sample
+
+To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
+worker:
+
+    poetry run python worker.py
+
+This will start the worker. 
Then, in another terminal, run the following to execute the workflow:
+
+    poetry run python starter.py
+
+#### Example output:
+
+```bash
+(temporalio-samples-py3.10) user@machine:~/samples-python/worker_specific_task_queues$ poetry run python starter.py
+Output checksums:
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+```
+
+
+Checking the history to see where activities are run
+All activities for the one workflow are running against the same task queue, which corresponds to a unique worker:
+
+![image](./static/all-activitites-on-same-task-queue.png)
+
+
diff --git a/activity_sticky_queues/__init__.py b/worker_specific_task_queues/__init__.py similarity index 100% rename from activity_sticky_queues/__init__.py rename to worker_specific_task_queues/__init__.py diff --git a/activity_sticky_queues/demo_fs/.gitignore b/worker_specific_task_queues/demo_fs/.gitignore similarity index 100% rename from activity_sticky_queues/demo_fs/.gitignore rename to worker_specific_task_queues/demo_fs/.gitignore diff --git a/activity_sticky_queues/starter.py b/worker_specific_task_queues/starter.py similarity index 73% rename from activity_sticky_queues/starter.py rename to worker_specific_task_queues/starter.py index 98c91bfc..c55c63be 100644 --- a/activity_sticky_queues/starter.py +++ b/worker_specific_task_queues/starter.py @@ -3,7 +3,7 @@ from temporalio.client import Client -from activity_sticky_queues.tasks import FileProcessing +from worker_specific_task_queues.tasks import FileProcessing async def main(): @@ -15,8 +15,8 @@ async def main(): for idx in range(10): result = client.execute_workflow( FileProcessing.run, - id=f"activity_sticky_queue-workflow-id-{idx}", - task_queue="activity_sticky_queue-distribution-queue", + id=f"worker_specific_task_queue-workflow-id-{idx}", + task_queue="worker_specific_task_queue-distribution-queue", ) await asyncio.sleep(0.1) futures.append(result) diff --git a/activity_sticky_queues/static/all-activitites-on-same-task-queue.png b/worker_specific_task_queues/static/all-activitites-on-same-task-queue.png similarity index 100% rename from activity_sticky_queues/static/all-activitites-on-same-task-queue.png rename to worker_specific_task_queues/static/all-activitites-on-same-task-queue.png diff --git a/activity_sticky_queues/tasks.py b/worker_specific_task_queues/tasks.py similarity index 92% rename from activity_sticky_queues/tasks.py rename to worker_specific_task_queues/tasks.py index aa74e396..c54eb8b5 100644 --- a/activity_sticky_queues/tasks.py +++ b/worker_specific_task_queues/tasks.py @@ 
-102,9 +102,11 @@ class FileProcessing: async def run(self) -> str: """Workflow implementing the basic file processing example. - First, a worker is selected randomly. This is the "sticky worker" on which - the workflow runs. This consists of a file download and some processing task, - with a file cleanup if an error occurs. + First, a task queue is selected randomly. A single worker is listening on + this queue, so when we execute all the file processing activities on this + queue, they will all be run on the same worker, and all be able to access + the same file on disk. The activities download the file, do some processing + task on the file, and clean up the file. """ workflow.logger.info("Searching for available worker") unique_worker_task_queue = await workflow.execute_activity( diff --git a/activity_sticky_queues/worker.py b/worker_specific_task_queues/worker.py similarity index 89% rename from activity_sticky_queues/worker.py rename to worker_specific_task_queues/worker.py index 2b8fee9c..30ea18b6 100644 --- a/activity_sticky_queues/worker.py +++ b/worker_specific_task_queues/worker.py @@ -8,7 +8,7 @@ from temporalio.client import Client from temporalio.worker import Worker -from activity_sticky_queues import tasks +from worker_specific_task_queues import tasks interrupt_event = asyncio.Event() @@ -21,7 +21,9 @@ async def main(): random.seed(667) # Create random task queues and build task queue selection function - task_queue: str = f"activity_sticky_queue-host-{UUID(int=random.getrandbits(128))}" + task_queue: str = ( + f"worker_specific_task_queue-host-{UUID(int=random.getrandbits(128))}" + ) @activity.defn(name="get_available_task_queue") async def select_task_queue() -> str: @@ -35,7 +37,7 @@ async def select_task_queue() -> str: run_futures = [] handle = Worker( client, - task_queue="activity_sticky_queue-distribution-queue", + task_queue="worker_specific_task_queue-distribution-queue", workflows=[tasks.FileProcessing], activities=[select_task_queue], )
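The routing idea this rename centers on can be sketched without the Temporal SDK at all. The toy model below (illustrative only; `ToyWorker` and `dispatch` are invented names, not part of the sample, which uses the real `temporalio` `Worker`/`Client` APIs shown in the diff) demonstrates why sending every file-processing activity to a uniquely named queue pins them all to one worker:

```python
import random
from uuid import UUID


def make_worker_specific_task_queue() -> str:
    # Mirrors worker.py: each Worker process generates a queue name
    # that no other process will be listening on.
    return f"worker_specific_task_queue-host-{UUID(int=random.getrandbits(128))}"


class ToyWorker:
    """Stands in for one Worker process listening on its own unique queue."""

    def __init__(self) -> None:
        self.task_queue = make_worker_specific_task_queue()
        self.handled = []  # activity names this worker executed


def dispatch(workers, task_queue, activity_name):
    # Every activity sent to a worker-specific queue lands on the single
    # worker listening there -- the heart of the "sticky" pattern.
    for w in workers:
        if w.task_queue == task_queue:
            w.handled.append(activity_name)
            return w
    raise LookupError(f"no worker listening on {task_queue}")


if __name__ == "__main__":
    workers = [ToyWorker() for _ in range(3)]
    # As in the sample's first activity: pick one worker's unique queue...
    sticky_queue = workers[1].task_queue
    # ...then run all file-processing steps against that same queue.
    for name in ("download", "process", "clean_up"):
        dispatch(workers, sticky_queue, name)
    print(workers[1].handled)
```

Because only one `ToyWorker` matches `sticky_queue`, all three steps run on it and can share local state (in the real sample, the file on disk), while the other workers see nothing.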