From 49c0c06e6996348aa3f2ebb13fdb62b13d8c6c0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Mendoza=20P=C3=A9rez?= Date: Wed, 6 Mar 2024 20:46:22 +0000 Subject: [PATCH] add sample for workflow update (#40) add sample for workflow update --- README.md | 1 + TemporalioSamples.sln | 8 ++ src/WorkflowUpdate/Program.cs | 74 +++++++++++++++++++ src/WorkflowUpdate/README.md | 19 +++++ .../TemporalioSamples.WorkflowUpdate.csproj | 7 ++ src/WorkflowUpdate/WorkflowUpdate.workflow.cs | 65 ++++++++++++++++ tests/TemporalioSamples.Tests.csproj | 1 + tests/WorkflowUpdate/WorkflowUpdateTests.cs | 62 ++++++++++++++++ 8 files changed, 237 insertions(+) create mode 100644 src/WorkflowUpdate/Program.cs create mode 100644 src/WorkflowUpdate/README.md create mode 100644 src/WorkflowUpdate/TemporalioSamples.WorkflowUpdate.csproj create mode 100644 src/WorkflowUpdate/WorkflowUpdate.workflow.cs create mode 100644 tests/WorkflowUpdate/WorkflowUpdateTests.cs diff --git a/README.md b/README.md index 69b728a..08b98c9 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ Prerequisites: * [Schedules](src/Schedules) - How to schedule workflows to be run at specific times in the future. * [WorkerSpecificTaskQueues](src/WorkerSpecificTaskQueues) - Use a unique task queue per Worker to have certain Activities only run on that specific Worker. * [WorkerVersioning](src/WorkerVersioning) - How to use the Worker Versioning feature to more easily deploy changes to Workflow & other code. +* [WorkflowUpdate](src/WorkflowUpdate) - How to use the Workflow Update feature while blocking in update method for concurrent updates. ## Development diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln index ad4d78d..89c2bf9 100644 --- a/TemporalioSamples.sln +++ b/TemporalioSamples.sln @@ -51,6 +51,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TemporalioSamples.Mutex", " EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TemporalioSamples.Saga", "src\Saga\TemporalioSamples.Saga.csproj", "{B79F07F7-3429-4C58-84C3-08587F748B2D}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.WorkflowUpdate", "src\WorkflowUpdate\TemporalioSamples.WorkflowUpdate.csproj", "{B3DB7B8C-7BD3-4A53-A809-AB6279B1A630}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -137,6 +139,10 @@ Global {B79F07F7-3429-4C58-84C3-08587F748B2D}.Debug|Any CPU.Build.0 = Debug|Any CPU {B79F07F7-3429-4C58-84C3-08587F748B2D}.Release|Any CPU.ActiveCfg = Release|Any CPU {B79F07F7-3429-4C58-84C3-08587F748B2D}.Release|Any CPU.Build.0 = Release|Any CPU + {B3DB7B8C-7BD3-4A53-A809-AB6279B1A630}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B3DB7B8C-7BD3-4A53-A809-AB6279B1A630}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B3DB7B8C-7BD3-4A53-A809-AB6279B1A630}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B3DB7B8C-7BD3-4A53-A809-AB6279B1A630}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -164,5 +170,7 @@ Global {CA3FD1BC-C918-4B15-96F6-D6DDA125E63C} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {3168FB2D-D821-433A-A761-309E0474DE48} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {B79F07F7-3429-4C58-84C3-08587F748B2D} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {B3DB7B8C-7BD3-4A53-A809-AB6279B1A630} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + EndGlobalSection EndGlobal diff --git a/src/WorkflowUpdate/Program.cs b/src/WorkflowUpdate/Program.cs new file mode 100644 index 0000000..9217148 --- /dev/null +++ b/src/WorkflowUpdate/Program.cs @@ -0,0 +1,74 @@ +using Microsoft.Extensions.Logging; +using Temporalio.Client; +using Temporalio.Worker; +using TemporalioSamples.WorkflowUpdate; + +// Create a client to localhost on default namespace +var client = await TemporalClient.ConnectAsync(new("localhost:7233") +{ + LoggerFactory = LoggerFactory.Create(builder => + builder. + AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] "). + SetMinimumLevel(LogLevel.Information)), +}); + +async Task RunWorkerAsync() +{ + // Cancellation token cancelled on ctrl+c + using var tokenSource = new CancellationTokenSource(); + Console.CancelKeyPress += (_, eventArgs) => + { + tokenSource.Cancel(); + eventArgs.Cancel = true; + }; + + // Run worker until cancelled + Console.WriteLine("Running worker"); + using var worker = new TemporalWorker( + client, + new TemporalWorkerOptions(taskQueue: "workflow-update-queue"). + AddWorkflow()); + try + { + await worker.ExecuteAsync(tokenSource.Token); + } + catch (OperationCanceledException) + { + Console.WriteLine("Worker cancelled"); + } +} + +async Task ExecuteWorkflowAsync() +{ + Console.WriteLine("Executing workflow"); + + var handle = await client.StartWorkflowAsync( + (WorkflowUpdate wf) => wf.RunAsync(), + new(id: $"workflow-update-{Guid.NewGuid()}", taskQueue: "workflow-update-queue")); + + await handle.ExecuteUpdateAsync(wf => + wf.SubmitScreenAsync(new WorkflowUpdate.UiRequest( + $"requestId-{Guid.NewGuid()}", + WorkflowUpdate.ScreenId.Screen1))); + + await handle.ExecuteUpdateAsync(wf => + wf.SubmitScreenAsync(new WorkflowUpdate.UiRequest( + $"requestId-{Guid.NewGuid()}", + WorkflowUpdate.ScreenId.Screen2))); + + // Workflow completes + await handle.GetResultAsync(); + Console.WriteLine("Workflow completes"); +} + +switch (args.ElementAtOrDefault(0)) +{ + case "worker": + await RunWorkerAsync(); + break; + case "workflow": + await ExecuteWorkflowAsync(); + break; + default: + throw new ArgumentException("Must pass 'worker' or 'workflow' as the single argument"); +} \ No newline at end of file diff --git a/src/WorkflowUpdate/README.md b/src/WorkflowUpdate/README.md new file mode 100644 index 0000000..16b3ee2 --- /dev/null +++ b/src/WorkflowUpdate/README.md @@ -0,0 +1,19 @@ +# Workflow Update + +This workflow represents a UI Wizard. We use [Workflow Update](https://docs.temporal.io/workflows#update) +to mutate the workflow state (submit some data) and wait for the workflow update method to return the next screen +the client has to navigate to. + +The update validator is used to reject null arguments (rejected updates are not included in workflow history). + + +To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory +in a separate terminal to start the worker: + + dotnet run worker + +Then in another terminal, run the workflow from this directory: + + dotnet run workflow + +This will show logs in the worker window of the workflow running. \ No newline at end of file diff --git a/src/WorkflowUpdate/TemporalioSamples.WorkflowUpdate.csproj b/src/WorkflowUpdate/TemporalioSamples.WorkflowUpdate.csproj new file mode 100644 index 0000000..2db6435 --- /dev/null +++ b/src/WorkflowUpdate/TemporalioSamples.WorkflowUpdate.csproj @@ -0,0 +1,7 @@ + + + + Exe + + + \ No newline at end of file diff --git a/src/WorkflowUpdate/WorkflowUpdate.workflow.cs b/src/WorkflowUpdate/WorkflowUpdate.workflow.cs new file mode 100644 index 0000000..3177cd0 --- /dev/null +++ b/src/WorkflowUpdate/WorkflowUpdate.workflow.cs @@ -0,0 +1,65 @@ +namespace TemporalioSamples.WorkflowUpdate; + +using Temporalio.Workflows; + +[Workflow] +public class WorkflowUpdate +{ + private bool updateInProgress; + private ScreenId currentScreen = ScreenId.Screen1; + + [WorkflowRun] + public async Task RunAsync() => await Workflow.WaitConditionAsync(() => currentScreen == ScreenId.End); + + [WorkflowUpdateValidator(nameof(SubmitScreenAsync))] + public void ValidatorSubmitScreen(UiRequest request) + { + if (request == null) + { + throw new ArgumentException("Input can not be null"); + } + } + + [WorkflowUpdate] + public async Task SubmitScreenAsync(UiRequest request) + { + // Ensure we process the requests one by one + await Workflow.WaitConditionAsync(() => !updateInProgress); + updateInProgress = true; + + // Activities can be scheduled here + SetNextScreen(request); + updateInProgress = false; + return currentScreen; + } + + private void SetNextScreen(UiRequest currentRequest) + { + currentScreen = currentRequest.ScreenId switch + { + ScreenId.Screen1 => ScreenId.Screen2, + ScreenId.Screen2 => ScreenId.End, + _ => currentScreen, + }; + } + + public enum ScreenId + { + /// + /// Screen1. + /// + Screen1, + + /// + /// Screen2. + /// + Screen2, + + /// + /// End. + /// + End, + } + + public record UiRequest(string RequestId, ScreenId ScreenId); +} \ No newline at end of file diff --git a/tests/TemporalioSamples.Tests.csproj b/tests/TemporalioSamples.Tests.csproj index d464518..91bcc75 100644 --- a/tests/TemporalioSamples.Tests.csproj +++ b/tests/TemporalioSamples.Tests.csproj @@ -23,6 +23,7 @@ + diff --git a/tests/WorkflowUpdate/WorkflowUpdateTests.cs b/tests/WorkflowUpdate/WorkflowUpdateTests.cs new file mode 100644 index 0000000..68031ba --- /dev/null +++ b/tests/WorkflowUpdate/WorkflowUpdateTests.cs @@ -0,0 +1,62 @@ +using Temporalio.Exceptions; + +namespace TemporalioSamples.Tests.WorkflowUpdate; + +using Temporalio.Testing; +using Temporalio.Worker; +using Xunit; +using Xunit.Abstractions; + +public class WorkflowUpdateTests : TestBase +{ + public WorkflowUpdateTests(ITestOutputHelper output) + : base(output) + { + } + + [Fact] + public async Task SimpleRun_Succeed() + { + await using var env = await WorkflowEnvironment.StartLocalAsync(); + using var worker = new TemporalWorker( + env.Client, + new TemporalWorkerOptions("my-task-queue"). + AddWorkflow()); + await worker.ExecuteAsync(async () => + { + var handle = await env.Client.StartWorkflowAsync( + (TemporalioSamples.WorkflowUpdate.WorkflowUpdate wf) => wf.RunAsync(), + new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + + Assert.Equal( + TemporalioSamples.WorkflowUpdate.WorkflowUpdate.ScreenId.Screen2, + await handle.ExecuteUpdateAsync((wf) => wf.SubmitScreenAsync(new TemporalioSamples.WorkflowUpdate.WorkflowUpdate.UiRequest($"requestId-{Guid.NewGuid()}", TemporalioSamples.WorkflowUpdate.WorkflowUpdate.ScreenId.Screen1)))); + + Assert.Equal( + TemporalioSamples.WorkflowUpdate.WorkflowUpdate.ScreenId.End, + await handle.ExecuteUpdateAsync((wf) => wf.SubmitScreenAsync(new TemporalioSamples.WorkflowUpdate.WorkflowUpdate.UiRequest($"requestId-{Guid.NewGuid()}", TemporalioSamples.WorkflowUpdate.WorkflowUpdate.ScreenId.Screen2)))); + + // Workflow completes + await handle.GetResultAsync(); + }); + } + + [Fact] + public async Task Reject_Update() + { + await using var env = await WorkflowEnvironment.StartLocalAsync(); + using var worker = new TemporalWorker( + env.Client, + new TemporalWorkerOptions("my-task-queue"). + AddWorkflow()); + await worker.ExecuteAsync(async () => + { + var handle = await env.Client.StartWorkflowAsync( + (TemporalioSamples.WorkflowUpdate.WorkflowUpdate wf) => wf.RunAsync(), + new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + + await Assert.ThrowsAsync(() => + handle.ExecuteUpdateAsync(wf => wf.SubmitScreenAsync(null!))); + }); + } +}