Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename Sticky Activities sample #41

Merged
merged 5 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ Prerequisites:
<!-- Keep this list in alphabetical order -->
* [ActivityHeartbeatingCancellation](src/ActivityHeartbeatingCancellation) - How to use heartbeating and cancellation handling in an activity.
* [ActivitySimple](src/ActivitySimple) - Simple workflow that runs simple activities.
* [ActivityStickyQueues](src/ActivityStickyQueues) - Use a unique task queue per Worker to have certain Activities only run on that specific Worker.
* [ActivityWorker](src/ActivityWorker) - Use .NET activities from a workflow in another language.
* [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter.
* [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud.
Expand All @@ -28,6 +27,7 @@ Prerequisites:
* [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource.
* [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
* [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future.
* [WorkerSpecificTaskQueues](src/WorkerSpecificTaskQueues) - Use a unique task queue per Worker to have certain Activities only run on that specific Worker.
* [WorkerVersioning](src/WorkerVersioning) - How to use the Worker Versioning feature to more easily deploy changes to Workflow & other code.

## Development
Expand Down
2 changes: 1 addition & 1 deletion TemporalioSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Encryptio
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ClientMtls", "src\ClientMtls\TemporalioSamples.ClientMtls.csproj", "{D2A3546F-2462-4B86-8B5E-999505483A2D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ActivityStickyQueues", "src\ActivityStickyQueues\TemporalioSamples.ActivityStickyQueues.csproj", "{974CCD5E-0254-4C85-9618-8CD014A2734F}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkerSpecificTaskQueues", "src\WorkerSpecificTaskQueues\TemporalioSamples.WorkerSpecificTaskQueues.csproj", "{974CCD5E-0254-4C85-9618-8CD014A2734F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Schedules", "src\Schedules\TemporalioSamples.Schedules.csproj", "{297A58BE-3959-4525-A329-222B3575139D}"
EndProject
Expand Down
26 changes: 0 additions & 26 deletions src/ActivityStickyQueues/README.md

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using Temporalio.Workflows;

namespace TemporalioSamples.ActivityStickyQueues;
namespace TemporalioSamples.WorkerSpecificTaskQueues;

[Workflow]
public class FileProcessingWorkflow
Expand All @@ -9,11 +9,11 @@ public class FileProcessingWorkflow
public async Task RunAsync(int maxAttempts = 5)
{
var uniqueWorkerTaskQueue = await Workflow.ExecuteActivityAsync(
(NonStickyActivities act) => act.GetUniqueTaskQueue(),
(NormalActivities act) => act.GetUniqueTaskQueue(),
new() { StartToCloseTimeout = TimeSpan.FromMinutes(1) });

var downloadPath = await Workflow.ExecuteActivityAsync(
() => StickyActivities.DownloadFileToWorkerFileSystemAsync("https://temporal.io"),
() => WorkerSpecificActivities.DownloadFileToWorkerFileSystemAsync("https://temporal.io"),
new()
{
TaskQueue = uniqueWorkerTaskQueue,
Expand All @@ -26,7 +26,7 @@ public async Task RunAsync(int maxAttempts = 5)
});

await Workflow.ExecuteActivityAsync(
() => StickyActivities.WorkOnFileInWorkerFileSystemAsync(downloadPath),
() => WorkerSpecificActivities.WorkOnFileInWorkerFileSystemAsync(downloadPath),
new()
{
TaskQueue = uniqueWorkerTaskQueue,
Expand All @@ -35,7 +35,7 @@ await Workflow.ExecuteActivityAsync(
});

await Workflow.ExecuteActivityAsync(
() => StickyActivities.CleanupFileFromWorkerFileSystemAsync(downloadPath),
() => WorkerSpecificActivities.CleanupFileFromWorkerFileSystemAsync(downloadPath),
new()
{
TaskQueue = uniqueWorkerTaskQueue,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using Temporalio.Activities;

namespace TemporalioSamples.ActivityStickyQueues;
namespace TemporalioSamples.WorkerSpecificTaskQueues;

public class NonStickyActivities
public class NormalActivities
{
private readonly string uniqueWorkerTaskQueue;

public NonStickyActivities(string uniqueWorkerTaskQueue) =>
public NormalActivities(string uniqueWorkerTaskQueue) =>
this.uniqueWorkerTaskQueue = uniqueWorkerTaskQueue;

[Activity]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using Microsoft.Extensions.Logging;
using Temporalio.Client;
using Temporalio.Worker;
using TemporalioSamples.ActivityStickyQueues;
using TemporalioSamples.WorkerSpecificTaskQueues;

// Create a client to localhost on default namespace
var client = await TemporalClient.ConnectAsync(new("localhost:7233")
Expand All @@ -24,25 +24,25 @@ async Task RunWorkerAsync()

var uniqueWorkerTaskQueue = Guid.NewGuid().ToString();

var nonStickyActivities = new NonStickyActivities(uniqueWorkerTaskQueue);
var normalActivities = new NormalActivities(uniqueWorkerTaskQueue);

// Run worker until cancelled
Console.WriteLine("Running worker");

using var nonStickyWorker = new TemporalWorker(
using var normalWorker = new TemporalWorker(
client,
new TemporalWorkerOptions(taskQueue: "activity-sticky-queues-sample")
.AddActivity(nonStickyActivities.GetUniqueTaskQueue)
new TemporalWorkerOptions(taskQueue: "worker-specific-task-queues-sample")
.AddActivity(normalActivities.GetUniqueTaskQueue)
.AddWorkflow<FileProcessingWorkflow>());

using var stickyWorker = new TemporalWorker(
using var uniqueTaskQueueWorker = new TemporalWorker(
client,
new TemporalWorkerOptions(taskQueue: uniqueWorkerTaskQueue)
.AddActivity(StickyActivities.DownloadFileToWorkerFileSystemAsync)
.AddActivity(StickyActivities.CleanupFileFromWorkerFileSystemAsync)
.AddActivity(StickyActivities.WorkOnFileInWorkerFileSystemAsync));
.AddActivity(WorkerSpecificActivities.DownloadFileToWorkerFileSystemAsync)
.AddActivity(WorkerSpecificActivities.CleanupFileFromWorkerFileSystemAsync)
.AddActivity(WorkerSpecificActivities.WorkOnFileInWorkerFileSystemAsync));

var tasks = new List<Task> { nonStickyWorker.ExecuteAsync(tokenSource.Token), stickyWorker.ExecuteAsync(tokenSource.Token) };
var tasks = new List<Task> { normalWorker.ExecuteAsync(tokenSource.Token), uniqueTaskQueueWorker.ExecuteAsync(tokenSource.Token) };

var task = await Task.WhenAny(tasks);
if (task.Exception is not null)
Expand All @@ -69,7 +69,7 @@ async Task ExecuteWorkflowAsync()
Console.WriteLine("Executing workflow");
await client.ExecuteWorkflowAsync(
(FileProcessingWorkflow wf) => wf.RunAsync(5),
new(id: "file-processing-0", taskQueue: "activity-sticky-queues-sample"));
new(id: "file-processing-0", taskQueue: "worker-specific-task-queues-sample"));
}

switch (args.ElementAtOrDefault(0))
Expand Down
38 changes: 38 additions & 0 deletions src/WorkerSpecificTaskQueues/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Worker-Specific Task Queues

Use a unique Task Queue for each Worker in order to have certain Activities run on a specific Worker.

This is useful in scenarios where multiple Activities need to run in the same process or on the same host, for example to share memory or disk. This sample has a file processing Workflow, where one Activity downloads the file to disk and other Activities process it and clean it up.

This strategy is:

- Each Worker process runs two `TemporalWorker`s:
- One `TemporalWorker` listens on the shared `worker-specific-task-queues-sample` Task Queue.
- Another `TemporalWorker` listens on a uniquely generated Task Queue.
- Create a `GetUniqueTaskQueue` Activity that returns one of the uniquely generated Task Queues (that only one Worker is listening on—i.e. the **Worker-specific Task Queue**). It doesn't matter where this Activity is run, so it can be executed on the shared Task Queue. In this sample, the unique Task Queue is simply a `uuid`, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045).
- The Workflow and the first Activity are run on the shared `worker-specific-task-queues-sample` Task Queue. The rest of the Activities that do the file processing are run on the Worker-specific Task Queue.

Activities have been artificially slowed with `await Task.Delay(TimeSpan.FromSeconds(3))` to simulate slow activities.

### Running this sample

To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory
in a separate terminal to start the worker:

dotnet run worker

Then in another terminal, run the workflow from this directory:

dotnet run workflow

In the worker terminal, you should see logs like:

```
Running worker
[21:57:26] info: Temporalio.Activity:DownloadFileToWorkerFileSystem[0]
Downloading https://temporal.io and saving to path /tmp/tmpD5c6wy.tmp
[21:57:32] info: Temporalio.Activity:WorkOnFileInWorkerFileSystem[0]
Did some work on /tmp/tmpD5c6wy.tmp, checksum: 49d7419e6cba3575b3158f62d053f922aa08b23c64f05411cda3213b56c84ba4
[21:57:35] info: Temporalio.Activity:CleanupFileFromWorkerFileSystem[0]
Removing /tmp/tmpD5c6wy.tmp
```
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
using Microsoft.Extensions.Logging;
using Temporalio.Activities;

namespace TemporalioSamples.ActivityStickyQueues;
namespace TemporalioSamples.WorkerSpecificTaskQueues;

public static class StickyActivities
public static class WorkerSpecificActivities
{
[Activity]
#pragma warning disable CA1054
Expand Down
Loading