Skip to content

Commit

Permalink
Atomic message handler sample
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Jun 24, 2024
1 parent 1b00616 commit be233c9
Show file tree
Hide file tree
Showing 14 changed files with 539 additions and 12 deletions.
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ dotnet_diagnostic.SA1515.severity = none

# Do not require XML doc in samples
dotnet_diagnostic.SA1600.severity = none
dotnet_diagnostic.SA1602.severity = none

# Do not require file header
dotnet_diagnostic.SA1633.severity = none
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Prerequisites:
* [ActivityHeartbeatingCancellation](src/ActivityHeartbeatingCancellation) - How to use heartbeating and cancellation handling in an activity.
* [ActivitySimple](src/ActivitySimple) - Simple workflow that runs simple activities.
* [ActivityWorker](src/ActivityWorker) - Use .NET activities from a workflow in another language.
* [AtomicMessageHandlers](src/AtomicMessageHandlers) - Use `SemaphoreSlim` to ensure operations are atomically processed in a workflow.
* [AspNet](src/AspNet) - Demonstration of a generic host worker and an ASP.NET workflow starter.
* [ClientMtls](src/ClientMtls) - How to use client certificate authentication, e.g. for Temporal Cloud.
* [ContextPropagation](src/ContextPropagation) - Context propagation via interceptors.
Expand Down
7 changes: 7 additions & 0 deletions TemporalioSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkflowU
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.ContextPropagation", "src\ContextPropagation\TemporalioSamples.ContextPropagation.csproj", "{7B797D20-485F-441D-8E71-AF7E315FA9CF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.AtomicMessageHandlers", "src\AtomicMessageHandlers\TemporalioSamples.AtomicMessageHandlers.csproj", "{5E497499-F87C-4DC6-B5DC-F508F31EA172}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -149,6 +151,10 @@ Global
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7B797D20-485F-441D-8E71-AF7E315FA9CF}.Release|Any CPU.Build.0 = Release|Any CPU
{5E497499-F87C-4DC6-B5DC-F508F31EA172}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5E497499-F87C-4DC6-B5DC-F508F31EA172}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5E497499-F87C-4DC6-B5DC-F508F31EA172}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5E497499-F87C-4DC6-B5DC-F508F31EA172}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -178,5 +184,6 @@ Global
{B79F07F7-3429-4C58-84C3-08587F748B2D} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{7B797D20-485F-441D-8E71-AF7E315FA9CF} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
{5E497499-F87C-4DC6-B5DC-F508F31EA172} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC}
EndGlobalSection
EndGlobal
44 changes: 44 additions & 0 deletions src/AtomicMessageHandlers/ClusterManagerActivities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
namespace TemporalioSamples.AtomicMessageHandlers;

using Microsoft.Extensions.Logging;
using Temporalio.Activities;

// Simulated cluster-management activities used by the cluster manager workflow. Nothing here
// talks to a real cluster: each activity only logs and/or waits 100ms before completing.
public class ClusterManagerActivities
{
    // Input for AllocateNodesToJobAsync: the node names to assign and the job receiving them.
    public record AllocateNodesToJobInput(
        IList<string> Nodes,
        string JobName);

    // Logs the requested assignment and then waits briefly to simulate the work.
    [Activity]
    public async Task AllocateNodesToJobAsync(AllocateNodesToJobInput input)
    {
        // The placeholder is named after the value it carries so that structured log sinks
        // record the job under "JobName".
        ActivityExecutionContext.Current.Logger.LogInformation(
            "Assigning nodes {Nodes} to job {JobName}", input.Nodes, input.JobName);
        await Task.Delay(100);
    }

    // Input for DeallocateNodesFromJobAsync: the node names to release and the job that held them.
    public record DeallocateNodesFromJobInput(
        IList<string> Nodes,
        string JobName);

    // Logs the requested release and then waits briefly to simulate the work.
    [Activity]
    public async Task DeallocateNodesFromJobAsync(DeallocateNodesFromJobInput input)
    {
        ActivityExecutionContext.Current.Logger.LogInformation(
            "Deallocating nodes {Nodes} from job {JobName}", input.Nodes, input.JobName);
        await Task.Delay(100);
    }

    // Input for FindBadNodesAsync: the node names to health-check.
    public record FindBadNodesInput(
        IList<string> Nodes);

    // Simulated health check. After a brief wait, reports as bad every non-null node whose
    // position in the input list is not a multiple of five (positions 0, 5, 10, ... are kept
    // out of the result).
    [Activity]
    public async Task<List<string>> FindBadNodesAsync(FindBadNodesInput input)
    {
        await Task.Delay(100);
        return input.Nodes.
            Where((node, index) => node is not null && index % 5 != 0).
            ToList();
    }
}
211 changes: 211 additions & 0 deletions src/AtomicMessageHandlers/ClusterManagerWorkflow.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
namespace TemporalioSamples.AtomicMessageHandlers;

