diff --git a/README.md b/README.md index f2957e9..3b5c118 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,6 @@ Prerequisites: * [ActivityHeartbeatingCancellation](src/ActivityHeartbeatingCancellation) - How to use heartbeating and cancellation handling in an activity. * [ActivitySimple](src/ActivitySimple) - Simple workflow that runs simple activities. -* [ActivityStickyQueues](src/ActivityStickyQueues) - Use a unique task queue per Worker to have certain Activities only run on that specific Worker. * [ActivityWorker](src/ActivityWorker) - Use .NET activities from a workflow in another language. * [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter. * [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud. @@ -28,6 +27,7 @@ Prerequisites: * [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource. * [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. * [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future. +* [WorkerSpecificTaskQueues](src/WorkerSpecificTaskQueues) - Use a unique task queue per Worker to have certain Activities only run on that specific Worker. * [WorkerVersioning](src/WorkerVersioning) - How to use the Worker Versioning feature to more easily deploy changes to Workflow & other code. ## Development diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln index 6331d68..1fc061f 100644 --- a/TemporalioSamples.sln +++ b/TemporalioSamples.sln @@ -29,7 +29,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Encryptio EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ClientMtls", "src\ClientMtls\TemporalioSamples.ClientMtls.csproj", "{D2A3546F-2462-4B86-8B5E-999505483A2D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ActivityStickyQueues", "src\ActivityStickyQueues\TemporalioSamples.ActivityStickyQueues.csproj", "{974CCD5E-0254-4C85-9618-8CD014A2734F}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkerSpecificTaskQueues", "src\WorkerSpecificTaskQueues\TemporalioSamples.WorkerSpecificTaskQueues.csproj", "{974CCD5E-0254-4C85-9618-8CD014A2734F}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Schedules", "src\Schedules\TemporalioSamples.Schedules.csproj", "{297A58BE-3959-4525-A329-222B3575139D}" EndProject diff --git a/src/ActivityStickyQueues/README.md b/src/ActivityStickyQueues/README.md deleted file mode 100644 index 6628b49..0000000 --- a/src/ActivityStickyQueues/README.md +++ /dev/null @@ -1,26 +0,0 @@ -# Sticky Activity Queues - -This sample shows how to have [Sticky Execution](https://docs.temporal.io/tasks/#sticky-execution): using a unique task queue per Worker to have certain activities only run on that specific Worker. - -The strategy is: - -- Create a `GetUniqueTaskQueue` activity that generates a unique task queue name, `uniqueWorkerTaskQueue`. -- It doesn't matter where this activity is run, so it can be "non sticky" as per Temporal default behavior. -- In this demo, `uniqueWorkerTaskQueue` is simply a `uuid` initialized in the Worker, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045). -- For activities intended to be "sticky", only register them in one Worker, and have that be the only Worker listening on that `uniqueWorkerTaskQueue`. -- Execute workflows from the Client like normal. - -Activities have been artificially slowed with `await Task.Delay(TimeSpan.FromSeconds(3))` to simulate slow activities. - -### Running this sample - -To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory -in a separate terminal to start the worker: - - dotnet run worker - -Then in another terminal, run the workflow from this directory: - - dotnet run workflow - -This will show logs in the worker window of the workflow running. \ No newline at end of file diff --git a/src/ActivityStickyQueues/FileProcessingWorkflow.workflow.cs b/src/WorkerSpecificTaskQueues/FileProcessingWorkflow.workflow.cs similarity index 77% rename from src/ActivityStickyQueues/FileProcessingWorkflow.workflow.cs rename to src/WorkerSpecificTaskQueues/FileProcessingWorkflow.workflow.cs index ea13baf..2a379cb 100644 --- a/src/ActivityStickyQueues/FileProcessingWorkflow.workflow.cs +++ b/src/WorkerSpecificTaskQueues/FileProcessingWorkflow.workflow.cs @@ -1,6 +1,6 @@ using Temporalio.Workflows; -namespace TemporalioSamples.ActivityStickyQueues; +namespace TemporalioSamples.WorkerSpecificTaskQueues; [Workflow] public class FileProcessingWorkflow @@ -9,11 +9,11 @@ public class FileProcessingWorkflow public async Task RunAsync(int maxAttempts = 5) { var uniqueWorkerTaskQueue = await Workflow.ExecuteActivityAsync( - (NonStickyActivities act) => act.GetUniqueTaskQueue(), + (NormalActivities act) => act.GetUniqueTaskQueue(), new() { StartToCloseTimeout = TimeSpan.FromMinutes(1) }); var downloadPath = await Workflow.ExecuteActivityAsync( - () => StickyActivities.DownloadFileToWorkerFileSystemAsync("https://temporal.io"), + () => WorkerSpecificActivities.DownloadFileToWorkerFileSystemAsync("https://temporal.io"), new() { TaskQueue = uniqueWorkerTaskQueue, @@ -26,7 +26,7 @@ public async Task RunAsync(int maxAttempts = 5) }); await Workflow.ExecuteActivityAsync( - () => StickyActivities.WorkOnFileInWorkerFileSystemAsync(downloadPath), + () => WorkerSpecificActivities.WorkOnFileInWorkerFileSystemAsync(downloadPath), new() { TaskQueue = uniqueWorkerTaskQueue, @@ -35,7 +35,7 @@ await Workflow.ExecuteActivityAsync( }); await Workflow.ExecuteActivityAsync( - () => StickyActivities.CleanupFileFromWorkerFileSystemAsync(downloadPath), + () => WorkerSpecificActivities.CleanupFileFromWorkerFileSystemAsync(downloadPath), new() { TaskQueue = uniqueWorkerTaskQueue, diff --git a/src/ActivityStickyQueues/NonStickyActivities.cs b/src/WorkerSpecificTaskQueues/NormalActivities.cs similarity index 66% rename from src/ActivityStickyQueues/NonStickyActivities.cs rename to src/WorkerSpecificTaskQueues/NormalActivities.cs index 1254efb..c4a9f63 100644 --- a/src/ActivityStickyQueues/NonStickyActivities.cs +++ b/src/WorkerSpecificTaskQueues/NormalActivities.cs @@ -1,12 +1,12 @@ using Temporalio.Activities; -namespace TemporalioSamples.ActivityStickyQueues; +namespace TemporalioSamples.WorkerSpecificTaskQueues; -public class NonStickyActivities +public class NormalActivities { private readonly string uniqueWorkerTaskQueue; - public NonStickyActivities(string uniqueWorkerTaskQueue) => + public NormalActivities(string uniqueWorkerTaskQueue) => this.uniqueWorkerTaskQueue = uniqueWorkerTaskQueue; [Activity] diff --git a/src/ActivityStickyQueues/Program.cs b/src/WorkerSpecificTaskQueues/Program.cs similarity index 67% rename from src/ActivityStickyQueues/Program.cs rename to src/WorkerSpecificTaskQueues/Program.cs index c8b0905..9ca9f9e 100644 --- a/src/ActivityStickyQueues/Program.cs +++ b/src/WorkerSpecificTaskQueues/Program.cs @@ -1,7 +1,7 @@ using Microsoft.Extensions.Logging; using Temporalio.Client; using Temporalio.Worker; -using TemporalioSamples.ActivityStickyQueues; +using TemporalioSamples.WorkerSpecificTaskQueues; // Create a client to localhost on default namespace var client = await TemporalClient.ConnectAsync(new("localhost:7233") @@ -24,25 +24,25 @@ async Task RunWorkerAsync() var uniqueWorkerTaskQueue = Guid.NewGuid().ToString(); - var nonStickyActivities = new NonStickyActivities(uniqueWorkerTaskQueue); + var normalActivities = new NormalActivities(uniqueWorkerTaskQueue); // Run worker until cancelled Console.WriteLine("Running worker"); - using var nonStickyWorker = new TemporalWorker( + using var normalWorker = new TemporalWorker( client, - new TemporalWorkerOptions(taskQueue: "activity-sticky-queues-sample") - .AddActivity(nonStickyActivities.GetUniqueTaskQueue) + new TemporalWorkerOptions(taskQueue: "worker-specific-task-queues-sample") + .AddActivity(normalActivities.GetUniqueTaskQueue) .AddWorkflow()); - using var stickyWorker = new TemporalWorker( + using var uniqueTaskQueueWorker = new TemporalWorker( client, new TemporalWorkerOptions(taskQueue: uniqueWorkerTaskQueue) - .AddActivity(StickyActivities.DownloadFileToWorkerFileSystemAsync) - .AddActivity(StickyActivities.CleanupFileFromWorkerFileSystemAsync) - .AddActivity(StickyActivities.WorkOnFileInWorkerFileSystemAsync)); + .AddActivity(WorkerSpecificActivities.DownloadFileToWorkerFileSystemAsync) + .AddActivity(WorkerSpecificActivities.CleanupFileFromWorkerFileSystemAsync) + .AddActivity(WorkerSpecificActivities.WorkOnFileInWorkerFileSystemAsync)); - var tasks = new List { nonStickyWorker.ExecuteAsync(tokenSource.Token), stickyWorker.ExecuteAsync(tokenSource.Token) }; + var tasks = new List { normalWorker.ExecuteAsync(tokenSource.Token), uniqueTaskQueueWorker.ExecuteAsync(tokenSource.Token) }; var task = await Task.WhenAny(tasks); if (task.Exception is not null) @@ -69,7 +69,7 @@ async Task ExecuteWorkflowAsync() Console.WriteLine("Executing workflow"); await client.ExecuteWorkflowAsync( (FileProcessingWorkflow wf) => wf.RunAsync(5), - new(id: "file-processing-0", taskQueue: "activity-sticky-queues-sample")); + new(id: "file-processing-0", taskQueue: "worker-specific-task-queues-sample")); } switch (args.ElementAtOrDefault(0)) diff --git a/src/WorkerSpecificTaskQueues/README.md b/src/WorkerSpecificTaskQueues/README.md new file mode 100644 index 0000000..719ba2c --- /dev/null +++ b/src/WorkerSpecificTaskQueues/README.md @@ -0,0 +1,38 @@ +# Worker-Specific Task Queues + +Use a unique Task Queue for each Worker in order to have certain Activities run on a specific Worker. + +This is useful in scenarios where multiple Activities need to run in the same process or on the same host, for example to share memory or disk. This sample has a file processing Workflow, where one Activity downloads the file to disk and other Activities process it and clean it up. + +This strategy is: + +- Each Worker process runs two `TemporalWorker`s: + - One `TemporalWorker` listens on the shared `worker-specific-task-queues-sample` Task Queue. + - Another `TemporalWorker` listens on a uniquely generated Task Queue. +- Create a `GetUniqueTaskQueue` Activity that returns one of the uniquely generated Task Queues (that only one Worker is listening on—i.e. the **Worker-specific Task Queue**). It doesn't matter where this Activity is run, so it can be executed on the shared Task Queue. In this sample, the unique Task Queue is simply a `uuid`, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045). +- The Workflow and the first Activity are run on the shared `worker-specific-task-queues-sample` Task Queue. The rest of the Activities that do the file processing are run on the Worker-specific Task Queue. + +Activities have been artificially slowed with `await Task.Delay(TimeSpan.FromSeconds(3))` to simulate slow activities. + +### Running this sample + +To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory +in a separate terminal to start the worker: + + dotnet run worker + +Then in another terminal, run the workflow from this directory: + + dotnet run workflow + +In the worker terminal, you should see logs like: + +``` +Running worker +[21:57:26] info: Temporalio.Activity:DownloadFileToWorkerFileSystem[0] + Downloading https://temporal.io and saving to path /tmp/tmpD5c6wy.tmp +[21:57:32] info: Temporalio.Activity:WorkOnFileInWorkerFileSystem[0] + Did some work on /tmp/tmpD5c6wy.tmp, checksum: 49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4 +[21:57:35] info: Temporalio.Activity:CleanupFileFromWorkerFileSystem[0] + Removing /tmp/tmpD5c6wy.tmp +``` \ No newline at end of file diff --git a/src/ActivityStickyQueues/TemporalioSamples.ActivityStickyQueues.csproj b/src/WorkerSpecificTaskQueues/TemporalioSamples.WorkerSpecificTaskQueues.csproj similarity index 100% rename from src/ActivityStickyQueues/TemporalioSamples.ActivityStickyQueues.csproj rename to src/WorkerSpecificTaskQueues/TemporalioSamples.WorkerSpecificTaskQueues.csproj diff --git a/src/ActivityStickyQueues/StickyActivities.cs b/src/WorkerSpecificTaskQueues/WorkerSpecificActivities.cs similarity index 94% rename from src/ActivityStickyQueues/StickyActivities.cs rename to src/WorkerSpecificTaskQueues/WorkerSpecificActivities.cs index 56f82af..e13378f 100644 --- a/src/ActivityStickyQueues/StickyActivities.cs +++ b/src/WorkerSpecificTaskQueues/WorkerSpecificActivities.cs @@ -4,9 +4,9 @@ using Microsoft.Extensions.Logging; using Temporalio.Activities; -namespace TemporalioSamples.ActivityStickyQueues; +namespace TemporalioSamples.WorkerSpecificTaskQueues; -public static class StickyActivities +public static class WorkerSpecificActivities { [Activity] #pragma warning disable CA1054