Skip to content

Commit

Permalink
Add SignalsQueries sample (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
lorensr authored May 30, 2024
1 parent 61133fb commit aa7fc9f
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Prerequisites:
* [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
* [Saga](src/Saga) - Demonstrates how to implement a saga pattern.
* [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future.
* [SignalsQueries](src/SignalsQueries) - A loyalty program using Signals and Queries.
* [Timer](src/Timer) - Use a timer to implement a monthly subscription; handle workflow cancellation.
* [WorkerSpecificTaskQueues](src/WorkerSpecificTaskQueues) - Use a unique task queue per Worker to have certain Activities only run on that specific Worker.
* [WorkerVersioning](src/WorkerVersioning) - How to use the Worker Versioning feature to more easily deploy changes to Workflow & other code.
Expand Down
47 changes: 47 additions & 0 deletions src/SignalsQueries/LoyaltyProgram.workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
namespace TemporalioSamples.SignalsQueries;

using Microsoft.Extensions.Logging;
using Temporalio.Workflows;

public record Purchase(string Id, int TotalCents);

[Workflow]
public class LoyaltyProgram
{
private readonly Queue<Purchase> toProcess = new();

[WorkflowQuery]
public int Points { get; private set; }

[WorkflowRun]
public async Task RunAsync(string userId)
{
while (true)
{
// Wait for purchase
await Workflow.WaitConditionAsync(() => toProcess.Count > 0);

// Process
var purchase = toProcess.Dequeue();
Points += purchase.TotalCents;
Workflow.Logger.LogInformation("Added {TotalCents} points, total: {Points}", purchase.TotalCents, Points);
if (Points >= 10_000)
{
await Workflow.ExecuteActivityAsync(
() => MyActivities.SendCoupon(userId),
new() { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) });
Points -= 10_000;
Workflow.Logger.LogInformation("Remaining points: {Points}", Points);
}
}
}

[WorkflowSignal]
public async Task NotifyPurchaseAsync(Purchase purchase)
{
if (!toProcess.Contains(purchase))
{
toProcess.Enqueue(purchase);
}
}
}
13 changes: 13 additions & 0 deletions src/SignalsQueries/MyActivities.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace TemporalioSamples.SignalsQueries;

using Microsoft.Extensions.Logging;
using Temporalio.Activities;

public static class MyActivities
{
[Activity]
public static void SendCoupon(string? userId)
{
ActivityExecutionContext.Current.Logger.LogInformation("Sending coupon to user {UserId}", userId);
}
}
80 changes: 80 additions & 0 deletions src/SignalsQueries/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using Microsoft.Extensions.Logging;
using Temporalio.Client;
using Temporalio.Worker;
using TemporalioSamples.SignalsQueries;

// Create a client to localhost on default namespace
var client = await TemporalClient.ConnectAsync(new("localhost:7233")
{
LoggerFactory = LoggerFactory.Create(builder =>
builder.
AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] ").
SetMinimumLevel(LogLevel.Information)),
});

async Task RunWorkerAsync()
{
// Cancellation token cancelled on ctrl+c
using var tokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
{
tokenSource.Cancel();
eventArgs.Cancel = true;
};

// Run worker until cancelled
Console.WriteLine("Running worker");
using var worker = new TemporalWorker(
client,
new TemporalWorkerOptions(taskQueue: "signals-queries-sample").
AddActivity(MyActivities.SendCoupon).
AddWorkflow<LoyaltyProgram>());
try
{
await worker.ExecuteAsync(tokenSource.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine("Worker cancelled");
}
}

async Task ExecuteWorkflowAsync()
{
// If the workflow is already running from a previous run, terminate it
try
{
await client.GetWorkflowHandle("signals-queries-workflow-id").TerminateAsync();
}
catch (Temporalio.Exceptions.RpcException ex) when (ex.Code == Temporalio.Exceptions.RpcException.StatusCode.NotFound)
{
// Ignore
}

Console.WriteLine("Executing workflow");
var handle = await client.StartWorkflowAsync(
(LoyaltyProgram wf) => wf.RunAsync("user-id-123"),
new(id: "signals-queries-workflow-id", taskQueue: "signals-queries-sample"));

Console.WriteLine("Signal: Purchase made for $80");
await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(new Purchase("purchase-1", 8_000)));
Console.WriteLine("Signal: Purchase made for $40");
await handle.SignalAsync(wf => wf.NotifyPurchaseAsync(new Purchase("purchase-1", 4_000)));

// Wait for workflow to process the signals
await Task.Delay(1000);
var points = await handle.QueryAsync(wf => wf.Points);
Console.WriteLine("Remaining points: {0}", points);
}

switch (args.ElementAtOrDefault(0))
{
case "worker":
await RunWorkerAsync();
break;
case "workflow":
await ExecuteWorkflowAsync();
break;
default:
throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument");
}
14 changes: 14 additions & 0 deletions src/SignalsQueries/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# SignalsQueries

A loyalty program implemented with Signals and Queries.

To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory
in a separate terminal to start the worker:

dotnet run worker

Then in another terminal, run the workflow from this directory:

dotnet run workflow

The worker terminal will show logs from running the workflow.
7 changes: 7 additions & 0 deletions src/SignalsQueries/TemporalioSamples.SignalsQueries.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
</PropertyGroup>

</Project>

0 comments on commit aa7fc9f

Please sign in to comment.