Skip to content

Commit

Permalink
use queue
Browse files Browse the repository at this point in the history
  • Loading branch information
lorensr committed Mar 30, 2024
1 parent ce8a6d3 commit 64d8186
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 16 deletions.
35 changes: 22 additions & 13 deletions src/SignalsQueries/LoyaltyProgram.workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,44 @@ namespace TemporalioSamples.SignalsQueries;
using Microsoft.Extensions.Logging;
using Temporalio.Workflows;

public record Purchase(string Id, int TotalCents);

[Workflow]
public class LoyaltyProgram
{
private string? userId;
private readonly Queue<Purchase> toProcess = new();

[WorkflowQuery]
public int Points { get; private set; }

[WorkflowRun]
public async Task RunAsync(string userId)
{
this.userId = userId;
while (true)
{
// Wait for purchase
await Workflow.WaitConditionAsync(() => toProcess.Count > 0);

// Keep this workflow running forever
await Workflow.WaitConditionAsync(() => false);
// Process
var purchase = toProcess.Dequeue();
Points += purchase.TotalCents;
Workflow.Logger.LogInformation("Added {TotalCents} points, total: {Points}", purchase.TotalCents, Points);
if (Points >= 10_000)
{
await Workflow.ExecuteActivityAsync(
() => MyActivities.SendCoupon(userId),
new() { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) });
Points -= 10_000;
}
}
}

[WorkflowSignal]
public async Task NotifyPurchaseAsync(int purchaseTotalCents)
public async Task NotifyPurchaseAsync(Purchase purchase)
{
Points += purchaseTotalCents;
Workflow.Logger.LogInformation("Added {Result} points, total: {Total}", purchaseTotalCents, Points);

if (Points >= 10_000)
if (!toProcess.Contains(purchase))
{
Points -= 10_000;
await Workflow.ExecuteActivityAsync(
() => MyActivities.SendCoupon(userId),
new() { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) });
toProcess.Enqueue(purchase);
}
}
}
13 changes: 10 additions & 3 deletions src/SignalsQueries/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,24 @@ async Task RunWorkerAsync()
async Task ExecuteWorkflowAsync()
{
// If the workflow is already running from a previous run, terminate it
await client.GetWorkflowHandle("signals-queries-workflow-id").TerminateAsync();
try
{
await client.GetWorkflowHandle("signals-queries-workflow-id").TerminateAsync();
}
catch
{
// Ignore
}

Console.WriteLine("Executing workflow");
var handle = await client.StartWorkflowAsync(
(LoyaltyProgram wf) => wf.RunAsync("user-id-123"),
new(id: "signals-queries-workflow-id", taskQueue: "signals-queries-sample"));

Console.WriteLine("Signal: Purchase made for $80");
await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(8_000));
await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(new Purchase("purchase-1", 8_000)));
Console.WriteLine("Signal: Purchase made for $30");
await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(3_000));
await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(new Purchase("purchase-1", 3_000)));

var points = await handle.QueryAsync(wf => wf.Points);
Console.WriteLine("Remaining points: {0}", points);
Expand Down

0 comments on commit 64d8186

Please sign in to comment.