From ea705111a85c3389d523697b9eca91e3f951861b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= <lorensr@gmail.com>
Date: Thu, 29 Jun 2023 19:41:51 -0400
Subject: [PATCH 1/3] Rename Sticky Activities sample

---
 README.md                                   |  2 +-
 activity_sticky_queues/README.md            | 51 ----------------
 .../activity_sticky_queues_activity_test.py |  2 +-
 .../activity_sticky_worker_workflow_test.py |  2 +-
 worker_specific_task_queues/README.md       | 56 ++++++++++++++++++
 .../__init__.py                             |  0
 .../demo_fs/.gitignore                      |  0
 .../starter.py                              |  6 +-
 .../all-activitites-on-same-task-queue.png  | Bin
 .../tasks.py                                |  8 ++-
 .../worker.py                               |  6 +-
 11 files changed, 70 insertions(+), 63 deletions(-)
 delete mode 100644 activity_sticky_queues/README.md
 create mode 100644 worker_specific_task_queues/README.md
 rename {activity_sticky_queues => worker_specific_task_queues}/__init__.py (100%)
 rename {activity_sticky_queues => worker_specific_task_queues}/demo_fs/.gitignore (100%)
 rename {activity_sticky_queues => worker_specific_task_queues}/starter.py (73%)
 rename {activity_sticky_queues => worker_specific_task_queues}/static/all-activitites-on-same-task-queue.png (100%)
 rename {activity_sticky_queues => worker_specific_task_queues}/tasks.py (92%)
 rename {activity_sticky_queues => worker_specific_task_queues}/worker.py (89%)