using Microsoft.Extensions.Logging;
using Temporalio.Exceptions;
using Temporalio.Workflows;

// Long-running workflow that manages a simulated cluster of nodes. Signals start and shut down
// the cluster, updates allocate nodes to jobs and release them again, and the run loop performs
// a health check on every iteration. Every code path that reads and then writes the node map
// across an await holds nodesLock, so those operations never interleave with one another.
[Workflow]
public sealed class ClusterManagerWorkflow : IDisposable
{
    // Cluster state. Exposed through the CurrentState query and handed to the next run on
    // continue-as-new.
    public record State
    {
        public bool ClusterStarted { get; set; }

        public bool ClusterShutdown { get; set; }

        // Node name -> owning job name. A null value means the node is unassigned and the
        // value "BAD!" means the node was reported bad by a health check.
        public IDictionary<string, string?> Nodes { get; init; } = new Dictionary<string, string?>();

        // Highest assigned-node count observed right after any allocation.
        public int MaxAssignedNodes { get; set; }
    }

    // Workflow input: the state to start from, plus a flag that shortens the health-check
    // interval and history limit so continue-as-new can be exercised quickly.
    public record Input
    {
        public State State { get; init; } = new();

        public bool TestContinueAsNew { get; init; }
    }

    // Workflow result, produced once the cluster has been shut down.
    public record Result(
        int MaxAssignedNodes,
        int NumAssignedNodes);

    // Single-permit semaphore used as an async mutex around node map changes.
    private readonly SemaphoreSlim nodesLock = new(1, 1);
    private readonly int maxHistoryLength;
    private readonly TimeSpan sleepInterval;

    [WorkflowInit]
    public ClusterManagerWorkflow(Input input)
    {
        CurrentState = input.State;
        // In test mode use a small history limit and a one second interval; otherwise never
        // force continue-as-new by length and check health every ten minutes.
        maxHistoryLength = input.TestContinueAsNew ? 120 : int.MaxValue;
        sleepInterval = TimeSpan.FromSeconds(input.TestContinueAsNew ? 1 : 600);
    }

    [WorkflowQuery]
    public State CurrentState { get; init; }

    // Waits for the cluster to be started, then health-checks at intervals until the cluster
    // is shut down. Continues as new, carrying the current state, when ShouldContinueAsNew
    // becomes true.
    [WorkflowRun]
    public async Task<Result> RunAsync(Input input)
    {
        await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);

