From 8b392cacac212001ae73ee301c8473e61695c17a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= Date: Sat, 11 Nov 2023 21:35:03 -0500 Subject: [PATCH 1/5] move --- .../FileProcessingWorkflow.workflow.cs | 0 .../NonStickyActivities.cs | 0 src/{ActivityStickyQueues => WorkerSpecificTaskQueues}/Program.cs | 0 src/{ActivityStickyQueues => WorkerSpecificTaskQueues}/README.md | 0 .../StickyActivities.cs | 0 .../TemporalioSamples.ActivityStickyQueues.csproj | 0 6 files changed, 0 insertions(+), 0 deletions(-) rename src/{ActivityStickyQueues => WorkerSpecificTaskQueues}/FileProcessingWorkflow.workflow.cs (100%) rename src/{ActivityStickyQueues => WorkerSpecificTaskQueues}/NonStickyActivities.cs (100%) rename src/{ActivityStickyQueues => WorkerSpecificTaskQueues}/Program.cs (100%) rename src/{ActivityStickyQueues => WorkerSpecificTaskQueues}/README.md (100%) rename src/{ActivityStickyQueues => WorkerSpecificTaskQueues}/StickyActivities.cs (100%) rename src/{ActivityStickyQueues => WorkerSpecificTaskQueues}/TemporalioSamples.ActivityStickyQueues.csproj (100%) diff --git a/src/ActivityStickyQueues/FileProcessingWorkflow.workflow.cs b/src/WorkerSpecificTaskQueues/FileProcessingWorkflow.workflow.cs similarity index 100% rename from src/ActivityStickyQueues/FileProcessingWorkflow.workflow.cs rename to src/WorkerSpecificTaskQueues/FileProcessingWorkflow.workflow.cs diff --git a/src/ActivityStickyQueues/NonStickyActivities.cs b/src/WorkerSpecificTaskQueues/NonStickyActivities.cs similarity index 100% rename from src/ActivityStickyQueues/NonStickyActivities.cs rename to src/WorkerSpecificTaskQueues/NonStickyActivities.cs diff --git a/src/ActivityStickyQueues/Program.cs b/src/WorkerSpecificTaskQueues/Program.cs similarity index 100% rename from src/ActivityStickyQueues/Program.cs rename to src/WorkerSpecificTaskQueues/Program.cs diff --git a/src/ActivityStickyQueues/README.md b/src/WorkerSpecificTaskQueues/README.md similarity index 100% rename from src/ActivityStickyQueues/README.md rename to src/WorkerSpecificTaskQueues/README.md diff --git a/src/ActivityStickyQueues/StickyActivities.cs b/src/WorkerSpecificTaskQueues/StickyActivities.cs similarity index 100% rename from src/ActivityStickyQueues/StickyActivities.cs rename to src/WorkerSpecificTaskQueues/StickyActivities.cs diff --git a/src/ActivityStickyQueues/TemporalioSamples.ActivityStickyQueues.csproj b/src/WorkerSpecificTaskQueues/TemporalioSamples.ActivityStickyQueues.csproj similarity index 100% rename from src/ActivityStickyQueues/TemporalioSamples.ActivityStickyQueues.csproj rename to src/WorkerSpecificTaskQueues/TemporalioSamples.ActivityStickyQueues.csproj From 57b77c36ce1cb1fcf6c4e7fe98b72777751408bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= Date: Sat, 11 Nov 2023 21:37:03 -0500 Subject: [PATCH 2/5] update --- README.md | 2 +- .../FileProcessingWorkflow.workflow.cs | 2 +- src/WorkerSpecificTaskQueues/NonStickyActivities.cs | 4 ++-- src/WorkerSpecificTaskQueues/Program.cs | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index f2957e9..3b5c118 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,6 @@ Prerequisites: * [ActivityHeartbeatingCancellation](src/ActivityHeartbeatingCancellation) - How to use heartbeating and cancellation handling in an activity. * [ActivitySimple](src/ActivitySimple) - Simple workflow that runs simple activities. -* [ActivityStickyQueues](src/ActivityStickyQueues) - Use a unique task queue per Worker to have certain Activities only run on that specific Worker. * [ActivityWorker](src/ActivityWorker) - Use .NET activities from a workflow in another language. * [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter. * [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud. @@ -28,6 +27,7 @@ Prerequisites: * [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource. * [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. * [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future. +* [WorkerSpecificTaskQueues](src/WorkerSpecificTaskQueues) - Use a unique task queue per Worker to have certain Activities only run on that specific Worker. * [WorkerVersioning](src/WorkerVersioning) - How to use the Worker Versioning feature to more easily deploy changes to Workflow & other code. ## Development diff --git a/src/WorkerSpecificTaskQueues/FileProcessingWorkflow.workflow.cs b/src/WorkerSpecificTaskQueues/FileProcessingWorkflow.workflow.cs index ea13baf..a538127 100644 --- a/src/WorkerSpecificTaskQueues/FileProcessingWorkflow.workflow.cs +++ b/src/WorkerSpecificTaskQueues/FileProcessingWorkflow.workflow.cs @@ -9,7 +9,7 @@ public class FileProcessingWorkflow public async Task RunAsync(int maxAttempts = 5) { var uniqueWorkerTaskQueue = await Workflow.ExecuteActivityAsync( - (NonStickyActivities act) => act.GetUniqueTaskQueue(), + (NormalActivities act) => act.GetUniqueTaskQueue(), new() { StartToCloseTimeout = TimeSpan.FromMinutes(1) }); var downloadPath = await Workflow.ExecuteActivityAsync( diff --git a/src/WorkerSpecificTaskQueues/NonStickyActivities.cs b/src/WorkerSpecificTaskQueues/NonStickyActivities.cs index 1254efb..3191d5e 100644 --- a/src/WorkerSpecificTaskQueues/NonStickyActivities.cs +++ b/src/WorkerSpecificTaskQueues/NonStickyActivities.cs @@ -2,11 +2,11 @@ namespace TemporalioSamples.ActivityStickyQueues; -public class NonStickyActivities +public class NormalActivities { private readonly string uniqueWorkerTaskQueue; - public NonStickyActivities(string uniqueWorkerTaskQueue) => + public NormalActivities(string uniqueWorkerTaskQueue) => this.uniqueWorkerTaskQueue = uniqueWorkerTaskQueue; [Activity] diff --git a/src/WorkerSpecificTaskQueues/Program.cs b/src/WorkerSpecificTaskQueues/Program.cs index c8b0905..602c13b 100644 --- a/src/WorkerSpecificTaskQueues/Program.cs +++ b/src/WorkerSpecificTaskQueues/Program.cs @@ -24,7 +24,7 @@ async Task RunWorkerAsync() var uniqueWorkerTaskQueue = Guid.NewGuid().ToString(); - var nonStickyActivities = new NonStickyActivities(uniqueWorkerTaskQueue); + var normalActivities = new NormalActivities(uniqueWorkerTaskQueue); // Run worker until cancelled Console.WriteLine("Running worker"); @@ -32,7 +32,7 @@ async Task RunWorkerAsync() using var nonStickyWorker = new TemporalWorker( client, new TemporalWorkerOptions(taskQueue: "activity-sticky-queues-sample") - .AddActivity(nonStickyActivities.GetUniqueTaskQueue) + .AddActivity(normalActivities.GetUniqueTaskQueue) .AddWorkflow()); using var stickyWorker = new TemporalWorker( From 1e6a51172873639a5854eacc0f81bc2eb14c38b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= Date: Sat, 11 Nov 2023 21:37:38 -0500 Subject: [PATCH 3/5] move --- .../{NonStickyActivities.cs => NormalActivities.cs} | 0 .../{StickyActivities.cs => WorkerSpecificActivities.cs} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename src/WorkerSpecificTaskQueues/{NonStickyActivities.cs => NormalActivities.cs} (100%) rename src/WorkerSpecificTaskQueues/{StickyActivities.cs => WorkerSpecificActivities.cs} (100%) diff --git a/src/WorkerSpecificTaskQueues/NonStickyActivities.cs b/src/WorkerSpecificTaskQueues/NormalActivities.cs similarity index 100% rename from src/WorkerSpecificTaskQueues/NonStickyActivities.cs rename to src/WorkerSpecificTaskQueues/NormalActivities.cs diff --git a/src/WorkerSpecificTaskQueues/StickyActivities.cs b/src/WorkerSpecificTaskQueues/WorkerSpecificActivities.cs similarity index 100% rename from src/WorkerSpecificTaskQueues/StickyActivities.cs rename to src/WorkerSpecificTaskQueues/WorkerSpecificActivities.cs From 4cd264c61aea50d675c34bcb10a2e55d20874c34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= Date: Sat, 11 Nov 2023 21:58:38 -0500 Subject: [PATCH 4/5] Rename Sticky Activities sample --- TemporalioSamples.sln | 2 +- .../FileProcessingWorkflow.workflow.cs | 8 ++--- .../NormalActivities.cs | 2 +- src/WorkerSpecificTaskQueues/Program.cs | 18 +++++------ src/WorkerSpecificTaskQueues/README.md | 30 +++++++++++++------ ...oSamples.WorkerSpecificTasksQueues.csproj} | 0 .../WorkerSpecificActivities.cs | 4 +-- 7 files changed, 38 insertions(+), 26 deletions(-) rename src/WorkerSpecificTaskQueues/{TemporalioSamples.ActivityStickyQueues.csproj => TemporalioSamples.WorkerSpecificTasksQueues.csproj} (100%) diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln index 6331d68..1fc061f 100644 --- a/TemporalioSamples.sln +++ b/TemporalioSamples.sln @@ -29,7 +29,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Encryptio EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ClientMtls", "src\ClientMtls\TemporalioSamples.ClientMtls.csproj", "{D2A3546F-2462-4B86-8B5E-999505483A2D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ActivityStickyQueues", "src\ActivityStickyQueues\TemporalioSamples.ActivityStickyQueues.csproj", "{974CCD5E-0254-4C85-9618-8CD014A2734F}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkerSpecificTaskQueues", "src\WorkerSpecificTaskQueues\TemporalioSamples.WorkerSpecificTaskQueues.csproj", "{974CCD5E-0254-4C85-9618-8CD014A2734F}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Schedules", "src\Schedules\TemporalioSamples.Schedules.csproj", "{297A58BE-3959-4525-A329-222B3575139D}" EndProject diff --git a/src/WorkerSpecificTaskQueues/FileProcessingWorkflow.workflow.cs b/src/WorkerSpecificTaskQueues/FileProcessingWorkflow.workflow.cs index a538127..2a379cb 100644 --- a/src/WorkerSpecificTaskQueues/FileProcessingWorkflow.workflow.cs +++ b/src/WorkerSpecificTaskQueues/FileProcessingWorkflow.workflow.cs @@ -1,6 +1,6 @@ using Temporalio.Workflows; -namespace TemporalioSamples.ActivityStickyQueues; +namespace TemporalioSamples.WorkerSpecificTaskQueues; [Workflow] public class FileProcessingWorkflow @@ -13,7 +13,7 @@ public async Task RunAsync(int maxAttempts = 5) new() { StartToCloseTimeout = TimeSpan.FromMinutes(1) }); var downloadPath = await Workflow.ExecuteActivityAsync( - () => StickyActivities.DownloadFileToWorkerFileSystemAsync("https://temporal.io"), + () => WorkerSpecificActivities.DownloadFileToWorkerFileSystemAsync("https://temporal.io"), new() { TaskQueue = uniqueWorkerTaskQueue, @@ -26,7 +26,7 @@ public async Task RunAsync(int maxAttempts = 5) }); await Workflow.ExecuteActivityAsync( - () => StickyActivities.WorkOnFileInWorkerFileSystemAsync(downloadPath), + () => WorkerSpecificActivities.WorkOnFileInWorkerFileSystemAsync(downloadPath), new() { TaskQueue = uniqueWorkerTaskQueue, @@ -35,7 +35,7 @@ await Workflow.ExecuteActivityAsync( }); await Workflow.ExecuteActivityAsync( - () => StickyActivities.CleanupFileFromWorkerFileSystemAsync(downloadPath), + () => WorkerSpecificActivities.CleanupFileFromWorkerFileSystemAsync(downloadPath), new() { TaskQueue = uniqueWorkerTaskQueue, diff --git a/src/WorkerSpecificTaskQueues/NormalActivities.cs b/src/WorkerSpecificTaskQueues/NormalActivities.cs index 3191d5e..c4a9f63 100644 --- a/src/WorkerSpecificTaskQueues/NormalActivities.cs +++ b/src/WorkerSpecificTaskQueues/NormalActivities.cs @@ -1,6 +1,6 @@ using Temporalio.Activities; -namespace TemporalioSamples.ActivityStickyQueues; +namespace TemporalioSamples.WorkerSpecificTaskQueues; public class NormalActivities { diff --git a/src/WorkerSpecificTaskQueues/Program.cs b/src/WorkerSpecificTaskQueues/Program.cs index 602c13b..9ca9f9e 100644 --- a/src/WorkerSpecificTaskQueues/Program.cs +++ b/src/WorkerSpecificTaskQueues/Program.cs @@ -1,7 +1,7 @@ using Microsoft.Extensions.Logging; using Temporalio.Client; using Temporalio.Worker; -using TemporalioSamples.ActivityStickyQueues; +using TemporalioSamples.WorkerSpecificTaskQueues; // Create a client to localhost on default namespace var client = await TemporalClient.ConnectAsync(new("localhost:7233") @@ -29,20 +29,20 @@ async Task RunWorkerAsync() // Run worker until cancelled Console.WriteLine("Running worker"); - using var nonStickyWorker = new TemporalWorker( + using var normalWorker = new TemporalWorker( client, - new TemporalWorkerOptions(taskQueue: "activity-sticky-queues-sample") + new TemporalWorkerOptions(taskQueue: "worker-specific-task-queues-sample") .AddActivity(normalActivities.GetUniqueTaskQueue) .AddWorkflow()); - using var stickyWorker = new TemporalWorker( + using var uniqueTaskQueueWorker = new TemporalWorker( client, new TemporalWorkerOptions(taskQueue: uniqueWorkerTaskQueue) - .AddActivity(StickyActivities.DownloadFileToWorkerFileSystemAsync) - .AddActivity(StickyActivities.CleanupFileFromWorkerFileSystemAsync) - .AddActivity(StickyActivities.WorkOnFileInWorkerFileSystemAsync)); + .AddActivity(WorkerSpecificActivities.DownloadFileToWorkerFileSystemAsync) + .AddActivity(WorkerSpecificActivities.CleanupFileFromWorkerFileSystemAsync) + .AddActivity(WorkerSpecificActivities.WorkOnFileInWorkerFileSystemAsync)); - var tasks = new List { nonStickyWorker.ExecuteAsync(tokenSource.Token), stickyWorker.ExecuteAsync(tokenSource.Token) }; + var tasks = new List { normalWorker.ExecuteAsync(tokenSource.Token), uniqueTaskQueueWorker.ExecuteAsync(tokenSource.Token) }; var task = await Task.WhenAny(tasks); if (task.Exception is not null) @@ -69,7 +69,7 @@ async Task ExecuteWorkflowAsync() Console.WriteLine("Executing workflow"); await client.ExecuteWorkflowAsync( (FileProcessingWorkflow wf) => wf.RunAsync(5), - new(id: "file-processing-0", taskQueue: "activity-sticky-queues-sample")); + new(id: "file-processing-0", taskQueue: "worker-specific-task-queues-sample")); } switch (args.ElementAtOrDefault(0)) diff --git a/src/WorkerSpecificTaskQueues/README.md b/src/WorkerSpecificTaskQueues/README.md index 6628b49..719ba2c 100644 --- a/src/WorkerSpecificTaskQueues/README.md +++ b/src/WorkerSpecificTaskQueues/README.md @@ -1,14 +1,16 @@ -# Sticky Activity Queues +# Worker-Specific Task Queues -This sample shows how to have [Sticky Execution](https://docs.temporal.io/tasks/#sticky-execution): using a unique task queue per Worker to have certain activities only run on that specific Worker. +Use a unique Task Queue for each Worker in order to have certain Activities run on a specific Worker. -The strategy is: +This is useful in scenarios where multiple Activities need to run in the same process or on the same host, for example to share memory or disk. This sample has a file processing Workflow, where one Activity downloads the file to disk and other Activities process it and clean it up. -- Create a `GetUniqueTaskQueue` activity that generates a unique task queue name, `uniqueWorkerTaskQueue`. -- It doesn't matter where this activity is run, so it can be "non sticky" as per Temporal default behavior. -- In this demo, `uniqueWorkerTaskQueue` is simply a `uuid` initialized in the Worker, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045). -- For activities intended to be "sticky", only register them in one Worker, and have that be the only Worker listening on that `uniqueWorkerTaskQueue`. -- Execute workflows from the Client like normal. +This strategy is: + +- Each Worker process runs two `TemporalWorker`s: + - One `TemporalWorker` listens on the shared `worker-specific-task-queues-sample` Task Queue. + - Another `TemporalWorker` listens on a uniquely generated Task Queue. +- Create a `GetUniqueTaskQueue` Activity that returns one of the uniquely generated Task Queues (that only one Worker is listening on—i.e. the **Worker-specific Task Queue**). It doesn't matter where this Activity is run, so it can be executed on the shared Task Queue. In this sample, the unique Task Queue is simply a `uuid`, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045). +- The Workflow and the first Activity are run on the shared `worker-specific-task-queues-sample` Task Queue. The rest of the Activities that do the file processing are run on the Worker-specific Task Queue. Activities have been artificially slowed with `await Task.Delay(TimeSpan.FromSeconds(3))` to simulate slow activities. @@ -23,4 +25,14 @@ Then in another terminal, run the workflow from this directory: dotnet run workflow -This will show logs in the worker window of the workflow running. \ No newline at end of file +In the worker terminal, you should see logs like: + +``` +Running worker +[21:57:26] info: Temporalio.Activity:DownloadFileToWorkerFileSystem[0] + Downloading https://temporal.io and saving to path /tmp/tmpD5c6wy.tmp +[21:57:32] info: Temporalio.Activity:WorkOnFileInWorkerFileSystem[0] + Did some work on /tmp/tmpD5c6wy.tmp, checksum: 49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4 +[21:57:35] info: Temporalio.Activity:CleanupFileFromWorkerFileSystem[0] + Removing /tmp/tmpD5c6wy.tmp +``` \ No newline at end of file diff --git a/src/WorkerSpecificTaskQueues/TemporalioSamples.ActivityStickyQueues.csproj b/src/WorkerSpecificTaskQueues/TemporalioSamples.WorkerSpecificTasksQueues.csproj similarity index 100% rename from src/WorkerSpecificTaskQueues/TemporalioSamples.ActivityStickyQueues.csproj rename to src/WorkerSpecificTaskQueues/TemporalioSamples.WorkerSpecificTasksQueues.csproj diff --git a/src/WorkerSpecificTaskQueues/WorkerSpecificActivities.cs b/src/WorkerSpecificTaskQueues/WorkerSpecificActivities.cs index 56f82af..e13378f 100644 --- a/src/WorkerSpecificTaskQueues/WorkerSpecificActivities.cs +++ b/src/WorkerSpecificTaskQueues/WorkerSpecificActivities.cs @@ -4,9 +4,9 @@ using Microsoft.Extensions.Logging; using Temporalio.Activities; -namespace TemporalioSamples.ActivityStickyQueues; +namespace TemporalioSamples.WorkerSpecificTaskQueues; -public static class StickyActivities +public static class WorkerSpecificActivities { [Activity] #pragma warning disable CA1054 From d4bc65342074cb26b4a3bae59f0f316592c1af45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= Date: Sat, 11 Nov 2023 22:02:03 -0500 Subject: [PATCH 5/5] fix name --- ...s.csproj => TemporalioSamples.WorkerSpecificTaskQueues.csproj} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/WorkerSpecificTaskQueues/{TemporalioSamples.WorkerSpecificTasksQueues.csproj => TemporalioSamples.WorkerSpecificTaskQueues.csproj} (100%) diff --git a/src/WorkerSpecificTaskQueues/TemporalioSamples.WorkerSpecificTasksQueues.csproj b/src/WorkerSpecificTaskQueues/TemporalioSamples.WorkerSpecificTaskQueues.csproj similarity index 100% rename from src/WorkerSpecificTaskQueues/TemporalioSamples.WorkerSpecificTasksQueues.csproj rename to src/WorkerSpecificTaskQueues/TemporalioSamples.WorkerSpecificTaskQueues.csproj