diff --git a/README.md b/README.md
index 5c9b8cbd..626a6ac7 100644
--- a/README.md
+++ b/README.md
@@ -51,7 +51,6 @@ Some examples require extra dependencies. See each sample's directory for specif
 while running.
 * [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
 <!-- Keep this list in alphabetical order -->
-* [activity_sticky_queue](activity_sticky_queue) - Uses unique task queues to ensure activities run on specific workers.
 * [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
 * [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
 * [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
@@ -63,6 +62,7 @@ Some examples require extra dependencies. See each sample's directory for specif
 * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
 * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
 * [sentry](sentry) - Report errors to Sentry.
+* [worker_specific_task_queues](worker_specific_task_queues) - Uses unique task queues to ensure activities run on specific workers.
 
 ## Test
 
diff --git a/activity_sticky_queues/README.md b/activity_sticky_queues/README.md
deleted file mode 100644
index 1c44539f..00000000
--- a/activity_sticky_queues/README.md
+++ /dev/null
@@ -1,51 +0,0 @@
-# Sticky Activity Queues
-
-This sample is a Python implementation of the [TypeScript "Sticky Workers" example](https://github.com/temporalio/samples-typescript/tree/main/activities-sticky-queues), full credit for the design to the authors of that sample. A [sticky execution](https://docs.temporal.io/tasks#sticky-execution) is a job distribution design pattern where all workflow computational tasks are executed on a single worker. In the Go and Java SDKs this is explicitly supported via the Session option, but in other SDKs a different approach is required.
-
-Typical use cases for sticky executions include tasks where interaction with a filesystem is required, such as data processing or interacting with legacy access structures. This example will write text files to folders corresponding to each worker, located in the `demo_fs` folder. In production, these folders would typically be independent machines in a worker cluster.
-
-This strategy is:
-
-- Create a `get_available_task_queue` activity that generates a unique task queue name, `unique_worker_task_queue`.
-- For activities intended to be "sticky", only register them in one Worker, and have that be the only Worker listening on that `unique_worker_task_queue`. This will be run on a series of `FileProcessing` workflows.
-- Execute workflows from the Client like normal. Check the Temporal Web UI to confirm tasks were staying with their respective worker.
-
-It doesn't matter where the `get_available_task_queue` activity is run, so it can be "non sticky" as per Temporal default behavior. In this demo, `unique_worker_task_queue` is simply a `uuid` initialized in the Worker, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045).
-
-Activities have been artificially slowed with `time.sleep(3)` to simulate slow activities.
-
-### Running This Sample
-
-To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
-worker:
-
-    poetry run python worker.py
-
-This will start the worker. Then, in another terminal, run the following to execute the workflow:
-
-    poetry run python starter.py
-
-#### Example output:
-
-```bash
-(temporalio-samples-py3.10) user@machine:~/samples-python/activities_sticky_queues$ poetry run python starter.py
-Output checksums:
-49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
-49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
-49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
-49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
-49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
-49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
-49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
-49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
-49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
-49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
-```
-
-<details>
-<summary>Checking the history to see where activities are run</summary>
-All activities for the one workflow are running against the same task queue, which corresponds to unique workers:
-
-![](./static/all-activitites-on-same-task-queue.png)
-
-</details>
diff --git a/tests/activity_sticky_queues/activity_sticky_queues_activity_test.py b/tests/activity_sticky_queues/activity_sticky_queues_activity_test.py
index 8a0889a7..2525ba69 100644
--- a/tests/activity_sticky_queues/activity_sticky_queues_activity_test.py
+++ b/tests/activity_sticky_queues/activity_sticky_queues_activity_test.py
@@ -1,7 +1,7 @@
 from pathlib import Path
 from unittest import mock
 
-from activity_sticky_queues import tasks
+from worker_specific_task_queues import tasks
 
 RETURNED_PATH = "valid/path"
 tasks._get_delay_secs = mock.MagicMock(return_value=0.0001)
diff --git a/tests/activity_sticky_queues/activity_sticky_worker_workflow_test.py b/tests/activity_sticky_queues/activity_sticky_worker_workflow_test.py
index 58ba824c..fa72e18b 100644
--- a/tests/activity_sticky_queues/activity_sticky_worker_workflow_test.py
+++ b/tests/activity_sticky_queues/activity_sticky_worker_workflow_test.py
@@ -9,7 +9,7 @@
 from temporalio.testing import WorkflowEnvironment
 from temporalio.worker import Worker
 
-from activity_sticky_queues import tasks
+from worker_specific_task_queues import tasks
 
 CHECKSUM = "a checksum"
 RETURNED_PATH = "valid/path"
diff --git a/worker_specific_task_queues/README.md b/worker_specific_task_queues/README.md
new file mode 100644
index 00000000..4bede156
--- /dev/null
+++ b/worker_specific_task_queues/README.md
@@ -0,0 +1,56 @@
+# Worker-Specific Task Queues
+
+Use a unique Task Queue for each Worker in order to have certain Activities run on a specific Worker. In the Go and Java SDKs, this is explicitly supported via the Session option, but in other SDKs a different approach is required.
+
+Typical use cases include tasks where interaction with a filesystem is required, such as data processing or interacting with legacy access structures. This example will write text files to folders corresponding to each worker, located in the `demo_fs` folder. In production, these folders would typically be independent machines in a worker cluster.
+
+The strategy is:
+
+- Each Worker process runs two `Worker`s:
+  - One `Worker` listens on the `worker_specific_task_queue-distribution-queue` Task Queue.
+  - Another `Worker` listens on a uniquely generated Task Queue.
+- The Workflow and the first Activity are run on `worker_specific_task_queue-distribution-queue`.
+- The first Activity returns one of the uniquely generated Task Queues (that only one Worker is listening on—i.e. the **Worker-specific Task Queue**).
+- The rest of the Activities do the file processing and are run on the Worker-specific Task Queue.
+
+Check the Temporal Web UI to confirm that tasks stay with their respective Workers.
+
+It doesn't matter where the `get_available_task_queue` activity is run, so it can be executed on the shared Task Queue. In this demo, `unique_worker_task_queue` is simply a `uuid` initialized in the Worker, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045).
+
+Activities have been artificially slowed with `time.sleep(3)` to simulate doing more work.
+
+### Running This Sample
+
+To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
+worker:
+
+    poetry run python worker.py
+
+This will start the worker. Then, in another terminal, run the following to execute the workflow:
+
+    poetry run python starter.py
+
+#### Example output:
+
+```bash
+(temporalio-samples-py3.10) user@machine:~/samples-python/worker_specific_task_queues$ poetry run python starter.py
+Output checksums:
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
+```
+
+<details>
+<summary>Checking the history to see where activities are run</summary>
+All activities for the one workflow are running against the same task queue, which corresponds to unique workers:
+
+![](./static/all-activitites-on-same-task-queue.png)
+
+</details>
diff --git a/activity_sticky_queues/__init__.py b/worker_specific_task_queues/__init__.py
similarity index 100%
rename from activity_sticky_queues/__init__.py
rename to worker_specific_task_queues/__init__.py
diff --git a/activity_sticky_queues/demo_fs/.gitignore b/worker_specific_task_queues/demo_fs/.gitignore
similarity index 100%
rename from activity_sticky_queues/demo_fs/.gitignore
rename to worker_specific_task_queues/demo_fs/.gitignore
diff --git a/activity_sticky_queues/starter.py b/worker_specific_task_queues/starter.py
similarity index 73%
rename from activity_sticky_queues/starter.py
rename to worker_specific_task_queues/starter.py
index 98c91bfc..c55c63be 100644
--- a/activity_sticky_queues/starter.py
+++ b/worker_specific_task_queues/starter.py
@@ -3,7 +3,7 @@
 
 from temporalio.client import Client
 
-from activity_sticky_queues.tasks import FileProcessing
+from worker_specific_task_queues.tasks import FileProcessing
 
 
 async def main():
@@ -15,8 +15,8 @@ async def main():
     for idx in range(10):
         result = client.execute_workflow(
             FileProcessing.run,
-            id=f"activity_sticky_queue-workflow-id-{idx}",
-            task_queue="activity_sticky_queue-distribution-queue",
+            id=f"worker_specific_task_queue-workflow-id-{idx}",
+            task_queue="worker_specific_task_queue-distribution-queue",
         )
         await asyncio.sleep(0.1)
         futures.append(result)
diff --git a/activity_sticky_queues/static/all-activitites-on-same-task-queue.png b/worker_specific_task_queues/static/all-activitites-on-same-task-queue.png
similarity index 100%
rename from activity_sticky_queues/static/all-activitites-on-same-task-queue.png
rename to worker_specific_task_queues/static/all-activitites-on-same-task-queue.png
diff --git a/activity_sticky_queues/tasks.py b/worker_specific_task_queues/tasks.py
similarity index 92%
rename from activity_sticky_queues/tasks.py
rename to worker_specific_task_queues/tasks.py
index aa74e396..ecca33ea 100644
--- a/activity_sticky_queues/tasks.py
+++ b/worker_specific_task_queues/tasks.py
@@ -102,9 +102,11 @@ class FileProcessing:
     async def run(self) -> str:
        """Workflow implementing the basic file processing example.
 
-        First, a worker is selected randomly. This is the "sticky worker" on which
-        the workflow runs. This consists of a file download and some processing task,
-        with a file cleanup if an error occurs.
+        First, a task queue is selected randomly. A single worker is listening on 
+        this queue, so when we execute all the file processing activities on this
+        queue, they will all be run on the same worker, and all be able to access 
+        the same file on disk. The activities download the file, do some processing 
+        task on the file, and clean up the file.
         """
         workflow.logger.info("Searching for available worker")
         unique_worker_task_queue = await workflow.execute_activity(
diff --git a/activity_sticky_queues/worker.py b/worker_specific_task_queues/worker.py
similarity index 89%
rename from activity_sticky_queues/worker.py
rename to worker_specific_task_queues/worker.py
index 2b8fee9c..2ad30b22 100644
--- a/activity_sticky_queues/worker.py
+++ b/worker_specific_task_queues/worker.py
@@ -8,7 +8,7 @@
 from temporalio.client import Client
 from temporalio.worker import Worker
 
-from activity_sticky_queues import tasks
+from worker_specific_task_queues import tasks
 
 interrupt_event = asyncio.Event()
 
@@ -21,7 +21,7 @@ async def main():
     random.seed(667)
 
     # Create random task queues and build task queue selection function
-    task_queue: str = f"activity_sticky_queue-host-{UUID(int=random.getrandbits(128))}"
+    task_queue: str = f"worker_specific_task_queue-host-{UUID(int=random.getrandbits(128))}"
 
     @activity.defn(name="get_available_task_queue")
     async def select_task_queue() -> str:
@@ -35,7 +35,7 @@ async def select_task_queue() -> str:
     run_futures = []
     handle = Worker(
         client,
-        task_queue="activity_sticky_queue-distribution-queue",
+        task_queue="worker_specific_task_queue-distribution-queue",
         workflows=[tasks.FileProcessing],
         activities=[select_task_queue],
     )

From d30354162d6e047de7b6e5a344f023429c16155c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= <lorensr@gmail.com>
Date: Thu, 29 Jun 2023 21:55:06 -0400
Subject: [PATCH 2/3] Run poe format

---
 worker_specific_task_queues/tasks.py  | 6 +++---
 worker_specific_task_queues/worker.py | 4 +++-
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/worker_specific_task_queues/tasks.py b/worker_specific_task_queues/tasks.py
index ecca33ea..c54eb8b5 100644
--- a/worker_specific_task_queues/tasks.py
+++ b/worker_specific_task_queues/tasks.py
@@ -102,10 +102,10 @@ class FileProcessing:
     async def run(self) -> str:
        """Workflow implementing the basic file processing example.
 
-        First, a task queue is selected randomly. A single worker is listening on 
+        First, a task queue is selected randomly. A single worker is listening on
         this queue, so when we execute all the file processing activities on this
-        queue, they will all be run on the same worker, and all be able to access 
-        the same file on disk. The activities download the file, do some processing 
+        queue, they will all be run on the same worker, and all be able to access
+        the same file on disk. The activities download the file, do some processing
         task on the file, and clean up the file.
         """
         workflow.logger.info("Searching for available worker")
diff --git a/worker_specific_task_queues/worker.py b/worker_specific_task_queues/worker.py
index 2ad30b22..30ea18b6 100644
--- a/worker_specific_task_queues/worker.py
+++ b/worker_specific_task_queues/worker.py
@@ -21,7 +21,9 @@ async def main():
     random.seed(667)
 
     # Create random task queues and build task queue selection function
-    task_queue: str = f"worker_specific_task_queue-host-{UUID(int=random.getrandbits(128))}"
+    task_queue: str = (
+        f"worker_specific_task_queue-host-{UUID(int=random.getrandbits(128))}"
+    )
 
     @activity.defn(name="get_available_task_queue")
     async def select_task_queue() -> str:

From 0c9dab079c33a83c3a9db108b2728112c9a15be5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Loren=20=E2=98=BA=EF=B8=8F?= <251288+lorensr@users.noreply.github.com>
Date: Wed, 5 Jul 2023 22:41:42 -0400
Subject: [PATCH 3/3] Update worker_specific_task_queues/README.md

---
 worker_specific_task_queues/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/worker_specific_task_queues/README.md b/worker_specific_task_queues/README.md
index 4bede156..8e8e577a 100644
--- a/worker_specific_task_queues/README.md
+++ b/worker_specific_task_queues/README.md
@@ -1,6 +1,6 @@
 # Worker-Specific Task Queues
 
-Use a unique Task Queue for each Worker in order to have certain Activities run on a specific Worker. In the Go and Java SDKs, this is explicitly supported via the Session option, but in other SDKs a different approach is required.
+Use a unique Task Queue for each Worker in order to have certain Activities run on a specific Worker. In the Go SDK, this is explicitly supported via the Session option, but in other SDKs a different approach is required.
 
 Typical use cases include tasks where interaction with a filesystem is required, such as data processing or interacting with legacy access structures. This example will write text files to folders corresponding to each worker, located in the `demo_fs` folder. In production, these folders would typically be independent machines in a worker cluster.