
Rename Sticky Activities sample #75

Merged 4 commits on Nov 12, 2023
README.md (2 changes: 1 addition & 1 deletion)
@@ -51,7 +51,6 @@ Some examples require extra dependencies. See each sample's directory for specif
   while running.
 * [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
 <!-- Keep this list in alphabetical order -->
-* [activity_sticky_queue](activity_sticky_queues) - Uses unique task queues to ensure activities run on specific workers.
 * [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
 * [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
 * [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
@@ -65,6 +64,7 @@ Some examples require extra dependencies. See each sample's directory for specif
 * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
 * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
 * [sentry](sentry) - Report errors to Sentry.
+* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
 * [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.

 ## Test
activity_sticky_queues/README.md (51 changes: 0 additions & 51 deletions)

This file was deleted.

@@ -1,7 +1,7 @@
 from pathlib import Path
 from unittest import mock

-from activity_sticky_queues import tasks
+from worker_specific_task_queues import tasks

 RETURNED_PATH = "valid/path"
 tasks._get_delay_secs = mock.MagicMock(return_value=0.0001)
@@ -9,7 +9,7 @@
 from temporalio.testing import WorkflowEnvironment
 from temporalio.worker import Worker

-from activity_sticky_queues import tasks
+from worker_specific_task_queues import tasks

 CHECKSUM = "a checksum"
 RETURNED_PATH = "valid/path"
worker_specific_task_queues/README.md (56 changes: 56 additions & 0 deletions)
@@ -0,0 +1,56 @@
# Worker-Specific Task Queues

Use a unique Task Queue for each Worker to have certain Activities run on a specific Worker. In the Go SDK, this is explicitly supported via the Session option, but in other SDKs a different approach is required.

Typical use cases include tasks that must interact with a filesystem, such as data processing or interacting with legacy access structures. This example writes text files to folders corresponding to each Worker, located in the `demo_fs` folder. In production, these folders would typically be independent machines in a worker cluster.

The strategy is as follows; a minimal sketch of the Worker setup appears after the list:

- Each Worker process runs two `Worker`s:
- One `Worker` listens on the `worker_specific_task_queue-distribution-queue` Task Queue.
- Another `Worker` listens on a uniquely generated Task Queue.
- The Workflow and the first Activity are run on `worker_specific_task_queue-distribution-queue`.
- The first Activity returns one of the uniquely generated Task Queues (that only one Worker is listening on—i.e. the **Worker-specific Task Queue**).
- The rest of the Activities do the file processing and are run on the Worker-specific Task Queue.
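A minimal sketch of this two-`Worker` setup, where the hypothetical `process_file` activity stands in for the sample's file-processing activities (see `worker.py` for the real implementation, which also registers the `FileProcessing` workflow on the shared queue):

```python
import asyncio
import uuid

from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker


async def main() -> None:
    client = await Client.connect("localhost:7233")

    # Shared queue: every Worker process listens here.
    shared_queue = "worker_specific_task_queue-distribution-queue"
    # Unique queue: only this Worker process listens here.
    unique_queue = f"worker_specific_task_queue-host-{uuid.uuid4()}"

    @activity.defn(name="get_available_task_queue")
    async def select_task_queue() -> str:
        # Hand the workflow this Worker's unique queue name.
        return unique_queue

    @activity.defn
    async def process_file(path: str) -> str:
        # Hypothetical stand-in for the sample's download/process/cleanup
        # activities, which must all touch the same local filesystem.
        return path

    # Run both Workers from the same process.
    await asyncio.gather(
        Worker(
            client,
            task_queue=shared_queue,
            activities=[select_task_queue],
        ).run(),
        Worker(
            client,
            task_queue=unique_queue,
            activities=[process_file],
        ).run(),
    )


if __name__ == "__main__":
    asyncio.run(main())
```

Because only this process polls the unique queue, any activity scheduled there is guaranteed to land on this Worker.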

Check the Temporal Web UI to confirm that tasks stayed with their respective Workers.

It doesn't matter where the `get_available_task_queue` activity is run, so it can be executed on the shared Task Queue. In this demo, `unique_worker_task_queue` is simply a `uuid` initialized in the Worker, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045).
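For example, a stable host identity could replace the random `uuid` (a sketch, assuming exactly one Worker process runs per machine):

```python
import socket

# Assumption: one Worker process per host, so the hostname uniquely
# identifies this Worker's personal Task Queue.
task_queue = f"worker_specific_task_queue-host-{socket.gethostname()}"
```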

Activities have been artificially slowed with `time.sleep(3)` to simulate doing more work.

### Running This Sample

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the worker:

```bash
poetry run python worker.py
```

This will start the worker. Then, in another terminal, run the following to execute the workflow:

```bash
poetry run python starter.py
```

#### Example output:

```bash
(temporalio-samples-py3.10) user@machine:~/samples-python/worker_specific_task_queues$ poetry run python starter.py
Output checksums:
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
```

<details>
<summary>Checking the history to see where activities are run</summary>
All activities for a given workflow run on the same Task Queue, which corresponds to a single Worker:

![image](./static/all-activitites-on-same-task-queue.png)

</details>
worker_specific_task_queues/starter.py
@@ -3,7 +3,7 @@

 from temporalio.client import Client

-from activity_sticky_queues.tasks import FileProcessing
+from worker_specific_task_queues.tasks import FileProcessing


 async def main():
@@ -15,8 +15,8 @@ async def main():
     for idx in range(10):
         result = client.execute_workflow(
             FileProcessing.run,
-            id=f"activity_sticky_queue-workflow-id-{idx}",
-            task_queue="activity_sticky_queue-distribution-queue",
+            id=f"worker_specific_task_queue-workflow-id-{idx}",
+            task_queue="worker_specific_task_queue-distribution-queue",
         )
         await asyncio.sleep(0.1)
         futures.append(result)
worker_specific_task_queues/tasks.py
@@ -102,9 +102,11 @@ class FileProcessing:
     async def run(self) -> str:
         """Workflow implementing the basic file processing example.

-        First, a worker is selected randomly. This is the "sticky worker" on which
-        the workflow runs. This consists of a file download and some processing task,
-        with a file cleanup if an error occurs.
+        First, a task queue is selected randomly. A single worker is listening on
+        this queue, so when we execute all the file processing activities on this
+        queue, they will all be run on the same worker, and all be able to access
+        the same file on disk. The activities download the file, do some processing
+        task on the file, and clean up the file.
         """
         workflow.logger.info("Searching for available worker")
         unique_worker_task_queue = await workflow.execute_activity(
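The hunk above ends right where the routing happens. A sketch of what the rest of the workflow body does, with the timeouts and the follow-up activity names assumed for illustration rather than copied from `tasks.py`:

```python
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class FileProcessing:
    @workflow.run
    async def run(self) -> str:
        # Any Worker on the shared queue can answer this; the reply names
        # the responding Worker's unique queue.
        unique_worker_task_queue = await workflow.execute_activity(
            "get_available_task_queue",
            start_to_close_timeout=timedelta(seconds=10),
        )
        # Passing task_queue= pins every following activity to that one
        # Worker, so they all see the same local filesystem.
        path = await workflow.execute_activity(
            "download_file",  # assumed name
            task_queue=unique_worker_task_queue,
            start_to_close_timeout=timedelta(minutes=1),
        )
        return await workflow.execute_activity(
            "process_file",  # assumed name
            path,
            task_queue=unique_worker_task_queue,
            start_to_close_timeout=timedelta(minutes=1),
        )
```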
worker_specific_task_queues/worker.py
@@ -8,7 +8,7 @@
 from temporalio.client import Client
 from temporalio.worker import Worker

-from activity_sticky_queues import tasks
+from worker_specific_task_queues import tasks

 interrupt_event = asyncio.Event()

@@ -21,7 +21,9 @@ async def main():
     random.seed(667)

     # Create random task queues and build task queue selection function
-    task_queue: str = f"activity_sticky_queue-host-{UUID(int=random.getrandbits(128))}"
+    task_queue: str = (
+        f"worker_specific_task_queue-host-{UUID(int=random.getrandbits(128))}"
+    )

     @activity.defn(name="get_available_task_queue")
     async def select_task_queue() -> str:
@@ -35,7 +37,7 @@ async def select_task_queue() -> str:
     run_futures = []
     handle = Worker(
         client,
-        task_queue="activity_sticky_queue-distribution-queue",
+        task_queue="worker_specific_task_queue-distribution-queue",
         workflows=[tasks.FileProcessing],
         activities=[select_task_queue],
     )