-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
268 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
using Microsoft.Extensions.Logging; | ||
using Temporalio.Activities; | ||
using Temporalio.Exceptions; | ||
|
||
namespace TemporalioSamples.Saga; | ||
|
||
public record TransferDetails(decimal Amount, string FromAmount, string ToAmount, string ReferenceId); | ||
|
||
public static class Activities | ||
{ | ||
[Activity] | ||
public static void Withdraw(TransferDetails d) | ||
{ | ||
ActivityExecutionContext.Current.Logger.LogInformation("Withdrawing {Amount} from account {FromAmount}. ReferenceId: {ReferenceId}", d.Amount, d.FromAmount, d.ReferenceId); | ||
} | ||
|
||
[Activity] | ||
public static void WithdrawCompensation(TransferDetails d) | ||
{ | ||
ActivityExecutionContext.Current.Logger.LogInformation("Withdrawing Compensation {Amount} from account {FromAmount}. ReferenceId: {ReferenceId}", d.Amount, d.FromAmount, d.ReferenceId); | ||
} | ||
|
||
[Activity] | ||
public static void Deposit(TransferDetails d) | ||
{ | ||
ActivityExecutionContext.Current.Logger.LogInformation("Depositing {Amount} into account {ToAmount}. ReferenceId: {ReferenceId}", d.Amount, d.ToAmount, d.ReferenceId); | ||
} | ||
|
||
[Activity] | ||
public static void DepositCompensation(TransferDetails d) | ||
{ | ||
ActivityExecutionContext.Current.Logger.LogInformation("Depositing Compensation {Amount} int account {ToAmount}. ReferenceId: {ReferenceId}", d.Amount, d.ToAmount, d.ReferenceId); | ||
} | ||
|
||
[Activity] | ||
public static void StepWithError(TransferDetails d) | ||
{ | ||
ActivityExecutionContext.Current.Logger.LogInformation("Simulate failure to trigger compensation. ReferenceId: {ReferenceId}", d.ReferenceId); | ||
throw new ApplicationFailureException("Simulated failure"); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
using System.Diagnostics; | ||
using Microsoft.Extensions.Logging; | ||
using Temporalio.Client; | ||
using Temporalio.Worker; | ||
using TemporalioSamples.Saga; | ||
|
||
var client = await TemporalClient.ConnectAsync(new("localhost:7233") | ||
{ | ||
LoggerFactory = LoggerFactory.Create(builder => | ||
builder.AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] ").SetMinimumLevel(LogLevel.Information)), | ||
}); | ||
|
||
async Task RunWorkerAsync() | ||
{ | ||
// Cancellation token cancelled on ctrl+c | ||
using var tokenSource = new CancellationTokenSource(); | ||
Console.CancelKeyPress += (_, eventArgs) => | ||
{ | ||
tokenSource.Cancel(); | ||
eventArgs.Cancel = true; | ||
}; | ||
|
||
// Run worker until cancelled | ||
Console.WriteLine("Running worker"); | ||
|
||
using var worker = new TemporalWorker( | ||
client, | ||
new TemporalWorkerOptions(taskQueue: "workflow-saga-sample") | ||
.AddAllActivities(typeof(Activities), null) | ||
.AddWorkflow<SagaWorkflow>()); | ||
try | ||
{ | ||
await worker.ExecuteAsync(tokenSource.Token); | ||
} | ||
catch (OperationCanceledException) | ||
{ | ||
Console.WriteLine("Worker cancelled"); | ||
} | ||
} | ||
async Task ExecuteWorkflowAsync() | ||
{ | ||
var workflowId = "test-" + Guid.NewGuid(); | ||
Console.WriteLine($"Starting test workflow with id '{workflowId}'."); | ||
|
||
var sw = Stopwatch.StartNew(); | ||
var handle = await client.StartWorkflowAsync( | ||
(SagaWorkflow wf) => wf.RunAsync(new TransferDetails(100, "acc1000", "acc2000", "1324")), | ||
new(workflowId, "workflow-saga-sample")); | ||
|
||
Console.WriteLine($"Test workflow '{workflowId}' started"); | ||
|
||
await handle.GetResultAsync(); | ||
Console.WriteLine($"Test workflow '{workflowId}' finished after {sw.ElapsedMilliseconds}ms"); | ||
} | ||
|
||
switch (args.ElementAtOrDefault(0)) | ||
{ | ||
case "worker": | ||
await RunWorkerAsync(); | ||
break; | ||
case "workflow": | ||
await ExecuteWorkflowAsync(); | ||
break; | ||
default: | ||
throw new ArgumentException("Must pass 'worker' or 'workflow' as the first argument"); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# Saga | ||
|
||
This sample has a Saga |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
using Microsoft.Extensions.Logging; | ||
|
||
namespace TemporalioSamples.Saga; | ||
|
||
public class Saga | ||
{ | ||
private ILogger log; | ||
private Stack<Func<Task>> compensations; | ||
private Func<ILogger, Task> onCompensationError = default!; | ||
private Func<ILogger, Task> onCompensationComplete = default!; | ||
|
||
public Saga(ILogger logger) | ||
{ | ||
log = logger; | ||
compensations = new Stack<Func<Task>>(); | ||
} | ||
|
||
public void OnCompensationError(Func<ILogger, Task> onCompensationError) | ||
{ | ||
this.onCompensationError = onCompensationError; | ||
} | ||
|
||
public void OnCompensationComplete(Func<ILogger, Task> onCompensationComplete) | ||
{ | ||
this.onCompensationComplete = onCompensationComplete; | ||
} | ||
|
||
public void AddCompensation(Func<Task> compensation) | ||
{ | ||
compensations.Push(compensation); | ||
} | ||
|
||
public async Task CompensateAsync() | ||
{ | ||
int i = 0; | ||
while (compensations.Count > 0) | ||
{ | ||
i++; | ||
var c = compensations.Pop(); | ||
|
||
try | ||
{ | ||
log.LogInformation("Attempting compensation {I}...", i); | ||
await c.Invoke(); | ||
log.LogInformation("Compensation {I} successfull!", i); | ||
} | ||
catch (Exception) | ||
{ | ||
/* log details of all other compensations that have not yet been made if this is a show-stopper */ | ||
await onCompensationError(log); | ||
throw; | ||
} | ||
} | ||
await onCompensationComplete(log); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
using Microsoft.Extensions.Logging; | ||
using Temporalio.Workflows; | ||
|
||
namespace TemporalioSamples.Saga; | ||
|
||
[Workflow] | ||
public class SagaWorkflow | ||
{ | ||
[WorkflowRun] | ||
public async Task RunAsync(TransferDetails transfer) | ||
{ | ||
var options = new ActivityOptions() | ||
{ | ||
StartToCloseTimeout = TimeSpan.FromSeconds(90), // schedule a retry if the Activity function doesn't return within 90 seconds | ||
RetryPolicy = new() | ||
{ | ||
InitialInterval = TimeSpan.FromSeconds(15), // first try will occur after 15 seconds | ||
BackoffCoefficient = 1, // double the delay after each retry | ||
MaximumInterval = TimeSpan.FromMinutes(1), // up to a maximum delay of 1 minute | ||
MaximumAttempts = 2, // fail the Activitiesivity after 2 attempts | ||
}, | ||
}; | ||
|
||
var logger = Workflow.Logger; | ||
var saga = new Saga(logger); | ||
|
||
try | ||
{ | ||
await Workflow.ExecuteActivityAsync(() => Activities.Withdraw(transfer), options); | ||
|
||
saga.AddCompensation(async () => await Workflow.ExecuteActivityAsync( | ||
() => Activities.WithdrawCompensation(transfer), | ||
options)); | ||
|
||
await Workflow.ExecuteActivityAsync(() => Activities.Deposit(transfer), options); | ||
|
||
saga.AddCompensation(async () => await Workflow.ExecuteActivityAsync( | ||
() => Activities.DepositCompensation(transfer), | ||
options)); | ||
|
||
// throw new Exception | ||
await Workflow.ExecuteActivityAsync(() => Activities.StepWithError(transfer), options); | ||
} | ||
catch (Exception) | ||
{ | ||
logger.LogInformation("Exception caught. Initiating compensation..."); | ||
saga.OnCompensationComplete((log) => | ||
{ | ||
/* Send "we're sorry, but.." email to customer... */ | ||
log.LogInformation("Done. Compensation complete!"); | ||
return Task.CompletedTask; | ||
}); | ||
|
||
saga.OnCompensationError((log) => | ||
{ | ||
/* Send emails to internal supporting teams */ | ||
log.LogInformation("Done. Compensation unsuccessful... Manual intervention required!"); | ||
return Task.CompletedTask; | ||
}); | ||
|
||
await saga.CompensateAsync(); | ||
throw; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
</PropertyGroup> | ||
|
||
</Project> |