        // Perform health checks at intervals
        do
        {
            await PerformHealthChecksAsync();
            // Wake early on shutdown or when continue-as-new is due, otherwise after the interval
            await Workflow.WaitConditionAsync(
                () => CurrentState.ClusterShutdown || ShouldContinueAsNew,
                sleepInterval);

            // Continue as new if needed
            if (ShouldContinueAsNew)
            {
                Workflow.Logger.LogInformation("Continuing as new");
                throw Workflow.CreateContinueAsNewException((ClusterManagerWorkflow wf) => wf.RunAsync(new()
                {
                    State = CurrentState,
                    TestContinueAsNew = input.TestContinueAsNew,
                }));
            }
        }
        while (!CurrentState.ClusterShutdown);
        return new(CurrentState.MaxAssignedNodes, NumAssignedNodes);
    }

    // Marks the cluster as started and registers nodes "0" through "24" as unassigned.
    [WorkflowSignal]
    public async Task StartClusterAsync()
    {
        CurrentState.ClusterStarted = true;
        foreach (var node in Enumerable.Range(0, 25))
        {
            CurrentState.Nodes[$"{node}"] = null;
        }
        Workflow.Logger.LogInformation("Cluster started");
    }

    // Marks the cluster as shut down. If the signal arrives before the start signal, it waits
    // for the cluster to be started first.
    [WorkflowSignal]
    public async Task ShutdownClusterAsync()
    {
        await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);
        CurrentState.ClusterShutdown = true;
        Workflow.Logger.LogInformation("Cluster shut down");
    }

    // Input for AllocateNodesToJobAsync: how many nodes to assign and to which job.
    public record AllocateNodesToJobInput(int NumNodes, string JobName);

    // Assigns the requested number of unassigned nodes to the job and returns their names.
    // Throws ApplicationFailureException if the cluster is already shut down or if fewer
    // unassigned nodes are available than requested.
    [WorkflowUpdate]
    public async Task<List<string>> AllocateNodesToJobAsync(AllocateNodesToJobInput input)
    {
        await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);
        if (CurrentState.ClusterShutdown)
        {
            throw new ApplicationFailureException(
                "Cannot allocate nodes to a job, cluster is already shut down");
        }
        await nodesLock.WaitAsync();
        try
        {
            var unassignedNodes = CurrentState.Nodes.
                Where(kvp => kvp.Value == null).
                Select(kvp => kvp.Key).
                ToList();
            if (unassignedNodes.Count < input.NumNodes)
            {
                throw new ApplicationFailureException(
                    $"Cannot allocate {input.NumNodes} nodes, have only {unassignedNodes.Count} available");
            }
            var assignedNodes = unassignedNodes[..input.NumNodes];
            // This await would be dangerous without nodesLock because it yields control and allows
            // interleaving
            await Workflow.ExecuteActivityAsync(
                (ClusterManagerActivities acts) => acts.AllocateNodesToJobAsync(new(assignedNodes, input.JobName)),
                new() { StartToCloseTimeout = TimeSpan.FromSeconds(10) });
            // Only record the assignment once the activity has completed
            foreach (var node in assignedNodes)
            {
                CurrentState.Nodes[node] = input.JobName;
            }
            CurrentState.MaxAssignedNodes = int.Max(CurrentState.MaxAssignedNodes, NumAssignedNodes);
            return assignedNodes;
        }
        finally
        {
            nodesLock.Release();
        }
    }

    // Input for DeleteJobAsync: the job whose nodes should be released.
    public record DeleteJobInput(string JobName);

    // Releases every node currently assigned to the job. Throws ApplicationFailureException
    // if the cluster is already shut down.
    [WorkflowUpdate]
    public async Task DeleteJobAsync(DeleteJobInput input)
    {
        await Workflow.WaitConditionAsync(() => CurrentState.ClusterStarted);
        if (CurrentState.ClusterShutdown)
        {
            throw new ApplicationFailureException(
                "Cannot delete job, cluster is already shut down");
        }
        await nodesLock.WaitAsync();
        try
        {
            var toUnassign = CurrentState.Nodes.
                Where(kvp => kvp.Value == input.JobName).
                Select(kvp => kvp.Key).
                ToList();
            // This await would be dangerous without nodesLock because it yields control and allows
            // interleaving
            await Workflow.ExecuteActivityAsync(
                (ClusterManagerActivities acts) => acts.DeallocateNodesFromJobAsync(new(toUnassign, input.JobName)),
                new() { StartToCloseTimeout = TimeSpan.FromSeconds(10) });
            // Only record the release once the activity has completed
            foreach (var node in toUnassign)
            {
                CurrentState.Nodes[node] = null;
            }
        }
        finally
        {
            nodesLock.Release();
        }
    }

    public void Dispose() => nodesLock.Dispose();

    // Number of nodes that currently belong to a job (neither unassigned nor marked "BAD!").
    private int NumAssignedNodes =>
        CurrentState.Nodes.Count(kvp => kvp.Value is { } val && val != "BAD!");

    private bool ShouldContinueAsNew =>
        // Don't continue as new while update running
        nodesLock.CurrentCount > 0 &&
        // Continue if suggested or, for ease of testing, max history reached
        (Workflow.ContinueAsNewSuggested || Workflow.CurrentHistoryLength > maxHistoryLength);

    // Asks the FindBadNodesAsync activity which of the currently assigned nodes are bad and
    // marks each reported node as "BAD!" in the node map.
    private async Task PerformHealthChecksAsync()
    {
        await nodesLock.WaitAsync();
        try
        {
            // Find bad nodes from the set of non-bad ones. This await would be dangerous without
            // nodesLock because it yields control and allows interleaving.
            // The map is keyed by node name with the owning job as the value, so the node names
            // to check are the keys of the assigned entries.
            var assignedNodes = CurrentState.Nodes.
                Where(kvp => kvp.Value is { } val && val != "BAD!").
                Select(kvp => kvp.Key).
                ToList();
            var badNodes = await Workflow.ExecuteActivityAsync(
                (ClusterManagerActivities acts) => acts.FindBadNodesAsync(new(assignedNodes)),
                new()
                {
                    StartToCloseTimeout = TimeSpan.FromSeconds(10),
                    // This health check is optional, and our lock would block the whole workflow if
                    // we let it retry forever
                    RetryPolicy = new() { MaximumAttempts = 1 },
                });
            foreach (var node in badNodes)
            {
                CurrentState.Nodes[node] = "BAD!";
            }
        }
        finally
        {
            nodesLock.Release();
        }
    }
}
Loading

0 comments on commit be233c9

Please sign in to comment.