diff --git a/src/SignalsQueries/LoyaltyProgram.workflow.cs b/src/SignalsQueries/LoyaltyProgram.workflow.cs index a01a142..affec39 100644 --- a/src/SignalsQueries/LoyaltyProgram.workflow.cs +++ b/src/SignalsQueries/LoyaltyProgram.workflow.cs @@ -3,10 +3,12 @@ namespace TemporalioSamples.SignalsQueries; using Microsoft.Extensions.Logging; using Temporalio.Workflows; +public record Purchase(string Id, int TotalCents); + [Workflow] public class LoyaltyProgram { - private string? userId; + private readonly Queue toProcess = new(); [WorkflowQuery] public int Points { get; private set; } @@ -14,24 +16,31 @@ public class LoyaltyProgram [WorkflowRun] public async Task RunAsync(string userId) { - this.userId = userId; + while (true) + { + // Wait for purchase + await Workflow.WaitConditionAsync(() => toProcess.Count > 0); - // Keep this workflow running forever - await Workflow.WaitConditionAsync(() => false); + // Process + var purchase = toProcess.Dequeue(); + Points += purchase.TotalCents; + Workflow.Logger.LogInformation("Added {TotalCents} points, total: {Points}", purchase.TotalCents, Points); + if (Points >= 10_000) + { + await Workflow.ExecuteActivityAsync( + () => MyActivities.SendCoupon(userId), + new() { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) }); + Points -= 10_000; + } + } } [WorkflowSignal] - public async Task NotifyPurchaseAsync(int purchaseTotalCents) + public async Task NotifyPurchaseAsync(Purchase purchase) { - Points += purchaseTotalCents; - Workflow.Logger.LogInformation("Added {Result} points, total: {Total}", purchaseTotalCents, Points); - - if (Points >= 10_000) + if (!toProcess.Contains(purchase)) { - Points -= 10_000; - await Workflow.ExecuteActivityAsync( - () => MyActivities.SendCoupon(userId), - new() { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) }); + toProcess.Enqueue(purchase); } } } \ No newline at end of file diff --git a/src/SignalsQueries/Program.cs b/src/SignalsQueries/Program.cs index afa948d..924e72d 100644 --- a/src/SignalsQueries/Program.cs +++ b/src/SignalsQueries/Program.cs @@ -42,7 +42,14 @@ async Task RunWorkerAsync() async Task ExecuteWorkflowAsync() { // If the workflow is already running from a previous run, terminate it - await client.GetWorkflowHandle("signals-queries-workflow-id").TerminateAsync(); + try + { + await client.GetWorkflowHandle("signals-queries-workflow-id").TerminateAsync(); + } + catch + { + // Ignore + } Console.WriteLine("Executing workflow"); var handle = await client.StartWorkflowAsync( @@ -50,9 +57,9 @@ async Task ExecuteWorkflowAsync() new(id: "signals-queries-workflow-id", taskQueue: "signals-queries-sample")); Console.WriteLine("Signal: Purchase made for $80"); - await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(8_000)); + await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(new Purchase("purchase-1", 8_000))); Console.WriteLine("Signal: Purchase made for $30"); - await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(3_000)); + await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(new Purchase("purchase-1", 3_000))); var points = await handle.QueryAsync(wf => wf.Points); Console.WriteLine("Remaining points: {0}", points);