Skip to content

Commit

Permalink
Merge branch 'main' into CounterInterceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
rross authored Aug 15, 2024
2 parents d548089 + 175feb5 commit d113bd3
Show file tree
Hide file tree
Showing 14 changed files with 534 additions and 16 deletions.
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ dotnet_diagnostic.SA1515.severity = none

# Do not require XML doc in samples
dotnet_diagnostic.SA1600.severity = none
dotnet_diagnostic.SA1602.severity = none

# Do not require file header
dotnet_diagnostic.SA1633.severity = none
Expand Down
8 changes: 4 additions & 4 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Temporalio" Version="1.1.2" />
<PackageReference Include="Temporalio.Extensions.DiagnosticSource" Version="1.1.2" />
<PackageReference Include="Temporalio.Extensions.Hosting" Version="1.1.2" />
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="1.1.2" />
<PackageReference Include="Temporalio" Version="1.2.0" />
<PackageReference Include="Temporalio.Extensions.DiagnosticSource" Version="1.2.0" />
<PackageReference Include="Temporalio.Extensions.Hosting" Version="1.2.0" />
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="1.2.0" />
<!--
Can also reference the SDK downloaded to a local directory:
<ProjectReference Include="$(MSBuildThisFileDirectory)..\temporal-sdk-dotnet\src\Temporalio\Temporalio.csproj" />
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Prerequisites:
* [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs.
* [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource.
* [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
* [SafeMessageHandlers](src/SafeMessageHandlers) - Use `Semaphore` to ensure operations are atomically processed in a workflow.
* [Saga](src/Saga) - Demonstrates how to implement a saga pattern.
* [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future.
* [SignalsQueries](src/SignalsQueries) - A loyalty program using Signals and Queries.
Expand Down
44 changes: 44 additions & 0 deletions src/SafeMessageHandlers/ClusterManagerActivities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
namespace TemporalioSamples.SafeMessageHandlers;

using Microsoft.Extensions.Logging;
using Temporalio.Activities;

public class ClusterManagerActivities
{
public record AllocateNodesToJobInput(
IList<string> Nodes,
string JobName);

[Activity]
public async Task AllocateNodesToJobAsync(AllocateNodesToJobInput input)
{
ActivityExecutionContext.Current.Logger.LogInformation(
"Assigning nodes {Nodes} to job {TaskName}", input.Nodes, input.JobName);
await Task.Delay(100);
}

public record DeallocateNodesFromJobInput(
IList<string> Nodes,
string JobName);

[Activity]
public async Task DeallocateNodesFromJobAsync(DeallocateNodesFromJobInput input)
{
ActivityExecutionContext.Current.Logger.LogInformation(
"Deallocating nodes {Nodes} from job {TaskName}", input.Nodes, input.JobName);
await Task.Delay(100);
}

public record FindBadNodesInput(
IList<string> Nodes);

[Activity]
public async Task<List<string>> FindBadNodesAsync(FindBadNodesInput input)
{
await Task.Delay(100);
return input.Nodes.
Select((node, index) => index % 5 == 0 ? null : node).
OfType<string>().
ToList();
}
}
209 changes: 209 additions & 0 deletions src/SafeMessageHandlers/ClusterManagerWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
namespace TemporalioSamples.SafeMessageHandlers;

using Microsoft.Extensions.Logging;
using Temporalio.Exceptions;
using Temporalio.Workflows;

[Workflow]
public class ClusterManagerWorkflow
{
public record State
{
public bool ClusterStarted { get; set; }

public bool ClusterShutdown { get; set; }

public IDictionary<string, string?> Nodes { get; init; } = new Dictionary<string, string?>();

public int MaxAssignedNodes { get; set; }
}

public record Input
{
public State State { get; init; } = new();

public bool TestContinueAsNew { get; init; }
}

public record Result(
int MaxAssignedNodes,
int NumAssignedNodes);

private readonly Semaphore nodesLock = new(1);
private readonly int maxHistoryLength;
private readonly TimeSpan sleepInterval;

[WorkflowInit]
public ClusterManagerWorkflow(Input input)
{
CurrentState = input.State;
maxHistoryLength = input.TestContinueAsNew ? 120 : int.MaxValue;
sleepInterval = TimeSpan.FromSeconds(input.TestContinueAsNew ? 1 : 600);
}

[WorkflowQuery]
public State CurrentState { get; init; }

[WorkflowRun]
public async Task<Result> RunAsync(Input input)
{
await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);

// Perform health checks at intervals
do
{
await PerformHealthChecksAsync();
await Workflow.WaitConditionAsync(
() => CurrentState.ClusterShutdown || ShouldContinueAsNew,
sleepInterval);

// Continue as new if needed
if (ShouldContinueAsNew)
{
Workflow.Logger.LogInformation("Continuing as new");
throw Workflow.CreateContinueAsNewException((ClusterManagerWorkflow wf) => wf.RunAsync(new()
{
State = CurrentState,
TestContinueAsNew = input.TestContinueAsNew,
}));
}
}
while (!CurrentState.ClusterShutdown);
return new(CurrentState.MaxAssignedNodes, NumAssignedNodes);
}

[WorkflowSignal]
public async Task StartClusterAsync()
{
CurrentState.ClusterStarted = true;
foreach (var node in Enumerable.Range(0, 25))
{
CurrentState.Nodes[$"{node}"] = null;
}
Workflow.Logger.LogInformation("Cluster started");
}

[WorkflowSignal]
public async Task ShutdownClusterAsync()
{
await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);
CurrentState.ClusterShutdown = true;
Workflow.Logger.LogInformation("Cluster shut down");
}

public record AllocateNodesToJobInput(int NumNodes, string JobName);

[WorkflowUpdate]
public async Task<List<string>> AllocateNodesToJobAsync(AllocateNodesToJobInput input)
{
await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);
if (CurrentState.ClusterShutdown)
{
throw new ApplicationFailureException(
"Cannot allocate nodes to a job, cluster is already shut down");
}
await nodesLock.WaitAsync();
try
{
var unassignedNodes = CurrentState.Nodes.
Where(kvp => kvp.Value == null).
Select(kvp => kvp.Key).
ToList();
if (unassignedNodes.Count < input.NumNodes)
{
throw new ApplicationFailureException(
$"Cannot allocate {input.NumNodes} nodes, have only {unassignedNodes.Count} available");
}
var assignedNodes = unassignedNodes[..input.NumNodes];
// This await would be dangerous without nodesLock because it yields control and allows
// interleaving
await Workflow.ExecuteActivityAsync(
(ClusterManagerActivities acts) => acts.AllocateNodesToJobAsync(new(assignedNodes, input.JobName)),
new() { StartToCloseTimeout = TimeSpan.FromSeconds(10) });
foreach (var node in assignedNodes)
{
CurrentState.Nodes[node] = input.JobName;
}
CurrentState.MaxAssignedNodes = int.Max(CurrentState.MaxAssignedNodes, NumAssignedNodes);
return assignedNodes;
}
finally
{
nodesLock.Release();
}
}

public record DeleteJobInput(string JobName);

[WorkflowUpdate]
public async Task DeleteJobAsync(DeleteJobInput input)
{
await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);
if (CurrentState.ClusterShutdown)
{
throw new ApplicationFailureException(
"Cannot delete job, cluster is already shut down");
}
await nodesLock.WaitAsync();
try
{
var toUnassign = CurrentState.Nodes.
Where(kvp => kvp.Value == input.JobName).
Select(kvp => kvp.Key).
ToList();
// This await would be dangerous without nodesLock because it yields control and allows
// interleaving
await Workflow.ExecuteActivityAsync(
(ClusterManagerActivities acts) => acts.DeallocateNodesFromJobAsync(new(toUnassign, input.JobName)),
new() { StartToCloseTimeout = TimeSpan.FromSeconds(10) });
foreach (var node in toUnassign)
{
CurrentState.Nodes[node] = null;
}
}
finally
{
nodesLock.Release();
}
}

private int NumAssignedNodes =>
CurrentState.Nodes.Count(kvp => kvp.Value is { } val && val != "BAD!");

private bool ShouldContinueAsNew =>
// Don't continue as new while update running
nodesLock.CurrentCount > 0 &&
// Continue if suggested or, for ease of testing, max history reached
(Workflow.ContinueAsNewSuggested || Workflow.CurrentHistoryLength > maxHistoryLength);

private async Task PerformHealthChecksAsync()
{
await nodesLock.WaitAsync();
try
{
// Find bad nodes from the set of non-bad ones. This await would be dangerous without
// nodesLock because it yields control and allows interleaving.
var assignedNodes = CurrentState.Nodes.
Where(kvp => kvp.Value is { } val && val != "BAD!").
Select(kvp => kvp.Value!).
ToList();
var badNodes = await Workflow.ExecuteActivityAsync(
(ClusterManagerActivities acts) => acts.FindBadNodesAsync(new(assignedNodes)),
new()
{
StartToCloseTimeout = TimeSpan.FromSeconds(10),
// This health check is optional, and our lock would block the whole workflow if
// we let it retry forever
RetryPolicy = new() { MaximumAttempts = 1 },
});
foreach (var node in badNodes)
{
CurrentState.Nodes[node] = "BAD!";
}
}
finally
{
nodesLock.Release();
}
}
}
101 changes: 101 additions & 0 deletions src/SafeMessageHandlers/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
using Microsoft.Extensions.Logging;
using Temporalio.Api.Enums.V1;
using Temporalio.Client;
using Temporalio.Worker;
using TemporalioSamples.SafeMessageHandlers;

