From aa7fc9face411595475ef112a507411e74527448 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=E2=98=BA=EF=B8=8F?= <251288+lorensr@users.noreply.github.com> Date: Thu, 30 May 2024 16:53:47 -0400 Subject: [PATCH] Add SignalsQueries sample (#58) --- README.md | 1 + src/SignalsQueries/LoyaltyProgram.workflow.cs | 47 +++++++++++ src/SignalsQueries/MyActivities.cs | 13 +++ src/SignalsQueries/Program.cs | 80 +++++++++++++++++++ src/SignalsQueries/README.md | 14 ++++ .../TemporalioSamples.SignalsQueries.csproj | 7 ++ 6 files changed, 162 insertions(+) create mode 100644 src/SignalsQueries/LoyaltyProgram.workflow.cs create mode 100644 src/SignalsQueries/MyActivities.cs create mode 100644 src/SignalsQueries/Program.cs create mode 100644 src/SignalsQueries/README.md create mode 100644 src/SignalsQueries/TemporalioSamples.SignalsQueries.csproj diff --git a/README.md b/README.md index 2949982..9ee39c6 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ Prerequisites: * [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. * [Saga](src/Saga) - Demonstrates how to implement a saga pattern. * [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future. +* [SignalsQueries](src/SignalsQueries) - A loyalty program using Signals and Queries. * [Timer](src/Timer) - Use a timer to implement a monthly subscription; handle workflow cancellation. * [WorkerSpecificTaskQueues](src/WorkerSpecificTaskQueues) - Use a unique task queue per Worker to have certain Activities only run on that specific Worker. * [WorkerVersioning](src/WorkerVersioning) - How to use the Worker Versioning feature to more easily deploy changes to Workflow & other code. diff --git a/src/SignalsQueries/LoyaltyProgram.workflow.cs b/src/SignalsQueries/LoyaltyProgram.workflow.cs new file mode 100644 index 0000000..28ff886 --- /dev/null +++ b/src/SignalsQueries/LoyaltyProgram.workflow.cs @@ -0,0 +1,47 @@ +namespace TemporalioSamples.SignalsQueries; + +using Microsoft.Extensions.Logging; +using Temporalio.Workflows; + +public record Purchase(string Id, int TotalCents); + +[Workflow] +public class LoyaltyProgram +{ + private readonly Queue toProcess = new(); + + [WorkflowQuery] + public int Points { get; private set; } + + [WorkflowRun] + public async Task RunAsync(string userId) + { + while (true) + { + // Wait for purchase + await Workflow.WaitConditionAsync(() => toProcess.Count > 0); + + // Process + var purchase = toProcess.Dequeue(); + Points += purchase.TotalCents; + Workflow.Logger.LogInformation("Added {TotalCents} points, total: {Points}", purchase.TotalCents, Points); + if (Points >= 10_000) + { + await Workflow.ExecuteActivityAsync( + () => MyActivities.SendCoupon(userId), + new() { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) }); + Points -= 10_000; + Workflow.Logger.LogInformation("Remaining points: {Points}", Points); + } + } + } + + [WorkflowSignal] + public async Task NotifyPurchaseAsync(Purchase purchase) + { + if (!toProcess.Contains(purchase)) + { + toProcess.Enqueue(purchase); + } + } +} \ No newline at end of file diff --git a/src/SignalsQueries/MyActivities.cs b/src/SignalsQueries/MyActivities.cs new file mode 100644 index 0000000..b22ee77 --- /dev/null +++ b/src/SignalsQueries/MyActivities.cs @@ -0,0 +1,13 @@ +namespace TemporalioSamples.SignalsQueries; + +using Microsoft.Extensions.Logging; +using Temporalio.Activities; + +public static class MyActivities +{ + [Activity] + public static void SendCoupon(string? userId) + { + ActivityExecutionContext.Current.Logger.LogInformation("Sending coupon to user {UserId}", userId); + } +} \ No newline at end of file diff --git a/src/SignalsQueries/Program.cs b/src/SignalsQueries/Program.cs new file mode 100644 index 0000000..0c68db8 --- /dev/null +++ b/src/SignalsQueries/Program.cs @@ -0,0 +1,80 @@ +using Microsoft.Extensions.Logging; +using Temporalio.Client; +using Temporalio.Worker; +using TemporalioSamples.SignalsQueries; + +// Create a client to localhost on default namespace +var client = await TemporalClient.ConnectAsync(new("localhost:7233") +{ + LoggerFactory = LoggerFactory.Create(builder => + builder. + AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] "). + SetMinimumLevel(LogLevel.Information)), +}); + +async Task RunWorkerAsync() +{ + // Cancellation token cancelled on ctrl+c + using var tokenSource = new CancellationTokenSource(); + Console.CancelKeyPress += (_, eventArgs) => + { + tokenSource.Cancel(); + eventArgs.Cancel = true; + }; + + // Run worker until cancelled + Console.WriteLine("Running worker"); + using var worker = new TemporalWorker( + client, + new TemporalWorkerOptions(taskQueue: "signals-queries-sample"). + AddActivity(MyActivities.SendCoupon). + AddWorkflow()); + try + { + await worker.ExecuteAsync(tokenSource.Token); + } + catch (OperationCanceledException) + { + Console.WriteLine("Worker cancelled"); + } +} + +async Task ExecuteWorkflowAsync() +{ + // If the workflow is already running from a previous run, terminate it + try + { + await client.GetWorkflowHandle("signals-queries-workflow-id").TerminateAsync(); + } + catch (Temporalio.Exceptions.RpcException ex) when (ex.Code == Temporalio.Exceptions.RpcException.StatusCode.NotFound) + { + // Ignore + } + + Console.WriteLine("Executing workflow"); + var handle = await client.StartWorkflowAsync( + (LoyaltyProgram wf) => wf.RunAsync("user-id-123"), + new(id: "signals-queries-workflow-id", taskQueue: "signals-queries-sample")); + + Console.WriteLine("Signal: Purchase made for $80"); + await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(new Purchase("purchase-1", 8_000))); + Console.WriteLine("Signal: Purchase made for $40"); + await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(new Purchase("purchase-1", 4_000))); + + // Wait for workflow to process the signals + await Task.Delay(1000); + var points = await handle.QueryAsync(wf => wf.Points); + Console.WriteLine("Remaining points: {0}", points); +} + +switch (args.ElementAtOrDefault(0)) +{ + case "worker": + await RunWorkerAsync(); + break; + case "workflow": + await ExecuteWorkflowAsync(); + break; + default: + throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument"); +} \ No newline at end of file diff --git a/src/SignalsQueries/README.md b/src/SignalsQueries/README.md new file mode 100644 index 0000000..2585947 --- /dev/null +++ b/src/SignalsQueries/README.md @@ -0,0 +1,14 @@ +# SignalsQueries + +A loyalty program implemented with Signals and Queries. + +To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory +in a separate terminal to start the worker: + + dotnet run worker + +Then in another terminal, run the workflow from this directory: + + dotnet run workflow + +The worker terminal will show logs from running the workflow. diff --git a/src/SignalsQueries/TemporalioSamples.SignalsQueries.csproj b/src/SignalsQueries/TemporalioSamples.SignalsQueries.csproj new file mode 100644 index 0000000..e3b6154 --- /dev/null +++ b/src/SignalsQueries/TemporalioSamples.SignalsQueries.csproj @@ -0,0 +1,7 @@ + + + + Exe + + + \ No newline at end of file