// Create a client to localhost on default namespace
using var loggerFactory = LoggerFactory.Create(builder =>
builder.
AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] ").
SetMinimumLevel(LogLevel.Information));
var client = await TemporalClient.ConnectAsync(new("localhost:7233")
{
LoggerFactory = loggerFactory,
});
var logger = loggerFactory.CreateLogger<Program>();

async Task RunWorkerAsync()
{
// Cancellation token cancelled on ctrl+c
using var tokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
{
tokenSource.Cancel();
eventArgs.Cancel = true;
};

// Run worker until cancelled
logger.LogInformation("Running worker");
using var worker = new TemporalWorker(
client,
new TemporalWorkerOptions(taskQueue: "atomic-message-handlers-sample").
AddAllActivities(new ClusterManagerActivities()).
AddWorkflow<ClusterManagerWorkflow>());
try
{
await worker.ExecuteAsync(tokenSource.Token);
}
catch (OperationCanceledException)
{
logger.LogInformation("Worker cancelled");
}
}

async Task ExecuteWorkflowAsync(bool testContinueAsNew)
{
// Start workflow
var workflowOptions = new WorkflowOptions(
id: "atomic-message-handlers-workflow-id",
taskQueue: "atomic-message-handlers-sample")
{
IdReusePolicy = WorkflowIdReusePolicy.TerminateIfRunning,
};
workflowOptions.SignalWithStart((ClusterManagerWorkflow wf) => wf.StartClusterAsync());
var handle = await client.StartWorkflowAsync(
(ClusterManagerWorkflow wf) => wf.RunAsync(new() { TestContinueAsNew = testContinueAsNew }),
workflowOptions);

// Allocate 2 nodes each to 6 jobs
await Task.WhenAll(Enumerable.Range(0, 6).Select(i =>
handle.ExecuteUpdateAsync(wf => wf.AllocateNodesToJobAsync(
new(2, $"job-{i}")))));

// Wait a bit
await Task.Delay(testContinueAsNew ? 10000 : 1000);

// Delete the jobs
await Task.WhenAll(Enumerable.Range(0, 6).Select(i =>
handle.ExecuteUpdateAsync(wf => wf.DeleteJobAsync(new($"job-{i}")))));

// Shutdown cluster
await handle.SignalAsync(wf => wf.ShutdownClusterAsync());
var result = await handle.GetResultAsync();

logger.LogInformation(
"Cluster shut down successfully. " +
"It peaked at {MaxAssignedNodes} assigned nodes. " +
"It had {NumAssignedNodes} nodes assigned at the end.",
result.MaxAssignedNodes,
result.NumAssignedNodes);
}

switch (args.ElementAtOrDefault(0))
{
case "worker":
if (args.Length > 1)
{
throw new ArgumentException("No extra options allowed for 'worker'");
}
await RunWorkerAsync();
break;
case "workflow":
if (args.Length > 2 || (args.Length == 2 && args[1] != "--test-continue-as-new"))
{
throw new ArgumentException("Only '--test-continue-as-new' option allowed for 'worker'");
}
await ExecuteWorkflowAsync(args.ElementAtOrDefault(1) == "--test-continue-as-new");
break;
default:
throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument");
}
Loading

0 comments on commit d113bd3

Please sign in to comment.