From efd9ba3045c8a453ff2c65e2f0cd44118cede0b6 Mon Sep 17 00:00:00 2001 From: Frank Wagner Date: Sun, 26 Feb 2023 15:54:33 +0100 Subject: [PATCH] Added Rebus.Operations (Workflow Engine) (#1) * Added Rebus.Operations (Worklow Engine) * added coverage to pipeline * added SimpleInjector setup +semver: minor --- azure-piplines.yml | 19 +- rebus-extensions.sln | 42 ++++ rebus-extensions.sln.DotSettings | 1 + src/Rebus.Operations/Directory.Build.props | 7 + .../IOperation.cs | 12 + .../IOperationDispatcher.cs | 15 ++ .../IOperationLogEntry.cs | 10 + .../IOperationTask.cs | 16 ++ .../IOperationTaskDispatcher.cs | 21 ++ .../Rebus.Operations.Abstractions.csproj | 13 ++ .../Workflow/IMessageEnricher.cs | 8 + .../Workflow/IOperationManager.cs | 14 ++ .../Workflow/IOperationMessaging.cs | 17 ++ .../Workflow/IOperationTaskManager.cs | 15 ++ .../Workflow/IWorkflow.cs | 10 + .../BusOperationTaskExtensions.cs | 57 +++++ .../DefaultOperationDispatcher.cs | 27 +++ .../DefaultOperationTaskDispatcher.cs | 34 +++ .../OperationDispatcherBase.cs | 72 ++++++ .../Rebus.Operations.Core/OperationTask.cs | 20 ++ .../OperationTaskDispatcherBase.cs | 68 ++++++ .../Rebus.Operations.Core/OperationsSetup.cs | 19 ++ .../Rebus.Operations.Core.csproj | 18 ++ .../Workflow/DefaultMessageEnricher.cs | 11 + .../Workflow/DefaultWorkflow.cs | 16 ++ .../EmptyOperationStatusEventHandler.cs | 14 ++ .../EmptyOperationTaskStatusEventHandler.cs | 14 ++ .../Workflow/FailedOperationHandler.cs | 40 ++++ .../Workflow/IncomingTaskMessageHandler.cs | 39 ++++ .../Workflow/OperationManagerBase.cs | 26 +++ .../Workflow/OperationSagaData.cs | 18 ++ .../Workflow/OperationTaskManagerBase.cs | 18 ++ .../OperationTaskProgressEventHandler.cs | 58 +++++ .../Workflow/OperationTaskWorkflowSaga.cs | 116 ++++++++++ .../Workflow/ProcessOperationSaga.cs | 207 ++++++++++++++++++ .../Workflow/RebusOperationMessaging.cs | 54 +++++ .../Workflow/TaskWorkflowSagaData.cs | 18 ++ .../Commands/CreateNewOperationTaskCommand.cs | 29 +++ .../Commands/CreateOperationCommand.cs | 7 + .../Commands/OperationTaskSystemMessage.cs | 33 +++ .../Rebus.Operations.Primitives/ErrorData.cs | 10 + .../Events/OperationStatusEvent.cs | 9 + .../Events/OperationTaskAcceptedEvent.cs | 13 ++ .../Events/OperationTaskProgressEvent.cs | 14 ++ .../Events/OperationTaskStatusEvent.cs | 58 +++++ .../Events/OperationTaskStatusEventBase.cs | 71 ++++++ .../Events/OperationTimeoutEvent.cs | 9 + .../IOperationTaskMessage.cs | 17 ++ .../OperationStatus.cs | 9 + .../OperationTaskStatus.cs | 9 + .../Rebus.Operations.Primitives.csproj | 14 ++ .../Rebus.Operations.SimpleInjector.csproj | 18 ++ .../SimpleInjectorExtensions.cs | 56 +++++ src/Rebus.SimpleInjector/IRebusUnitOfWork.cs | 7 + .../Rebus.SimpleInjector.csproj | 17 ++ .../RebusUnitOfWorkAdapter.cs | 25 +++ .../SimpleInjectorConfigurationExtensions.cs | 163 ++++++++++++++ .../SimpleInjectorContainerAdapter.cs | 82 +++++++ .../SimpleInjectorUnitOfWork.cs | 33 +++ .../MultiStepCommand.cs | 6 + test/Rebus.Operations.Tests/MultiStepSaga.cs | 41 ++++ .../MultiStepSagaData.cs | 8 + .../Rebus.Operations.Tests.csproj | 17 ++ test/Rebus.Operations.Tests/RebusTestBase.cs | 79 +++++++ .../RebusTestLogging.cs | 62 ++++++ test/Rebus.Operations.Tests/StepOneCommand.cs | 6 + .../StepOneCommandHandler.cs | 22 ++ test/Rebus.Operations.Tests/StepTwoCommand.cs | 6 + .../StepTwoCommandHandler.cs | 22 ++ test/Rebus.Operations.Tests/TestCommand.cs | 6 + .../TestCommandHandler.cs | 22 ++ .../TestCommandHandlerWithError.cs | 24 ++ .../TestCommandHandlerWithProgress.cs | 21 ++ .../TestOperationManager.cs | 58 +++++ .../TestOperationModel.cs | 7 + .../TestOperationTaskModel.cs | 9 + test/Rebus.Operations.Tests/TestRebusSetup.cs | 11 + .../Rebus.Operations.Tests/TestTaskManager.cs | 47 ++++ test/Rebus.Operations.Tests/WorkflowTests.cs | 137 ++++++++++++ .../Rebus.SimpleInjector.Tests.csproj | 20 ++ .../SimpleInjectorTestBase.cs | 40 ++++ .../SimpleInjectorWorkflowTests.cs | 73 ++++++ .../TestRebusUnitOfWork.cs | 19 ++ 83 files changed, 2648 insertions(+), 2 deletions(-) create mode 100644 src/Rebus.Operations/Directory.Build.props create mode 100644 src/Rebus.Operations/Rebus.Operations.Abstractions/IOperation.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationDispatcher.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationLogEntry.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTask.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTaskDispatcher.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Abstractions/Rebus.Operations.Abstractions.csproj create mode 100644 src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IMessageEnricher.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationTaskManager.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IWorkflow.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/BusOperationTaskExtensions.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/OperationTask.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/OperationsSetup.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Rebus.Operations.Core.csproj create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultMessageEnricher.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultWorkflow.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationStatusEventHandler.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationTaskStatusEventHandler.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/FailedOperationHandler.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationManagerBase.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationSagaData.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskManagerBase.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskWorkflowSaga.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Core/Workflow/TaskWorkflowSagaData.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateNewOperationTaskCommand.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateOperationCommand.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/Commands/OperationTaskSystemMessage.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/ErrorData.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationStatusEvent.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskAcceptedEvent.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskProgressEvent.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEvent.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEventBase.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTimeoutEvent.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/IOperationTaskMessage.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/OperationStatus.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/OperationTaskStatus.cs create mode 100644 src/Rebus.Operations/Rebus.Operations.Primitives/Rebus.Operations.Primitives.csproj create mode 100644 src/Rebus.Operations/Rebus.Operations.SimpleInjector/Rebus.Operations.SimpleInjector.csproj create mode 100644 src/Rebus.Operations/Rebus.Operations.SimpleInjector/SimpleInjectorExtensions.cs create mode 100644 src/Rebus.SimpleInjector/IRebusUnitOfWork.cs create mode 100644 src/Rebus.SimpleInjector/Rebus.SimpleInjector.csproj create mode 100644 src/Rebus.SimpleInjector/RebusUnitOfWorkAdapter.cs create mode 100644 src/Rebus.SimpleInjector/SimpleInjectorConfigurationExtensions.cs create mode 100644 src/Rebus.SimpleInjector/SimpleInjectorContainerAdapter.cs create mode 100644 src/Rebus.SimpleInjector/SimpleInjectorUnitOfWork.cs create mode 100644 test/Rebus.Operations.Tests/MultiStepCommand.cs create mode 100644 test/Rebus.Operations.Tests/MultiStepSaga.cs create mode 100644 test/Rebus.Operations.Tests/MultiStepSagaData.cs create mode 100644 test/Rebus.Operations.Tests/Rebus.Operations.Tests.csproj create mode 100644 test/Rebus.Operations.Tests/RebusTestBase.cs create mode 100644 test/Rebus.Operations.Tests/RebusTestLogging.cs create mode 100644 test/Rebus.Operations.Tests/StepOneCommand.cs create mode 100644 test/Rebus.Operations.Tests/StepOneCommandHandler.cs create mode 100644 test/Rebus.Operations.Tests/StepTwoCommand.cs create mode 100644 test/Rebus.Operations.Tests/StepTwoCommandHandler.cs create mode 100644 test/Rebus.Operations.Tests/TestCommand.cs create mode 100644 test/Rebus.Operations.Tests/TestCommandHandler.cs create mode 100644 test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs create mode 100644 test/Rebus.Operations.Tests/TestCommandHandlerWithProgress.cs create mode 100644 test/Rebus.Operations.Tests/TestOperationManager.cs create mode 100644 test/Rebus.Operations.Tests/TestOperationModel.cs create mode 100644 test/Rebus.Operations.Tests/TestOperationTaskModel.cs create mode 100644 test/Rebus.Operations.Tests/TestRebusSetup.cs create mode 100644 test/Rebus.Operations.Tests/TestTaskManager.cs create mode 100644 test/Rebus.Operations.Tests/WorkflowTests.cs create mode 100644 test/Rebus.SimpleInjector.Tests/Rebus.SimpleInjector.Tests.csproj create mode 100644 test/Rebus.SimpleInjector.Tests/SimpleInjectorTestBase.cs create mode 100644 test/Rebus.SimpleInjector.Tests/SimpleInjectorWorkflowTests.cs create mode 100644 test/Rebus.SimpleInjector.Tests/TestRebusUnitOfWork.cs diff --git a/azure-piplines.yml b/azure-piplines.yml index 6fe0395..2faee4b 100644 --- a/azure-piplines.yml +++ b/azure-piplines.yml @@ -31,8 +31,23 @@ steps: inputs: command: test projects: '**/*Tests/*.csproj' - arguments: '--configuration $(buildConfiguration) --collect "Code coverage" --no-build' - + arguments: '--configuration $(buildConfiguration) --collect "XPlat Code Coverage" --no-build' + publishTestResults: true + +- task: reportgenerator@5 + displayName: 'Merge code coverage results' + inputs: + reports: '$(Agent.WorkFolder)/**/coverage.cobertura.xml' + targetdir: '$(Build.SourcesDirectory)/CoverageResults' + reporttypes: 'Cobertura' + +# Publish the combined code coverage to the pipeline +- task: PublishCodeCoverageResults@1 + displayName: 'Publish code coverage report' + inputs: + codeCoverageTool: 'Cobertura' + summaryFileLocation: '$(Build.SourcesDirectory)/CoverageResults/Cobertura.xml' + - task: DotNetCoreCLI@2 displayName: dotnet pack inputs: diff --git a/rebus-extensions.sln b/rebus-extensions.sln index ac40168..ba074fe 100644 --- a/rebus-extensions.sln +++ b/rebus-extensions.sln @@ -13,6 +13,20 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.Configuration.MsmqSel EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Rebus.Configuration.SqlServerSelectors", "src\Rebus.Configuration\Rebus.Configuration.SqlServerSelectors\Rebus.Configuration.SqlServerSelectors.csproj", "{7750D5E7-41C2-4D0E-8E84-78D51C8E67FC}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.Operations.Core", "src\Rebus.Operations\Rebus.Operations.Core\Rebus.Operations.Core.csproj", "{9C8C26F8-E2B5-4D3D-B046-03CF2BF6BA3D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.Operations.Primitives", "src\Rebus.Operations\Rebus.Operations.Primitives\Rebus.Operations.Primitives.csproj", "{1F378946-2CD9-4A63-BB5D-29A5001679FB}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.Operations.Abstractions", "src\Rebus.Operations\Rebus.Operations.Abstractions\Rebus.Operations.Abstractions.csproj", "{1A83C61F-396A-46A5-A718-06121D705C2D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.Operations.Tests", "test\Rebus.Operations.Tests\Rebus.Operations.Tests.csproj", "{D292704B-A6D1-4577-807C-5E58F5C593BE}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.Operations.SimpleInjector", "src\Rebus.Operations\Rebus.Operations.SimpleInjector\Rebus.Operations.SimpleInjector.csproj", "{8E8CF6AA-9025-460A-B4E8-CA8A317F758D}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.SimpleInjector", "src\Rebus.SimpleInjector\Rebus.SimpleInjector.csproj", "{7A693524-32D1-4D4F-B355-7DC1BC717BEC}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.SimpleInjector.Tests", "test\Rebus.SimpleInjector.Tests\Rebus.SimpleInjector.Tests.csproj", "{539BED78-F196-4FA2-9E48-2185FE82444E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -39,6 +53,34 @@ Global {7750D5E7-41C2-4D0E-8E84-78D51C8E67FC}.Debug|Any CPU.Build.0 = Debug|Any CPU {7750D5E7-41C2-4D0E-8E84-78D51C8E67FC}.Release|Any CPU.ActiveCfg = Release|Any CPU {7750D5E7-41C2-4D0E-8E84-78D51C8E67FC}.Release|Any CPU.Build.0 = Release|Any CPU + {9C8C26F8-E2B5-4D3D-B046-03CF2BF6BA3D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9C8C26F8-E2B5-4D3D-B046-03CF2BF6BA3D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9C8C26F8-E2B5-4D3D-B046-03CF2BF6BA3D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9C8C26F8-E2B5-4D3D-B046-03CF2BF6BA3D}.Release|Any CPU.Build.0 = Release|Any CPU + {1F378946-2CD9-4A63-BB5D-29A5001679FB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1F378946-2CD9-4A63-BB5D-29A5001679FB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1F378946-2CD9-4A63-BB5D-29A5001679FB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1F378946-2CD9-4A63-BB5D-29A5001679FB}.Release|Any CPU.Build.0 = Release|Any CPU + {1A83C61F-396A-46A5-A718-06121D705C2D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1A83C61F-396A-46A5-A718-06121D705C2D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1A83C61F-396A-46A5-A718-06121D705C2D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1A83C61F-396A-46A5-A718-06121D705C2D}.Release|Any CPU.Build.0 = Release|Any CPU + {D292704B-A6D1-4577-807C-5E58F5C593BE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D292704B-A6D1-4577-807C-5E58F5C593BE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D292704B-A6D1-4577-807C-5E58F5C593BE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D292704B-A6D1-4577-807C-5E58F5C593BE}.Release|Any CPU.Build.0 = Release|Any CPU + {8E8CF6AA-9025-460A-B4E8-CA8A317F758D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8E8CF6AA-9025-460A-B4E8-CA8A317F758D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8E8CF6AA-9025-460A-B4E8-CA8A317F758D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8E8CF6AA-9025-460A-B4E8-CA8A317F758D}.Release|Any CPU.Build.0 = Release|Any CPU + {7A693524-32D1-4D4F-B355-7DC1BC717BEC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7A693524-32D1-4D4F-B355-7DC1BC717BEC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7A693524-32D1-4D4F-B355-7DC1BC717BEC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7A693524-32D1-4D4F-B355-7DC1BC717BEC}.Release|Any CPU.Build.0 = Release|Any CPU + {539BED78-F196-4FA2-9E48-2185FE82444E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {539BED78-F196-4FA2-9E48-2185FE82444E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {539BED78-F196-4FA2-9E48-2185FE82444E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {539BED78-F196-4FA2-9E48-2185FE82444E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/rebus-extensions.sln.DotSettings b/rebus-extensions.sln.DotSettings index 36fb221..4ad39df 100644 --- a/rebus-extensions.sln.DotSettings +++ b/rebus-extensions.sln.DotSettings @@ -3,5 +3,6 @@ True True True + True True True \ No newline at end of file diff --git a/src/Rebus.Operations/Directory.Build.props b/src/Rebus.Operations/Directory.Build.props new file mode 100644 index 0000000..6d03a26 --- /dev/null +++ b/src/Rebus.Operations/Directory.Build.props @@ -0,0 +1,7 @@ + + + + Dbosoft.Rebus.Operations + + + diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperation.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperation.cs new file mode 100644 index 0000000..8595585 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperation.cs @@ -0,0 +1,12 @@ +using System; + +namespace Dbosoft.Rebus.Operations; + +public interface IOperation +{ + public Guid Id { get; } + + public OperationStatus Status { get; } + + +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationDispatcher.cs new file mode 100644 index 0000000..22e6de0 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationDispatcher.cs @@ -0,0 +1,15 @@ +#nullable enable + +using System; +using System.Threading.Tasks; + +namespace Dbosoft.Rebus.Operations +{ + public interface IOperationDispatcher + { + ValueTask StartNew(object? additionalData = default) where T : class, new(); + ValueTask StartNew(Type commandType, object? additionalData = default); + ValueTask StartNew(object operationCommand); + + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationLogEntry.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationLogEntry.cs new file mode 100644 index 0000000..f5753d5 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationLogEntry.cs @@ -0,0 +1,10 @@ +using System; + +namespace Dbosoft.Rebus.Operations; + +public interface IOperationLogEntry +{ + string Message { get; } + public DateTimeOffset Timestamp { get; } + +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTask.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTask.cs new file mode 100644 index 0000000..0dbbaef --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTask.cs @@ -0,0 +1,16 @@ +using System; + +namespace Dbosoft.Rebus.Operations; + +public interface IOperationTask +{ + public Guid Id { get; } + + public Guid OperationId { get; } + + public Guid InitiatingTaskId { get; } + + + OperationTaskStatus Status { get; } + +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTaskDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTaskDispatcher.cs new file mode 100644 index 0000000..9c18c43 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTaskDispatcher.cs @@ -0,0 +1,21 @@ +#nullable enable + +using System; +using System.Threading.Tasks; + +namespace Dbosoft.Rebus.Operations +{ + public interface IOperationTaskDispatcher + { + ValueTask StartNew(Guid operationId, Guid initiatingTaskId, + object? additionalData = default) where T : class, new(); + + ValueTask StartNew(Guid operationId, Guid initiatingTaskId, Type operationCommandType, + object? additionalData = default); + + ValueTask StartNew(Guid operationId, Guid initiatingTaskId, object command + , object? additionalData = default); + + } + +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Rebus.Operations.Abstractions.csproj b/src/Rebus.Operations/Rebus.Operations.Abstractions/Rebus.Operations.Abstractions.csproj new file mode 100644 index 0000000..45cd955 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Rebus.Operations.Abstractions.csproj @@ -0,0 +1,13 @@ + + + + netstandard2.1 + enable + latest + + + + + + + diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IMessageEnricher.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IMessageEnricher.cs new file mode 100644 index 0000000..0a4e318 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IMessageEnricher.cs @@ -0,0 +1,8 @@ +using Dbosoft.Rebus.Operations.Commands; + +namespace Dbosoft.Rebus.Operations.Workflow; + +public interface IMessageEnricher +{ + object? EnrichTaskAcceptedReply(OperationTaskSystemMessage taskMessage) where T : class, new(); +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs new file mode 100644 index 0000000..fbac907 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs @@ -0,0 +1,14 @@ +using System; +using System.Threading.Tasks; + +namespace Dbosoft.Rebus.Operations.Workflow; + +#nullable enable +public interface IOperationManager +{ + ValueTask GetByIdAsync(Guid operationId); + ValueTask GetOrCreateAsync(Guid operationId, object command); + + ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, object? additionalData); + ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task, object? data); +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs new file mode 100644 index 0000000..7ffe067 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs @@ -0,0 +1,17 @@ +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Events; + +namespace Dbosoft.Rebus.Operations.Workflow; + +public interface IOperationMessaging +{ + void DispatchTaskMessage(object command, IOperationTask task); + Task DispatchTaskStatusEventAsync(string commandType, OperationTaskStatusEvent message); + Task DispatchTaskStatusEventAsync(OperationTaskStatusEvent message); + Task DispatchOperationStatusEventAsync(OperationStatusEvent operationStatusEvent); + + IOperationDispatcher OperationDispatcher { get; } + IOperationTaskDispatcher TaskDispatcher { get; } + + +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationTaskManager.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationTaskManager.cs new file mode 100644 index 0000000..fb171ff --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationTaskManager.cs @@ -0,0 +1,15 @@ +using System; +using System.Threading.Tasks; + +namespace Dbosoft.Rebus.Operations.Workflow; + +#nullable enable + +public interface IOperationTaskManager +{ + ValueTask GetByIdAsync(Guid taskId); + ValueTask GetOrCreateAsync(IOperation operation, object command, Guid taskId, Guid parentTaskId); + ValueTask TryChangeStatusAsync(IOperationTask task, OperationTaskStatus newStatus, object? additionalData); + + +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IWorkflow.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IWorkflow.cs new file mode 100644 index 0000000..e8acd67 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IWorkflow.cs @@ -0,0 +1,10 @@ +#nullable enable +namespace Dbosoft.Rebus.Operations.Workflow; + +public interface IWorkflow +{ + IOperationManager Operations { get; } + IOperationTaskManager Tasks { get; } + IOperationMessaging Messaging { get; } + +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/BusOperationTaskExtensions.cs b/src/Rebus.Operations/Rebus.Operations.Core/BusOperationTaskExtensions.cs new file mode 100644 index 0000000..9eb777a --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/BusOperationTaskExtensions.cs @@ -0,0 +1,57 @@ +using System; +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Events; +using Rebus.Bus; +using Rebus.Transport; + +namespace Dbosoft.Rebus.Operations; + +public static class BusOperationTaskExtensions +{ + public static Task FailTask(this IBus bus, IOperationTaskMessage message, string errorMessage) + { + return FailTask(bus, message, new ErrorData { ErrorMessage = errorMessage }); + } + + public static Task FailTask(this IBus bus, IOperationTaskMessage message, ErrorData error) + { + return bus.Publish( + OperationTaskStatusEvent.Failed( + message.OperationId, message.InitiatingTaskId, + message.TaskId, error)); + } + + + public static Task CompleteTask(this IBus bus, IOperationTaskMessage message) + { + return bus.Publish( + OperationTaskStatusEvent.Completed( + message.OperationId, message.InitiatingTaskId, message.TaskId)); + } + + public static Task CompleteTask(this IBus bus, IOperationTaskMessage message, object responseMessage) + { + return bus.Publish( + OperationTaskStatusEvent.Completed( + message.OperationId, message.InitiatingTaskId, message.TaskId, responseMessage)); + } + + + public static async Task ProgressMessage(this IBus bus, IOperationTaskMessage message, object data) + { + using var scope = new RebusTransactionScope(); + + + await bus.Publish(new OperationTaskProgressEvent + { + Id = Guid.NewGuid(), + OperationId = message.OperationId, + TaskId = message.TaskId, + Data = data, + Timestamp = DateTimeOffset.UtcNow + }).ConfigureAwait(false); + + // commit it like this + await scope.CompleteAsync().ConfigureAwait(false); + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs new file mode 100644 index 0000000..eec08fc --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs @@ -0,0 +1,27 @@ +using System; +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Workflow; +using Microsoft.Extensions.Logging; +using Rebus.Bus; + +namespace Dbosoft.Rebus.Operations; + +#nullable enable + +public class DefaultOperationDispatcher : OperationDispatcherBase +{ + private readonly IOperationManager _operationManager; + + public DefaultOperationDispatcher( + IBus bus, + ILogger logger, + IOperationManager operationManager) : base(bus, logger) + { + _operationManager = operationManager; + } + + protected override async ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData) + { + return (await _operationManager.GetOrCreateAsync(Guid.NewGuid(), command), command); + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs new file mode 100644 index 0000000..fe87e11 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs @@ -0,0 +1,34 @@ +using System; +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Workflow; +using Microsoft.Extensions.Logging; +using Rebus.Bus; + +namespace Dbosoft.Rebus.Operations; + +public class DefaultOperationTaskDispatcher : OperationTaskDispatcherBase +{ + private readonly IOperationManager _operationManager; + private readonly IOperationTaskManager _operationTaskManager; + + + public DefaultOperationTaskDispatcher( + IBus bus, + ILogger logger, + IOperationManager operationManager, IOperationTaskManager operationTaskManager) : base(bus, logger) + { + _operationManager = operationManager; + _operationTaskManager = operationTaskManager; + } + + protected override async ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData) + { + var op = await _operationManager.GetByIdAsync(operationId); + if (op == null) + { + throw new ArgumentException($"Operation {operationId} not found", nameof(operationId)); + } + + return (await _operationTaskManager.GetOrCreateAsync(op, command, Guid.NewGuid(), initiatingTaskId), command); + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs new file mode 100644 index 0000000..bb1a183 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs @@ -0,0 +1,72 @@ + + +#nullable enable + +using System; +using System.Text.Json; +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Commands; +using Microsoft.Extensions.Logging; +using Rebus.Bus; + +namespace Dbosoft.Rebus.Operations +{ + + public abstract class OperationDispatcherBase : IOperationDispatcher + { + private readonly IBus _bus; + private readonly ILogger _logger; + + protected OperationDispatcherBase(IBus bus, ILogger logger) + { + _bus = bus; + _logger = logger; + } + + public ValueTask StartNew(object command) + { + return StartOperation(command, null); + } + + public ValueTask StartNew(object? additionalData = default) + where T : class, new() + { + return StartOperation( Activator.CreateInstance(),additionalData); + } + + + public ValueTask StartNew(Type commandType, object? additionalData = default) + { + return StartOperation(commandType,additionalData); + } + + protected abstract ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData); + + protected async ValueTask StartOperation(object command, object? additionalData) + { + if (command == null) + throw new ArgumentNullException(nameof(command)); + + var(operation, taskCommand) = await CreateOperation(command, additionalData); + + var commandJson = JsonSerializer.Serialize(taskCommand); + + var taskMessage = new CreateNewOperationTaskCommand( + taskCommand.GetType().AssemblyQualifiedName, + commandJson, + operation.Id, + operation.Id, + Guid.NewGuid()); + + var message = new CreateOperationCommand { TaskMessage = taskMessage }; + await _bus.Send(message); + + _logger.LogDebug("Send new command of type {commandType}. Id: {operationId}", + taskCommand.GetType().Name, operation.Id); + + return operation; + + } + + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationTask.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationTask.cs new file mode 100644 index 0000000..3894c01 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationTask.cs @@ -0,0 +1,20 @@ +using System; + +namespace Dbosoft.Rebus.Operations +{ + public class OperationTask : IOperationTaskMessage where T : class, new() + { + public OperationTask(T command, Guid operationId, Guid initiatingTaskId, Guid taskId) + { + Command = command; + OperationId = operationId; + InitiatingTaskId = initiatingTaskId; + TaskId = taskId; + } + + public T Command { get; } + public Guid OperationId { get; } + public Guid InitiatingTaskId { get; } + public Guid TaskId { get; } + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs new file mode 100644 index 0000000..2e692b8 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs @@ -0,0 +1,68 @@ + + +#nullable enable + +using System; +using System.Text.Json; +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Commands; +using Microsoft.Extensions.Logging; +using Rebus.Bus; + +namespace Dbosoft.Rebus.Operations; +public abstract class OperationTaskDispatcherBase : IOperationTaskDispatcher +{ + private readonly IBus _bus; + private readonly ILogger _logger; + + protected OperationTaskDispatcherBase(IBus bus, ILogger logger) + { + _bus = bus; + _logger = logger; + } + + public ValueTask StartNew(Guid operationId, Guid initiatingTaskId, object command, object? additionalData = null) + { + return StartTask(operationId, initiatingTaskId, command, additionalData); + } + + public ValueTask StartNew(Guid operationId, Guid initiatingTaskId, object? additionalData = default) + where T : class, new() + { + return StartTask(operationId, initiatingTaskId, Activator.CreateInstance(), + additionalData); + } + + public ValueTask StartNew(Guid operationId, Guid initiatingTaskId, Type commandType, object? additionalData = default) + { + return StartTask(operationId, initiatingTaskId, commandType, additionalData); + } + + protected abstract ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData); + + protected async ValueTask StartTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData) + { + if (command == null) + throw new ArgumentNullException(nameof(command)); + + var (task, taskCommand) = await CreateTask(operationId, initiatingTaskId, command, additionalData); + var commandJson = JsonSerializer.Serialize(taskCommand); + + var taskMessage = new CreateNewOperationTaskCommand( + taskCommand.GetType().AssemblyQualifiedName, + commandJson, + operationId, + initiatingTaskId, + task.Id); + + await _bus.Send(taskMessage); + + _logger.LogDebug("Send new command of type {commandType}. Id: {operationId}, ParentTaskId: {parentTaskId}", + taskCommand.GetType().Name, operationId, initiatingTaskId); + + return task; + + } + + +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationsSetup.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationsSetup.cs new file mode 100644 index 0000000..30b7b00 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationsSetup.cs @@ -0,0 +1,19 @@ +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Events; +using Rebus.Bus; + +namespace Dbosoft.Rebus.Operations; + +public static class OperationsSetup +{ + public static async Task SubscribeEvents(IBus bus) + { + await bus.Subscribe(); + await bus.Subscribe(); + await bus.Subscribe(); + await bus.Subscribe(); + await bus.Subscribe(); + + return bus; + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Rebus.Operations.Core.csproj b/src/Rebus.Operations/Rebus.Operations.Core/Rebus.Operations.Core.csproj new file mode 100644 index 0000000..9b99b6f --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Rebus.Operations.Core.csproj @@ -0,0 +1,18 @@ + + + + netstandard2.1 + enable + latest + + + + + + + + + + + + diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultMessageEnricher.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultMessageEnricher.cs new file mode 100644 index 0000000..b478117 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultMessageEnricher.cs @@ -0,0 +1,11 @@ +using Dbosoft.Rebus.Operations.Commands; + +namespace Dbosoft.Rebus.Operations.Workflow; + +public class DefaultMessageEnricher : IMessageEnricher +{ + public object? EnrichTaskAcceptedReply(OperationTaskSystemMessage taskMessage) where T : class, new() + { + return null; + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultWorkflow.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultWorkflow.cs new file mode 100644 index 0000000..bf523fe --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultWorkflow.cs @@ -0,0 +1,16 @@ +#nullable enable +namespace Dbosoft.Rebus.Operations.Workflow; + +public class DefaultWorkflow : IWorkflow +{ + public DefaultWorkflow(IOperationManager operation, IOperationTaskManager tasks, IOperationMessaging messaging) + { + Operations = operation; + Tasks = tasks; + Messaging = messaging; + } + + public IOperationManager Operations { get; } + public IOperationTaskManager Tasks { get; } + public IOperationMessaging Messaging { get; } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationStatusEventHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationStatusEventHandler.cs new file mode 100644 index 0000000..94ebfc6 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationStatusEventHandler.cs @@ -0,0 +1,14 @@ +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Events; +using Rebus.Handlers; + +namespace Dbosoft.Rebus.Operations.Workflow; + +public class EmptyOperationStatusEventHandler : IHandleMessages + +{ + public Task Handle(OperationStatusEvent message) + { + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationTaskStatusEventHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationTaskStatusEventHandler.cs new file mode 100644 index 0000000..2895dda --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationTaskStatusEventHandler.cs @@ -0,0 +1,14 @@ +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Events; +using Rebus.Handlers; + +namespace Dbosoft.Rebus.Operations.Workflow; + +public class EmptyOperationTaskStatusEventHandler : IHandleMessages> + where TMessage : class, new() +{ + public Task Handle(OperationTaskStatusEvent message) + { + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/FailedOperationHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/FailedOperationHandler.cs new file mode 100644 index 0000000..f5e7c04 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/FailedOperationHandler.cs @@ -0,0 +1,40 @@ + + +#nullable enable + +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Events; +using Microsoft.Extensions.Logging; +using Rebus.Handlers; +using Rebus.Retry.Simple; + +namespace Dbosoft.Rebus.Operations.Workflow +{ + public class FailedOperationHandler : IHandleMessages> where T: IOperationTaskMessage + { + private readonly IOperationMessaging _operationMessaging; + + private readonly ILogger> _logger; + + public FailedOperationHandler(ILogger> logger, IOperationMessaging operationMessaging) + { + _logger = logger; + _operationMessaging = operationMessaging; + } + + public async Task Handle(IFailed failedMessage) + { + + _logger.LogError("Task {taskId} failed with message: {failedMessage}", + failedMessage.Message.TaskId, failedMessage.ErrorDescription + ); + + await _operationMessaging.DispatchTaskStatusEventAsync( + OperationTaskStatusEvent.Failed( + failedMessage.Message.OperationId, failedMessage.Message.InitiatingTaskId, + failedMessage.Message.TaskId, new ErrorData() { ErrorMessage = failedMessage.ErrorDescription })); + + + } + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs new file mode 100644 index 0000000..01ea7e7 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs @@ -0,0 +1,39 @@ +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Commands; +using Dbosoft.Rebus.Operations.Events; +using Microsoft.Extensions.Logging; +using Rebus.Bus; +using Rebus.Handlers; + +namespace Dbosoft.Rebus.Operations.Workflow +{ + public class IncomingTaskMessageHandler : IHandleMessages> where T: class, new() + { + private readonly IBus _bus; + private readonly ILogger> _logger; + private readonly IMessageEnricher _messageEnricher; + public IncomingTaskMessageHandler(IBus bus, ILogger> logger, IMessageEnricher messageEnricher) + { + _bus = bus; + _logger = logger; + _messageEnricher = messageEnricher; + } + + public async Task Handle(OperationTaskSystemMessage taskMessage) + { + await _bus.SendLocal(new OperationTask(taskMessage.Message, taskMessage.OperationId, taskMessage.InitiatingTaskId, taskMessage.TaskId)).ConfigureAwait(false); + + _logger.LogTrace($"Accepted incoming operation message. Operation id: '{taskMessage.OperationId}'"); + + var reply = new OperationTaskAcceptedEvent + { + OperationId = taskMessage.OperationId, + InitiatingTaskId = taskMessage.InitiatingTaskId, + TaskId = taskMessage.TaskId, + AdditionalData = _messageEnricher.EnrichTaskAcceptedReply(taskMessage) + }; + + await _bus.Reply(reply).ConfigureAwait(false); + } + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationManagerBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationManagerBase.cs new file mode 100644 index 0000000..822d234 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationManagerBase.cs @@ -0,0 +1,26 @@ +#nullable enable +using System; +using System.Threading.Tasks; + +namespace Dbosoft.Rebus.Operations.Workflow; + +public abstract class OperationManagerBase : IOperationManager +{ + + public abstract ValueTask GetByIdAsync(Guid operationId); + + public abstract ValueTask GetOrCreateAsync(Guid operationId, object command); + + + public abstract ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, + IOperationTask task, + object? data); + + public abstract ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, + object? additionalData); + + + + + +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationSagaData.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationSagaData.cs new file mode 100644 index 0000000..c1e4063 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationSagaData.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using Rebus.Sagas; + +namespace Dbosoft.Rebus.Operations.Workflow +{ + + public class OperationSagaData : ISagaData + { + public Guid OperationId { get; set; } + public Guid PrimaryTaskId { get; set; } + + // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global + public Dictionary Tasks { get; set; } = new(); + public Guid Id { get; set; } + public int Revision { get; set; } + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskManagerBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskManagerBase.cs new file mode 100644 index 0000000..0f407e7 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskManagerBase.cs @@ -0,0 +1,18 @@ +using System; +using System.Threading.Tasks; + +namespace Dbosoft.Rebus.Operations.Workflow; + +#nullable enable +public abstract class OperationTaskManagerBase : IOperationTaskManager +{ + + public abstract ValueTask GetByIdAsync(Guid taskId); + + public abstract ValueTask GetOrCreateAsync(IOperation operation, object command, Guid taskId, Guid parentTaskId); + + public abstract ValueTask TryChangeStatusAsync(IOperationTask task, OperationTaskStatus newStatus, + object? additionalData); + + +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs new file mode 100644 index 0000000..ddc5fbf --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs @@ -0,0 +1,58 @@ +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Events; +using JetBrains.Annotations; +using Microsoft.Extensions.Logging; +using Rebus.Handlers; + +namespace Dbosoft.Rebus.Operations.Workflow +{ + [UsedImplicitly] + public class OperationTaskProgressEventHandler : IHandleMessages + { + private readonly IWorkflow _workflow; + private readonly ILogger _logger; + + public OperationTaskProgressEventHandler(IWorkflow workflow, ILogger logger) + { + _workflow = workflow; + _logger = logger; + } + + + public async Task Handle(OperationTaskProgressEvent message) + { + _logger.LogDebug($"Received operation task progress event. Id : '{message.OperationId}/{message.TaskId}'"); + + var operation = await _workflow.Operations + .GetByIdAsync(message.OperationId) + .ConfigureAwait(false); + + var task = await _workflow.Tasks + .GetByIdAsync(message.TaskId) + .ConfigureAwait(false); + + + if (operation != null && task!=null) + { + await _workflow.Operations.AddProgressAsync( + message.Id, + message.Timestamp, + operation, + task, + message.Data); + } + else + { + _logger.LogWarning($"Received operation task progress event for a unknown operation task. Id : '{message.OperationId}/{message.TaskId}'", new + { + message.OperationId, + message.TaskId, + message.Data, + message.Timestamp, + }); + + } + + } + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskWorkflowSaga.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskWorkflowSaga.cs new file mode 100644 index 0000000..1f0f2ca --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskWorkflowSaga.cs @@ -0,0 +1,116 @@ +#nullable enable + +using System; +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Events; +using Rebus.Handlers; +using Rebus.Sagas; + +namespace Dbosoft.Rebus.Operations.Workflow +{ + public abstract class OperationTaskWorkflowSaga : Saga, + IAmInitiatedBy>, + IHandleMessages> + where TSagaData : TaskWorkflowSagaData, new() + where TMessage : class, new() + { + protected readonly IWorkflow WorkflowEngine; + + protected OperationTaskWorkflowSaga(IWorkflow workflowEngine) + { + WorkflowEngine = workflowEngine; + } + + + public virtual Task Handle(OperationTask message) + { + Data.OperationId = message.OperationId; + Data.SagaTaskId = message.TaskId; + Data.ParentTaskId = message.InitiatingTaskId; + return Initiated(message.Command); + } + + public Task Handle(OperationTaskStatusEvent message) + { + return message.OperationFailed ? InitiatingTaskFailed() : InitiatingTaskCompleted(); + } + + protected override void CorrelateMessages(ICorrelationConfig config) + { + config.Correlate>(m => m.TaskId, d => d.SagaTaskId); + config.Correlate>(m => m.TaskId, d => d.SagaTaskId); + } + + protected abstract Task Initiated(TMessage message); + + private Task InitiatingTaskCompleted() + { + MarkAsComplete(); + return Task.CompletedTask; + } + + private Task InitiatingTaskFailed() + { + MarkAsComplete(); + return Task.CompletedTask; + } + + protected Task Fail(object? message = null) + { + return WorkflowEngine.Messaging.DispatchTaskStatusEventAsync(OperationTaskStatusEvent.Failed( + Data.OperationId, Data.ParentTaskId, Data.SagaTaskId, message)); + } + + + protected Task Complete(object? message = null) + { + return WorkflowEngine.Messaging.DispatchTaskStatusEventAsync(OperationTaskStatusEvent.Completed( + Data.OperationId, Data.ParentTaskId, Data.SagaTaskId, message)); + } + + protected Task FailOrRun(OperationTaskStatusEvent message, Func completedFunc) + where T : class, new() + { + if (message.OperationFailed) + return Fail(message.GetMessage()); + + return completedFunc(); + } + + protected Task FailOrRun(OperationTaskStatusEvent message, Func completedFunc) + where T : class, new() + where TOpMessage : class + { + return message.OperationFailed + ? Fail(message.GetMessage()) + : completedFunc(message.GetMessage() as TOpMessage + ?? throw new InvalidOperationException( + $"Message {typeof(T)} has not returned a result of type {typeof(TOpMessage)}.")); + } + + + protected ValueTask StartNewTask(object? additionalData = default) where T : class, new() + { + + return WorkflowEngine.Messaging.TaskDispatcher.StartNew(Data.OperationId, Data.SagaTaskId, additionalData); + } + + + protected ValueTask StartNewTask(Type taskCommandType, + object? additionalData = default) + { + return WorkflowEngine.Messaging.TaskDispatcher.StartNew(Data.OperationId, Data.SagaTaskId, taskCommandType, additionalData); + + } + + + protected ValueTask StartNewTask(object command, object? additionalData = default) + { + return WorkflowEngine.Messaging.TaskDispatcher.StartNew(Data.OperationId, Data.SagaTaskId, command, additionalData); + + } + + + + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs new file mode 100644 index 0000000..3ed3d1d --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs @@ -0,0 +1,207 @@ +#nullable enable + +using System; +using System.Text.Json; +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Commands; +using Dbosoft.Rebus.Operations.Events; +using JetBrains.Annotations; +using Microsoft.Extensions.Logging; +using Rebus.Handlers; +using Rebus.Sagas; + +namespace Dbosoft.Rebus.Operations.Workflow +{ + [UsedImplicitly] + public class ProcessOperationSaga : Saga, + IAmInitiatedBy, + IHandleMessages, + IHandleMessages, + IHandleMessages, + IHandleMessages + { + private readonly IWorkflow _workflow; + private readonly ILogger _log; + + public ProcessOperationSaga(IWorkflow workflow, ILogger log) + { + _workflow = workflow; + _log = log; + } + + public Task Handle(CreateOperationCommand message) + { + Data.OperationId = message.TaskMessage.OperationId; + Data.PrimaryTaskId = message.TaskMessage.TaskId; + return Handle(message.TaskMessage); + } + + public Task Handle(OperationTimeoutEvent message) + { + return Task.CompletedTask; + } + + protected override void CorrelateMessages(ICorrelationConfig config) + { + config.Correlate(m => m.TaskMessage.OperationId, d => d.OperationId); + config.Correlate(m => m.OperationId, d => d.OperationId); + config.Correlate(m => m.OperationId, d => d.OperationId); + config.Correlate(m => m.OperationId, d => d.OperationId); + config.Correlate(m => m.OperationId, d => d.OperationId); + } + + public async Task Handle(CreateNewOperationTaskCommand message) + { + + var command = JsonSerializer.Deserialize(message.CommandData, + Type.GetType(message.CommandType) ?? + throw new InvalidOperationException($"Operation Workflow {message.OperationId}: unknown command type '{message.CommandType}'")); + + if(command == null) + throw new InvalidOperationException($"Operation Workflow {message.OperationId}: invalid command data in message '{message.CommandType}'"); + + _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: creating new task command '{commandType}'", + message.OperationId, message.TaskId, command.GetType()); + + + var op = await _workflow.Operations + .GetByIdAsync(message.OperationId) + .ConfigureAwait(false); + + + if (op == null) + { + _log.LogWarning("Operation Workflow {operationId}: Operation not found - cancelling workflow", + message.OperationId); + MarkAsComplete(); + return; + } + + var task = await _workflow.Tasks + .GetOrCreateAsync(op, command, message.TaskId, message.InitiatingTaskId) + .ConfigureAwait(false); + + + var messageType = Type.GetType(message.CommandType); + if (messageType == null) + throw new InvalidOperationException($"unknown command type '{message.CommandType}'"); + + Data.Tasks.Add(message.TaskId, messageType.AssemblyQualifiedName!); + _workflow.Messaging.DispatchTaskMessage(command,task); + } + + + public async Task Handle(OperationTaskAcceptedEvent message) + { + var op = await _workflow.Operations + .GetByIdAsync(message.OperationId) + .ConfigureAwait(false); + + var task = await _workflow.Tasks + .GetByIdAsync(message.TaskId) + .ConfigureAwait(false); + + if (op == null || task == null) + { + _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: could not accept task as it was not found.", + message.OperationId, message.TaskId); + return; + } + + var opOldStatus = op.Status; + if (await _workflow.Operations.TryChangeStatusAsync(op, OperationStatus.Running, null)) + { + _log.LogDebug("Operation Workflow {operationId}: Status changed: {oldStatus} -> {newStatus}", + message.OperationId, opOldStatus, op.Status); + + + await _workflow.Messaging.DispatchOperationStatusEventAsync(new OperationStatusEvent + { + OperationId = op.Id, + NewStatus = OperationStatus.Running + }); + + + var taskOldStatus = task.Status; + if (await _workflow.Tasks.TryChangeStatusAsync(task, OperationTaskStatus.Running, + message.AdditionalData)) + { + _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: Status changed: {oldStatus} -> {newStatus}", + message.OperationId, message.TaskId, taskOldStatus, task.Status); + + } + } + + } + + public async Task Handle(OperationTaskStatusEvent message) + { + var op = await _workflow.Operations + .GetByIdAsync(message.OperationId) + .ConfigureAwait(false); + + var task = await _workflow.Tasks + .GetByIdAsync(message.TaskId) + .ConfigureAwait(false); + + if (op == null || task == null) + { + _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: could not update task status as it was not found", + message.OperationId, message.TaskId); + return; + } + + if (task.Status is OperationTaskStatus.Queued or OperationTaskStatus.Running) + { + if(!Data.Tasks.ContainsKey(message.TaskId)) + _log.LogWarning("Operation Workflow {operationId}, Task {taskId}: could not update task status as it was not found in workflow.", + message.OperationId, message.TaskId); + else + { + var taskCommandTypeName = Data.Tasks[message.TaskId]; + await _workflow.Messaging.DispatchTaskStatusEventAsync(taskCommandTypeName, message); + } + } + + var taskOldStatus = task.Status; + if(await _workflow.Tasks.TryChangeStatusAsync(task, + message.OperationFailed + ? OperationTaskStatus.Failed + : OperationTaskStatus.Completed + , message.GetMessage())) + + _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: Status changed: {oldStatus} -> {newStatus}", + message.OperationId, message.TaskId, taskOldStatus, task.Status); + + + + if (message.TaskId == Data.PrimaryTaskId) + { + + var newStatus = message.OperationFailed + ? OperationStatus.Failed + : OperationStatus.Completed; + + _log.LogDebug("Operation Workflow {operationId}: Primary task changed, updating operation status to {newStatus}", + message.OperationId, newStatus); + + + if (await _workflow.Operations.TryChangeStatusAsync(op, + newStatus, message.GetMessage())) + { + await _workflow.Messaging.DispatchOperationStatusEventAsync(new OperationStatusEvent + { + OperationId = op.Id, + NewStatus = newStatus + }); + } + + _log.LogDebug("Operation Workflow {operationId}: Completing workflow", + message.OperationId); + + MarkAsComplete(); + } + } + + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs new file mode 100644 index 0000000..245a923 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs @@ -0,0 +1,54 @@ +using System; +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Commands; +using Dbosoft.Rebus.Operations.Events; +using Rebus.Bus; + +namespace Dbosoft.Rebus.Operations.Workflow; + +public class RebusOperationMessaging : IOperationMessaging +{ + private readonly IBus _bus; + + public RebusOperationMessaging(IBus bus, + IOperationDispatcher operationDispatcher, IOperationTaskDispatcher taskDispatcher) + { + _bus = bus; + OperationDispatcher = operationDispatcher; + TaskDispatcher = taskDispatcher; + } + + public virtual void DispatchTaskMessage(object command, IOperationTask task) + { + var messageType = command.GetType(); + var outboundMessage = Activator.CreateInstance( + typeof(OperationTaskSystemMessage<>).MakeGenericType(messageType), + command, task.OperationId, task.InitiatingTaskId, task.Id); + + _bus.SendLocal(outboundMessage); + } + + public Task DispatchTaskStatusEventAsync(string commandType, OperationTaskStatusEvent message) + { + var genericType = typeof(OperationTaskStatusEvent<>); + var wrappedCommandType = genericType.MakeGenericType(Type.GetType(commandType) + ?? throw new InvalidOperationException( + $"Unknown task command type '{commandType}'.")); + + var commandInstance = Activator.CreateInstance(wrappedCommandType, message); + return _bus.SendLocal(commandInstance); + } + + public Task DispatchTaskStatusEventAsync(OperationTaskStatusEvent message) + { + return _bus.Publish(message); + } + + public Task DispatchOperationStatusEventAsync(OperationStatusEvent message) + { + return _bus.Publish(message); + } + + public IOperationDispatcher OperationDispatcher { get; } + public IOperationTaskDispatcher TaskDispatcher { get; } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/TaskWorkflowSagaData.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/TaskWorkflowSagaData.cs new file mode 100644 index 0000000..134aa66 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/TaskWorkflowSagaData.cs @@ -0,0 +1,18 @@ +using System; +using Rebus.Sagas; + +namespace Dbosoft.Rebus.Operations.Workflow +{ + public class TaskWorkflowSagaData : ISagaData + { + public Guid OperationId { get; set; } + + public Guid SagaTaskId { get; set; } + public Guid ParentTaskId { get; set; } + + + // these two are required by Rebus + public Guid Id { get; set; } + public int Revision { get; set; } + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateNewOperationTaskCommand.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateNewOperationTaskCommand.cs new file mode 100644 index 0000000..6a35a2b --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateNewOperationTaskCommand.cs @@ -0,0 +1,29 @@ +using System; + +namespace Dbosoft.Rebus.Operations.Commands +{ + public class CreateNewOperationTaskCommand : IOperationTaskMessage + { + // ReSharper disable once UnusedMember.Global + public CreateNewOperationTaskCommand() + { + } + + public CreateNewOperationTaskCommand(string commandType, string commandData, + Guid operationId, Guid initiatingTaskId, Guid taskId) + { + CommandType = commandType; + CommandData = commandData; + OperationId = operationId; + InitiatingTaskId = initiatingTaskId; + TaskId = taskId; + } + + public string? CommandData { get; set; } + public string? CommandType { get; set; } + + public Guid OperationId { get; set; } + public Guid InitiatingTaskId { get; set; } + public Guid TaskId { get; set; } + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateOperationCommand.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateOperationCommand.cs new file mode 100644 index 0000000..af1af56 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateOperationCommand.cs @@ -0,0 +1,7 @@ +namespace Dbosoft.Rebus.Operations.Commands +{ + public class CreateOperationCommand + { + public CreateNewOperationTaskCommand? TaskMessage { get; set; } + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/OperationTaskSystemMessage.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/OperationTaskSystemMessage.cs new file mode 100644 index 0000000..600c5c7 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/OperationTaskSystemMessage.cs @@ -0,0 +1,33 @@ +using System; + +namespace Dbosoft.Rebus.Operations.Commands +{ + + /// + /// Generic message that wraps a task message + /// + public class OperationTaskSystemMessage : IOperationTaskMessage + { + + // ReSharper disable once UnusedMember.Global + public OperationTaskSystemMessage() + { + } + + public OperationTaskSystemMessage(TMessage message, Guid operationId, Guid initiatingTaskId, Guid taskId) + { + Message = message; + OperationId = operationId; + InitiatingTaskId = initiatingTaskId; + TaskId = taskId; + } + + public TMessage? Message { get; set; } + + public Guid OperationId { get; set; } + public Guid InitiatingTaskId { get; set; } + + + public Guid TaskId { get; set; } + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/ErrorData.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/ErrorData.cs new file mode 100644 index 0000000..923b8e8 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/ErrorData.cs @@ -0,0 +1,10 @@ +using JetBrains.Annotations; + +namespace Dbosoft.Rebus.Operations; + +[PublicAPI] +public class ErrorData +{ + public string? ErrorMessage { get; set; } + public object? AdditionalData { get; set; } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationStatusEvent.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationStatusEvent.cs new file mode 100644 index 0000000..6d7df1b --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationStatusEvent.cs @@ -0,0 +1,9 @@ +using System; + +namespace Dbosoft.Rebus.Operations.Events; + +public class OperationStatusEvent +{ + public Guid OperationId { get; set; } + public OperationStatus NewStatus { get; set; } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskAcceptedEvent.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskAcceptedEvent.cs new file mode 100644 index 0000000..3e2af39 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskAcceptedEvent.cs @@ -0,0 +1,13 @@ +using System; + +namespace Dbosoft.Rebus.Operations.Events +{ + public class OperationTaskAcceptedEvent : IOperationTaskMessage + { + public Guid OperationId { get; set; } + public Guid InitiatingTaskId { get; set; } + public Guid TaskId { get; set; } + + public object? AdditionalData { get; set; } + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskProgressEvent.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskProgressEvent.cs new file mode 100644 index 0000000..d95d3e8 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskProgressEvent.cs @@ -0,0 +1,14 @@ +using System; + +namespace Dbosoft.Rebus.Operations.Events +{ + public class OperationTaskProgressEvent + { + public Guid Id { get; set; } + + public Guid OperationId { get; set; } + public Guid TaskId { get; set; } + public object? Data { get; set; } + public DateTimeOffset Timestamp { get; set; } + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEvent.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEvent.cs new file mode 100644 index 0000000..11db0c2 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEvent.cs @@ -0,0 +1,58 @@ +using System; + +namespace Dbosoft.Rebus.Operations.Events +{ + public class OperationTaskStatusEvent : OperationTaskStatusEventBase + { + // ReSharper disable once UnusedMember.Global + // required for serialization + public OperationTaskStatusEvent() + { + } + + protected OperationTaskStatusEvent(Guid operationId, Guid initiatingTaskId, Guid taskId, bool failed, + string? messageType, + string? messageData) : base(operationId, initiatingTaskId, taskId, failed, messageType, messageData) + { + + } + + public static OperationTaskStatusEvent Failed(Guid operationId, Guid initiatingTaskId, Guid taskId) + { + return new OperationTaskStatusEvent(operationId, initiatingTaskId, taskId, true, null, null); + } + + public static OperationTaskStatusEvent Failed(Guid operationId, Guid initiatingTaskId, Guid taskId, object? message) + { + var (data, typeName) = SerializeMessage(message); + return new OperationTaskStatusEvent(operationId, initiatingTaskId, taskId, true, typeName, data); + } + + public static OperationTaskStatusEvent Completed(Guid operationId, Guid initiatingTaskId, Guid taskId) + { + return new OperationTaskStatusEvent(operationId, initiatingTaskId, taskId, false, null, null); + } + + public static OperationTaskStatusEvent Completed(Guid operationId, Guid initiatingTaskId, Guid taskId, object? message) + { + var (data, typeName) = SerializeMessage(message); + return new OperationTaskStatusEvent(operationId, initiatingTaskId,taskId, false, typeName, data); + } + + } + + + // ReSharper disable once UnusedTypeParameter + public class OperationTaskStatusEvent : OperationTaskStatusEventBase where T : class, new() + { + + public OperationTaskStatusEvent() + { + } + + public OperationTaskStatusEvent(OperationTaskStatusEvent message) : + base(message.OperationId, message.InitiatingTaskId, message.TaskId, message.OperationFailed, message.MessageType, message.MessageData) + { + } + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEventBase.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEventBase.cs new file mode 100644 index 0000000..b60ff65 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEventBase.cs @@ -0,0 +1,71 @@ +using System; +using System.Text.Json; + +namespace Dbosoft.Rebus.Operations.Events; + +#nullable enable +public class OperationTaskStatusEventBase : IOperationTaskMessage +{ + public OperationTaskStatusEventBase() {} + + protected OperationTaskStatusEventBase(Guid operationId, Guid initiatingTaskId, Guid taskId, bool failed, string? messageType, + string? messageData) + { + OperationId = operationId; + InitiatingTaskId = initiatingTaskId; + TaskId = taskId; + OperationFailed = failed; + MessageData = messageData; + MessageType = messageType; + } + + public string? MessageData { get; set; } + public string? MessageType { get; set; } + public bool OperationFailed { get; set; } + public Guid OperationId { get; set; } + public Guid TaskId { get; set; } + public Guid InitiatingTaskId { get; set; } + + protected static (string? data, string? type) SerializeMessage(object? message) + { + if (message == null) + return (null, null); + + + return (JsonSerializer.Serialize(message), message.GetType().AssemblyQualifiedName); + } + + public object? GetMessage() + { + if (MessageData == null || MessageType == null) + return null; + + var type = Type.GetType(MessageType); + + return type == null + ? null + : JsonSerializer.Deserialize(MessageData, type); + } + + public T? GetErrorDetails() + { + if (MessageData == null || MessageType == null) + return default; + + var type = Type.GetType(MessageType); + + if (type == null) return default; + + if (type != typeof(ErrorData) && !type.IsSubclassOf(typeof(ErrorData))) return default; + + var data = JsonSerializer.Deserialize(MessageData, type) as ErrorData; + + if ( data?.AdditionalData is JsonElement element) + { + return element.Deserialize(); + } + + return default; + + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTimeoutEvent.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTimeoutEvent.cs new file mode 100644 index 0000000..2302360 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTimeoutEvent.cs @@ -0,0 +1,9 @@ +using System; + +namespace Dbosoft.Rebus.Operations.Events +{ + public class OperationTimeoutEvent + { + public Guid OperationId { get; set; } + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/IOperationTaskMessage.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/IOperationTaskMessage.cs new file mode 100644 index 0000000..e7af6ac --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/IOperationTaskMessage.cs @@ -0,0 +1,17 @@ +using System; + +namespace Dbosoft.Rebus.Operations +{ + /// + /// Interface for all Messages for operation tasks + /// + public interface IOperationTaskMessage + { + Guid OperationId { get; } + + Guid InitiatingTaskId { get; } + + Guid TaskId { get; } + } + +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/OperationStatus.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/OperationStatus.cs new file mode 100644 index 0000000..00eb5bd --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/OperationStatus.cs @@ -0,0 +1,9 @@ +namespace Dbosoft.Rebus.Operations; + +public enum OperationStatus +{ + Queued, + Running, + Failed, + Completed +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/OperationTaskStatus.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/OperationTaskStatus.cs new file mode 100644 index 0000000..ac905c0 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/OperationTaskStatus.cs @@ -0,0 +1,9 @@ +namespace Dbosoft.Rebus.Operations; + +public enum OperationTaskStatus +{ + Queued, + Running, + Failed, + Completed +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Rebus.Operations.Primitives.csproj b/src/Rebus.Operations/Rebus.Operations.Primitives/Rebus.Operations.Primitives.csproj new file mode 100644 index 0000000..003815b --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Rebus.Operations.Primitives.csproj @@ -0,0 +1,14 @@ + + + + netstandard2.1 + enable + latest + + + + + + + + diff --git a/src/Rebus.Operations/Rebus.Operations.SimpleInjector/Rebus.Operations.SimpleInjector.csproj b/src/Rebus.Operations/Rebus.Operations.SimpleInjector/Rebus.Operations.SimpleInjector.csproj new file mode 100644 index 0000000..0458ec1 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.SimpleInjector/Rebus.Operations.SimpleInjector.csproj @@ -0,0 +1,18 @@ + + + + netstandard2.1 + enable + latest + + + + + + + + + + + + diff --git a/src/Rebus.Operations/Rebus.Operations.SimpleInjector/SimpleInjectorExtensions.cs b/src/Rebus.Operations/Rebus.Operations.SimpleInjector/SimpleInjectorExtensions.cs new file mode 100644 index 0000000..03a7dad --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.SimpleInjector/SimpleInjectorExtensions.cs @@ -0,0 +1,56 @@ +using System; +using Dbosoft.Rebus.Operations.Commands; +using Dbosoft.Rebus.Operations.Events; +using Dbosoft.Rebus.Operations.Workflow; +using Rebus.Config; +using Rebus.Handlers; +using Rebus.Routing.TypeBased; +using SimpleInjector; + +namespace Dbosoft.Rebus.Operations; + +public static class SimpleInjectorExtensions +{ + public static RebusConfigurer AddOperations(this RebusConfigurer configurer, string queueName) + { + return configurer.Routing(x => x.TypeBased() + .Map(queueName) + .Map(queueName) + .Map(queueName) + .Map(queueName) + .Map(queueName) + .Map(queueName) + .Map(queueName)); + } + + public static Container AddRebusOperationsHandlers(this Container container) + where TOpManager : IOperationManager + where TTaskManager: IOperationTaskManager + { + return container.AddRebusOperationsHandlers(typeof(TOpManager), typeof(TTaskManager)); + } + + public static Container AddRebusOperationsHandlers(this Container container, + Type operationManagerType, + Type taskManagerType ) + { + //workflow engine types + container.Register(typeof(IOperationManager), operationManagerType, Lifestyle.Scoped); + container.Register(typeof(IOperationTaskManager), taskManagerType, Lifestyle.Scoped); + container.RegisterConditional(Lifestyle.Scoped,c=> !c.Handled); + container.RegisterConditional(Lifestyle.Scoped,c=> !c.Handled); + container.RegisterConditional(Lifestyle.Scoped,c=> !c.Handled); + + container.Register(Lifestyle.Scoped); + container.RegisterConditional(Lifestyle.Scoped, c=> !c.Handled); + + container.Collection.Append(typeof(IHandleMessages<>), typeof(ProcessOperationSaga), Lifestyle.Scoped); + container.Collection.Append(typeof(IHandleMessages<>), typeof(OperationTaskProgressEventHandler), Lifestyle.Scoped); + container.Collection.Append(typeof(IHandleMessages<>), typeof(FailedOperationHandler<>), Lifestyle.Scoped); + container.Collection.Append(typeof(IHandleMessages<>), typeof(IncomingTaskMessageHandler<>), Lifestyle.Scoped); + container.Collection.Append(typeof(IHandleMessages<>), typeof(EmptyOperationStatusEventHandler), Lifestyle.Scoped); + container.Collection.Append(typeof(IHandleMessages<>), typeof(EmptyOperationTaskStatusEventHandler<>), Lifestyle.Scoped); + + return container; + } +} \ No newline at end of file diff --git a/src/Rebus.SimpleInjector/IRebusUnitOfWork.cs b/src/Rebus.SimpleInjector/IRebusUnitOfWork.cs new file mode 100644 index 0000000..4025e78 --- /dev/null +++ b/src/Rebus.SimpleInjector/IRebusUnitOfWork.cs @@ -0,0 +1,7 @@ +namespace Dbosoft.Rebus +{ + public interface IRebusUnitOfWork : IAsyncDisposable, IDisposable + { + public Task Commit(); + } +} \ No newline at end of file diff --git a/src/Rebus.SimpleInjector/Rebus.SimpleInjector.csproj b/src/Rebus.SimpleInjector/Rebus.SimpleInjector.csproj new file mode 100644 index 0000000..6b167f9 --- /dev/null +++ b/src/Rebus.SimpleInjector/Rebus.SimpleInjector.csproj @@ -0,0 +1,17 @@ + + + + netstandard2.1 + enable + enable + Dbosoft.Rebus + latest + + + + + + + + + diff --git a/src/Rebus.SimpleInjector/RebusUnitOfWorkAdapter.cs b/src/Rebus.SimpleInjector/RebusUnitOfWorkAdapter.cs new file mode 100644 index 0000000..8abe81f --- /dev/null +++ b/src/Rebus.SimpleInjector/RebusUnitOfWorkAdapter.cs @@ -0,0 +1,25 @@ +using System.Diagnostics; +using Rebus.Pipeline; +using SimpleInjector; + +namespace Dbosoft.Rebus +{ + public class RebusUnitOfWorkAdapter + { + public async Task Dispose(IMessageContext context) + { + var scope = context.TransactionContext.Items["SI_scope"] as Scope; + Debug.Assert(scope != null); + + await scope.GetInstance().DisposeAsync().ConfigureAwait(false); + } + + public async Task Commit(IMessageContext context) + { + var scope = context.TransactionContext.Items["SI_scope"] as Scope; + Debug.Assert(scope != null); + + await scope.GetInstance().Commit().ConfigureAwait(false); + } + } +} \ No newline at end of file diff --git a/src/Rebus.SimpleInjector/SimpleInjectorConfigurationExtensions.cs b/src/Rebus.SimpleInjector/SimpleInjectorConfigurationExtensions.cs new file mode 100644 index 0000000..e1c4053 --- /dev/null +++ b/src/Rebus.SimpleInjector/SimpleInjectorConfigurationExtensions.cs @@ -0,0 +1,163 @@ +using Rebus.Bus; +using Rebus.Bus.Advanced; +using Rebus.Config; +using Rebus.Messages; +using Rebus.Pipeline; +using Rebus.Transport; +using SimpleInjector; + +namespace Dbosoft.Rebus +{ + /// + /// Configuration extensions for configuring a Rebus endpoint in your SimpleInjector container + /// + public static class SimpleInjectorConfigurationExtensions + { + /// + /// Makes the necessary registrations in the container, registering the passed-in + /// + /// as a configuration callback. The callback is invoked with a which must have + /// its method called at the end, returning the resulting + /// instance. + /// The configuration callback is called the first time the bus is resolved, which may be done manually or simply by + /// calling + /// + /// + public static void ConfigureRebus(this Container container, Func configurationCallback) + { + if (container.GetCurrentRegistrations().Any(r => r.ServiceType == typeof(IBus))) + throw new InvalidOperationException( + "Cannot register IBus in the container because it has already been registered. If you want to host multiple Rebus instances in a single process, please use separate container instances for them."); + + container.Register(() => + { + if (container.IsVerifying) return new FakeSyncBus(); + + return container.GetInstance().Advanced.SyncBus; + }); + + container.Register(() => + { + var currentMessageContext = MessageContext.Current; + + if (currentMessageContext != null) return currentMessageContext; + + if (container.IsVerifying) return new FakeMessageContext(); + + throw new InvalidOperationException( + "Attempted to inject the current message context from MessageContext.Current, but it was null! Did you attempt to resolve IMessageContext from outside of a Rebus message handler?"); + }); + + container.Register(() => + { + var containerAdapter = new SimpleInjectorContainerAdapter(container); + var rebusConfigurer = Configure.With(containerAdapter); + return configurationCallback(rebusConfigurer); + }, Lifestyle.Singleton); + } + + /// + /// After having configured the bus with the bus may be started by calling this method + /// + public static void StartBus(this Container container) + { + container.GetInstance(); + } + + private class FakeSyncBus : ISyncBus + { + public void SendLocal(object commandMessage, IDictionary optionalHeaders = null) + { + throw new NotImplementedException(); + } + + public void Send(object commandMessage, IDictionary optionalHeaders = null) + { + throw new NotImplementedException(); + } + + public void Reply(object replyMessage, IDictionary optionalHeaders = null) + { + throw new NotImplementedException(); + } + + public void Defer(TimeSpan delay, object message, IDictionary optionalHeaders = null) + { + throw new NotImplementedException(); + } + + public void DeferLocal(TimeSpan delay, object message, IDictionary optionalHeaders = null) + { + throw new NotImplementedException(); + } + + public void Subscribe() + { + throw new NotImplementedException(); + } + + public void Subscribe(Type eventType) + { + throw new NotImplementedException(); + } + + public void Unsubscribe() + { + throw new NotImplementedException(); + } + + public void Unsubscribe(Type eventType) + { + throw new NotImplementedException(); + } + + public void Publish(object eventMessage, IDictionary optionalHeaders = null) + { + throw new NotImplementedException(); + } + + public void SendLocal(object commandMessage, Dictionary optionalHeaders = null) + { + throw new NotImplementedException(); + } + + public void Send(object commandMessage, Dictionary optionalHeaders = null) + { + throw new NotImplementedException(); + } + + public void Reply(object replyMessage, Dictionary optionalHeaders = null) + { + throw new NotImplementedException(); + } + + public void Defer(TimeSpan delay, object message, Dictionary optionalHeaders = null) + { + throw new NotImplementedException(); + } + + public void DeferLocal(TimeSpan delay, object message, Dictionary optionalHeaders = null) + { + throw new NotImplementedException(); + } + + public void Publish(object eventMessage, Dictionary optionalHeaders = null) + { + throw new NotImplementedException(); + } + } + + /// + /// Fake implementation of that can be returned by SimpleInjector while verifying the + /// configuration + /// + private class FakeMessageContext : IMessageContext + { + public ITransactionContext TransactionContext { get; } + public IncomingStepContext IncomingStepContext { get; } + public TransportMessage TransportMessage { get; } + public Message Message { get; } + public Dictionary Headers { get; } + } + } +} \ No newline at end of file diff --git a/src/Rebus.SimpleInjector/SimpleInjectorContainerAdapter.cs b/src/Rebus.SimpleInjector/SimpleInjectorContainerAdapter.cs new file mode 100644 index 0000000..c952d4b --- /dev/null +++ b/src/Rebus.SimpleInjector/SimpleInjectorContainerAdapter.cs @@ -0,0 +1,82 @@ +using Rebus.Activation; +using Rebus.Bus; +using Rebus.Handlers; +using Rebus.Transport; +using SimpleInjector; +using SimpleInjector.Lifestyles; + +// ReSharper disable ArgumentsStyleLiteral +#pragma warning disable 1998 + +namespace Dbosoft.Rebus +{ + internal class SimpleInjectorContainerAdapter : IContainerAdapter + { + private readonly Container _container; + + private bool _busWasSet; + + /// + /// Constructs the container adapter + /// + public SimpleInjectorContainerAdapter(Container container) + { + _container = container ?? throw new ArgumentNullException(nameof(container)); + } + + /// + /// Resolves all handlers for the given message type + /// + public async Task>> GetHandlers(TMessage message, + ITransactionContext transactionContext) + { + var scope = AsyncScopedLifestyle.BeginScope(_container); + if (TryGetInstance>>(_container, out var handlerInstances)) + { + var handlerList = handlerInstances.ToList(); + transactionContext.Items["SI_scope"] = scope; + + transactionContext.OnDisposed(ctx => + { + foreach (var disposable in handlerList.OfType()) disposable.Dispose(); + + scope.Dispose(); + }); + + return handlerList; + } + + scope.Dispose(); + + return Array.Empty>(); + } + + public void SetBus(IBus bus) + { + // hack: this is just to satisfy the contract test... we are pretty sure that + // 1. the SetBus method is not called twice, becauwe + // 2. we are the only ones who create the instance of the container adapter, because + // 3. the container adapter class is internal + if (_busWasSet) + throw new InvalidOperationException( + "SetBus was called twice on the container adapter. This is a sign that something has gone wrong during the configuration process."); + _busWasSet = true; + + // 2nd hack: + // again: we control calls to SetBus in this container adapter, because we create it... + // so, to make the contract tests happy, we need to do this: + var actualBusType = bus.GetType(); + + if (actualBusType.Name == "FakeBus" && actualBusType.DeclaringType?.Name == "ContainerTests`1") + bus.Dispose(); + } + + private static bool TryGetInstance(Container container, out TService instance) + where TService : class + { + IServiceProvider provider = container; + instance = (TService) provider.GetService(typeof(TService)); + return instance != null; + } + } +} \ No newline at end of file diff --git a/src/Rebus.SimpleInjector/SimpleInjectorUnitOfWork.cs b/src/Rebus.SimpleInjector/SimpleInjectorUnitOfWork.cs new file mode 100644 index 0000000..bd0878d --- /dev/null +++ b/src/Rebus.SimpleInjector/SimpleInjectorUnitOfWork.cs @@ -0,0 +1,33 @@ +using Rebus.Config; +using Rebus.Pipeline; + +namespace Dbosoft.Rebus +{ + public static class SimpleInjectorUnitOfWork + { + public static void EnableSimpleInjectorUnitOfWork( + this OptionsConfigurer configurer) + { + configurer.EnableAsyncUnitOfWork(Create, Commit, dispose: Dispose); + } + + private static Task Create(IMessageContext context) + { + var unitOfWork = new RebusUnitOfWorkAdapter(); + // stash current unit of work in the transaction context's items + context.TransactionContext.Items["uow"] = unitOfWork; + + return Task.FromResult(unitOfWork); + } + + private static Task Commit(IMessageContext context, RebusUnitOfWorkAdapter uow) + { + return uow.Commit(context); + } + + private static Task Dispose(IMessageContext context, RebusUnitOfWorkAdapter uow) + { + return uow.Dispose(context); + } + } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/MultiStepCommand.cs b/test/Rebus.Operations.Tests/MultiStepCommand.cs new file mode 100644 index 0000000..2ee95fb --- /dev/null +++ b/test/Rebus.Operations.Tests/MultiStepCommand.cs @@ -0,0 +1,6 @@ +namespace Dbosoft.Rebus.Operations.Tests; + +public class MultiStepCommand +{ + +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/MultiStepSaga.cs b/test/Rebus.Operations.Tests/MultiStepSaga.cs new file mode 100644 index 0000000..87aee27 --- /dev/null +++ b/test/Rebus.Operations.Tests/MultiStepSaga.cs @@ -0,0 +1,41 @@ +using System.Runtime.InteropServices.ComTypes; +using Dbosoft.Rebus.Operations.Events; +using Dbosoft.Rebus.Operations.Workflow; +using Rebus.Handlers; +using Rebus.Sagas; + +namespace Dbosoft.Rebus.Operations.Tests; + +public class MultiStepSaga : + OperationTaskWorkflowSaga, + IHandleMessages>, + IHandleMessages> +{ + public MultiStepSaga(IWorkflow workflowEngine) : base(workflowEngine) + { + } + + protected override void CorrelateMessages(ICorrelationConfig config) + { + config.Correlate>(m => m.InitiatingTaskId, d => d.SagaTaskId); + config.Correlate>(m => m.InitiatingTaskId, d => d.SagaTaskId); + + base.CorrelateMessages(config); + } + + protected override Task Initiated(MultiStepCommand message) + { + return StartNewTask().AsTask(); + } + + public Task Handle(OperationTaskStatusEvent message) + { + return FailOrRun(message, + () => StartNewTask().AsTask()); + } + + public Task Handle(OperationTaskStatusEvent message) + { + return FailOrRun(message, () => Complete()); + } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/MultiStepSagaData.cs b/test/Rebus.Operations.Tests/MultiStepSagaData.cs new file mode 100644 index 0000000..2eea4ce --- /dev/null +++ b/test/Rebus.Operations.Tests/MultiStepSagaData.cs @@ -0,0 +1,8 @@ +using Dbosoft.Rebus.Operations.Workflow; + +namespace Dbosoft.Rebus.Operations.Tests; + +public class MultiStepSagaData : TaskWorkflowSagaData +{ + +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/Rebus.Operations.Tests.csproj b/test/Rebus.Operations.Tests/Rebus.Operations.Tests.csproj new file mode 100644 index 0000000..2623b76 --- /dev/null +++ b/test/Rebus.Operations.Tests/Rebus.Operations.Tests.csproj @@ -0,0 +1,17 @@ + + + + net6.0 + enable + enable + + + + + + + + + + + diff --git a/test/Rebus.Operations.Tests/RebusTestBase.cs b/test/Rebus.Operations.Tests/RebusTestBase.cs new file mode 100644 index 0000000..394d74d --- /dev/null +++ b/test/Rebus.Operations.Tests/RebusTestBase.cs @@ -0,0 +1,79 @@ +using Dbosoft.Rebus.Operations.Commands; +using Dbosoft.Rebus.Operations.Events; +using Dbosoft.Rebus.Operations.Workflow; +using Microsoft.Extensions.Logging.Abstractions; +using Rebus.Activation; +using Rebus.Bus; +using Rebus.Config; +using Rebus.Persistence.InMem; +using Rebus.Retry.Simple; +using Rebus.Routing.TypeBased; +using Rebus.Transport.InMem; +using Xunit.Abstractions; + +namespace Dbosoft.Rebus.Operations.Tests; + +public abstract class RebusTestBase +{ + private readonly ITestOutputHelper _output; + + protected RebusTestBase(ITestOutputHelper output) + { + _output = output; + } + + public async Task SetupRebus( + + Action configureActivator) + { + var rebusNetwork = new InMemNetwork(); + + var opManager = new TestOperationManager(); + var taskManager = new TestTaskManager(); + + var activator = new BuiltinHandlerActivator(); + var busStarter = + Configure.With(activator) + .Options(o => + o.SimpleRetryStrategy(maxDeliveryAttempts:1, secondLevelRetriesEnabled:true)) + .Transport(cfg => cfg.UseInMemoryTransport(rebusNetwork, "main")) + .Routing(r => r.TypeBased() + .Map("main") + .Map("main") + .Map("main") + .Map("main") + .Map("main") + .Map("main") + .Map("main") + ) + .Sagas(x => x.StoreInMemory()) + .Subscriptions(x=>x.StoreInMemory()) + .Logging(x=>x.Use(new RebusTestLogging(_output))) + .Create(); + + + var opDispatcher = new DefaultOperationDispatcher(busStarter.Bus, + NullLogger.Instance, opManager); + + var taskDispatcher = new DefaultOperationTaskDispatcher(busStarter.Bus, + NullLogger.Instance, + opManager, taskManager); + + var workflow = new DefaultWorkflow( + opManager, taskManager, new RebusOperationMessaging(busStarter.Bus, + opDispatcher, taskDispatcher)); + + activator.Register(() => new ProcessOperationSaga(workflow, NullLogger.Instance)); + activator.Register(() => + new OperationTaskProgressEventHandler(workflow, NullLogger.Instance)); + + configureActivator(activator,workflow, busStarter.Bus); + + var bus = busStarter.Start(); + await OperationsSetup.SubscribeEvents(busStarter.Bus); + + return new TestRebusSetup(bus, opDispatcher, opManager, taskManager); + + } + +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/RebusTestLogging.cs b/test/Rebus.Operations.Tests/RebusTestLogging.cs new file mode 100644 index 0000000..60e7767 --- /dev/null +++ b/test/Rebus.Operations.Tests/RebusTestLogging.cs @@ -0,0 +1,62 @@ +using Rebus.Logging; +using Xunit.Abstractions; + +namespace Dbosoft.Rebus.Operations.Tests; + +public class RebusTestLogging : AbstractRebusLoggerFactory +{ + private readonly ITestOutputHelper _testOutputHelper; + + public RebusTestLogging(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + } + protected override ILog GetLogger(Type type) + { + return new Log(_testOutputHelper, this); + } + + public class Log : ILog + { + private readonly ITestOutputHelper _testOutputHelper; + private readonly RebusTestLogging _factory; + + public Log(ITestOutputHelper testOutputHelper, RebusTestLogging factory) + { + _testOutputHelper = testOutputHelper; + _factory = factory; + } + + public void Debug(string message, params object[] objs) + { + _testOutputHelper.WriteLine("DBG: "+ _factory.RenderString(message, objs)); + } + + public void Info(string message, params object[] objs) + { + _testOutputHelper.WriteLine("INF: "+ _factory.RenderString(message, objs)); + } + + public void Warn(string message, params object[] objs) + { + _testOutputHelper.WriteLine("WRN: "+ _factory.RenderString(message, objs)); + } + + public void Warn(Exception exception, string message, params object[] objs) + { + _testOutputHelper.WriteLine("WRN: "+ _factory.RenderString(message, objs) + exception); + } + + public void Error(string message, params object[] objs) + { + _testOutputHelper.WriteLine("ERR: "+ _factory.RenderString(message, objs)); + } + + public void Error(Exception exception, string message, params object[] objs) + { + _testOutputHelper.WriteLine("ERR: "+ _factory.RenderString(message, objs) + exception); + } + + + } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/StepOneCommand.cs b/test/Rebus.Operations.Tests/StepOneCommand.cs new file mode 100644 index 0000000..c3cee58 --- /dev/null +++ b/test/Rebus.Operations.Tests/StepOneCommand.cs @@ -0,0 +1,6 @@ +namespace Dbosoft.Rebus.Operations.Tests; + +public class StepOneCommand +{ + +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/StepOneCommandHandler.cs b/test/Rebus.Operations.Tests/StepOneCommandHandler.cs new file mode 100644 index 0000000..5f78dcb --- /dev/null +++ b/test/Rebus.Operations.Tests/StepOneCommandHandler.cs @@ -0,0 +1,22 @@ +using Rebus.Bus; +using Rebus.Handlers; + +namespace Dbosoft.Rebus.Operations.Tests; + +public class StepOneCommandHandler : IHandleMessages> +{ + private readonly IBus _bus; + + public StepOneCommandHandler(IBus bus) + { + _bus = bus; + } + + public static bool Called { get; set; } + + public Task Handle(OperationTask message) + { + Called = true; + return _bus.CompleteTask(message); + } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/StepTwoCommand.cs b/test/Rebus.Operations.Tests/StepTwoCommand.cs new file mode 100644 index 0000000..7deb9bc --- /dev/null +++ b/test/Rebus.Operations.Tests/StepTwoCommand.cs @@ -0,0 +1,6 @@ +namespace Dbosoft.Rebus.Operations.Tests; + +public class StepTwoCommand +{ + +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs b/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs new file mode 100644 index 0000000..ebfaa49 --- /dev/null +++ b/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs @@ -0,0 +1,22 @@ +using Rebus.Bus; +using Rebus.Handlers; + +namespace Dbosoft.Rebus.Operations.Tests; + +public class StepTwoCommandHandler : IHandleMessages> +{ + private readonly IBus _bus; + + public StepTwoCommandHandler(IBus bus) + { + _bus = bus; + } + + public static bool Called { get; set; } + + public Task Handle(OperationTask message) + { + Called = true; + return _bus.CompleteTask(message); + } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestCommand.cs b/test/Rebus.Operations.Tests/TestCommand.cs new file mode 100644 index 0000000..5248379 --- /dev/null +++ b/test/Rebus.Operations.Tests/TestCommand.cs @@ -0,0 +1,6 @@ +namespace Dbosoft.Rebus.Operations.Tests; + +public class TestCommand +{ + +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestCommandHandler.cs b/test/Rebus.Operations.Tests/TestCommandHandler.cs new file mode 100644 index 0000000..c4a830a --- /dev/null +++ b/test/Rebus.Operations.Tests/TestCommandHandler.cs @@ -0,0 +1,22 @@ +using Dbosoft.Rebus.Operations.Workflow; +using Rebus.Bus; +using Rebus.Handlers; + +namespace Dbosoft.Rebus.Operations.Tests; + +public class TestCommandHandler : IHandleMessages> +{ + private readonly IBus _bus; + public bool Called { get; private set; } + + public TestCommandHandler(IBus bus) + { + _bus = bus; + } + + public Task Handle(OperationTask message) + { + Called = true; + return _bus.CompleteTask(message); + } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs b/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs new file mode 100644 index 0000000..7400f34 --- /dev/null +++ b/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs @@ -0,0 +1,24 @@ +using Rebus.Bus; +using Rebus.Handlers; + +namespace Dbosoft.Rebus.Operations.Tests; + +public class TestCommandHandlerWithError : IHandleMessages> +{ + private readonly IBus _bus; + private readonly bool _throws; + + public TestCommandHandlerWithError(IBus bus, bool throws) + { + _bus = bus; + _throws = throws; + } + + public async Task Handle(OperationTask message) + { + if (_throws) + throw new InvalidOperationException(); + + await _bus.FailTask(message, "error"); + } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestCommandHandlerWithProgress.cs b/test/Rebus.Operations.Tests/TestCommandHandlerWithProgress.cs new file mode 100644 index 0000000..e14e2d6 --- /dev/null +++ b/test/Rebus.Operations.Tests/TestCommandHandlerWithProgress.cs @@ -0,0 +1,21 @@ +using Rebus.Bus; +using Rebus.Handlers; + +namespace Dbosoft.Rebus.Operations.Tests; + +public class TestCommandHandlerWithProgress : IHandleMessages> +{ + private readonly IBus _bus; + public TestCommandHandlerWithProgress(IBus bus) + { + _bus = bus; + } + + public async Task Handle(OperationTask message) + { + await _bus.ProgressMessage(message, "progressData"); + await Task.Delay(500); + + await _bus.CompleteTask(message); + } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestOperationManager.cs b/test/Rebus.Operations.Tests/TestOperationManager.cs new file mode 100644 index 0000000..4951536 --- /dev/null +++ b/test/Rebus.Operations.Tests/TestOperationManager.cs @@ -0,0 +1,58 @@ +using Dbosoft.Rebus.Operations.Workflow; + +namespace Dbosoft.Rebus.Operations.Tests; + +public class TestOperationManager : OperationManagerBase +{ + public static readonly Dictionary Operations = new(); + public static readonly Dictionary> Progress = new(); + + public static void Reset() + { + Operations.Clear(); + Progress.Clear(); + } + + public override ValueTask GetByIdAsync(Guid operationId) + { + return ValueTask.FromResult( + Operations.ContainsKey(operationId) + ? Operations[operationId] as IOperation : null); + + } + + public override ValueTask GetOrCreateAsync(Guid operationId, object command) + { + if (Operations.ContainsKey(operationId)) + return GetByIdAsync(operationId)!; + + var op = new TestOperationModel + { + Id = operationId + }; + Operations.Add(operationId, op); + + return new ValueTask(op); + } + + public override ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, object? additionalData) + { + if (!Operations.ContainsKey(operation.Id)) + return new ValueTask(false); + + Operations[operation.Id].Status = newStatus; + + return new ValueTask(true); + } + + public override ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task, + object? data) + { + if(!Progress.ContainsKey(progressId)) + Progress.Add(operation.Id, new List()); + + if(data!= null) + Progress[operation.Id].Add(data); + return ValueTask.CompletedTask; + } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestOperationModel.cs b/test/Rebus.Operations.Tests/TestOperationModel.cs new file mode 100644 index 0000000..5f2dc7e --- /dev/null +++ b/test/Rebus.Operations.Tests/TestOperationModel.cs @@ -0,0 +1,7 @@ +namespace Dbosoft.Rebus.Operations.Tests; + +public class TestOperationModel : IOperation +{ + public Guid Id { get; set; } + public OperationStatus Status { get; set; } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestOperationTaskModel.cs b/test/Rebus.Operations.Tests/TestOperationTaskModel.cs new file mode 100644 index 0000000..06c2638 --- /dev/null +++ b/test/Rebus.Operations.Tests/TestOperationTaskModel.cs @@ -0,0 +1,9 @@ +namespace Dbosoft.Rebus.Operations.Tests; + +public class TestOperationTaskModel : IOperationTask +{ + public Guid Id { get; set; } + public Guid OperationId { get; set; } + public Guid InitiatingTaskId { get; set; } + public OperationTaskStatus Status { get; set; } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestRebusSetup.cs b/test/Rebus.Operations.Tests/TestRebusSetup.cs new file mode 100644 index 0000000..aadc1e0 --- /dev/null +++ b/test/Rebus.Operations.Tests/TestRebusSetup.cs @@ -0,0 +1,11 @@ +using Rebus.Bus; + +namespace Dbosoft.Rebus.Operations.Tests; + +public record TestRebusSetup(IBus Bus, IOperationDispatcher OperationDispatcher, TestOperationManager OperationManager, TestTaskManager TaskManager) : IDisposable +{ + public void Dispose() + { + Bus.Dispose(); + } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestTaskManager.cs b/test/Rebus.Operations.Tests/TestTaskManager.cs new file mode 100644 index 0000000..5d71a51 --- /dev/null +++ b/test/Rebus.Operations.Tests/TestTaskManager.cs @@ -0,0 +1,47 @@ +using Dbosoft.Rebus.Operations.Workflow; + +namespace Dbosoft.Rebus.Operations.Tests; + +public class TestTaskManager : OperationTaskManagerBase +{ + public static readonly Dictionary Tasks = new(); + + public static void Reset() + { + Tasks.Clear(); + } + + public override ValueTask GetByIdAsync(Guid taskId) + { + return ValueTask.FromResult( + Tasks.ContainsKey(taskId) + ? Tasks[taskId] as IOperationTask : null); + + } + + public override ValueTask GetOrCreateAsync(IOperation operation, object command, Guid taskId, Guid parentTaskId) + { + if (Tasks.ContainsKey(taskId)) + return GetByIdAsync(taskId)!; + + var task = new TestOperationTaskModel + { + Id = taskId, + OperationId = operation.Id, + InitiatingTaskId = parentTaskId, + Status = OperationTaskStatus.Queued + }; + Tasks.Add(taskId, task); + return new ValueTask(task); + } + + public override ValueTask TryChangeStatusAsync(IOperationTask task, OperationTaskStatus newStatus, object? additionalData) + { + if (!Tasks.ContainsKey(task.Id)) + return new ValueTask(false); + + Tasks[task.Id].Status = newStatus; + + return new ValueTask(true); + } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/WorkflowTests.cs b/test/Rebus.Operations.Tests/WorkflowTests.cs new file mode 100644 index 0000000..946f272 --- /dev/null +++ b/test/Rebus.Operations.Tests/WorkflowTests.cs @@ -0,0 +1,137 @@ +using System.Text.Json; +using Dbosoft.Rebus.Operations.Workflow; +using Microsoft.Extensions.Logging.Abstractions; +using Xunit; +using Xunit.Abstractions; + +namespace Dbosoft.Rebus.Operations.Tests; + +public class WorkflowTests : RebusTestBase +{ + + public WorkflowTests(ITestOutputHelper output) + : base(output) + { + } + + [Fact] + public async Task SingleStep_Operation_is_processed() + { + TestCommandHandler? taskHandler = null; + + using var setup = await SetupRebus(configureActivator: (activator, _, bus) => + { + activator.Register(() => new IncomingTaskMessageHandler(bus, + NullLogger>.Instance, new DefaultMessageEnricher())); + taskHandler = new TestCommandHandler(bus); + activator.Register(() => taskHandler); + activator.Register(() => new EmptyOperationStatusEventHandler()); + activator.Register(() => new EmptyOperationTaskStatusEventHandler()); + }); + + TestOperationManager.Reset(); + TestTaskManager.Reset(); + + await setup.OperationDispatcher.StartNew(); + await Task.Delay(1000); + Assert.True(taskHandler!.Called); + Assert.Single(TestOperationManager.Operations); + Assert.Equal(OperationStatus.Completed ,TestOperationManager.Operations.First().Value.Status); + + } + + [Fact] + public async Task MultiStep_Operation_is_processed() + { + StepOneCommandHandler? stepOneHandler; + StepTwoCommandHandler? stepTwoHandler; + using var setup = await SetupRebus(configureActivator: (activator, wf, bus) => + { + activator.Register(() => new IncomingTaskMessageHandler(bus, + NullLogger>.Instance, new DefaultMessageEnricher())); + activator.Register(() => new IncomingTaskMessageHandler(bus, + NullLogger>.Instance, new DefaultMessageEnricher())); + activator.Register(() => new IncomingTaskMessageHandler(bus, + NullLogger>.Instance, new DefaultMessageEnricher())); + + activator.Register(() => new EmptyOperationStatusEventHandler()); + activator.Register(() => new MultiStepSaga(wf)); + + stepOneHandler = new StepOneCommandHandler(bus); + stepTwoHandler = new StepTwoCommandHandler(bus); + activator.Register(() => stepOneHandler); + activator.Register(() => stepTwoHandler); + }); + + TestOperationManager.Reset(); + TestTaskManager.Reset(); + StepOneCommandHandler.Called = false; + StepTwoCommandHandler.Called = false; + + await setup.OperationDispatcher.StartNew(); + await Task.Delay(1000); + Assert.True(StepOneCommandHandler.Called); + Assert.True(StepTwoCommandHandler.Called); + Assert.Single(TestOperationManager.Operations); + Assert.Equal(3, TestTaskManager.Tasks.Count); + Assert.Equal(OperationStatus.Completed ,TestOperationManager.Operations.First().Value.Status); + + foreach (var taskModel in TestTaskManager.Tasks) + { + Assert.Equal(OperationTaskStatus.Completed, taskModel.Value.Status); + } + } + + [Fact] + public async Task Progress_is_reported() + { + using var setup = await SetupRebus(configureActivator: (activator, wf, bus) => + { + activator.Register(() => new IncomingTaskMessageHandler(bus, + NullLogger>.Instance, new DefaultMessageEnricher())); + + activator.Register(() => new OperationTaskProgressEventHandler(wf, + NullLogger.Instance)); + + activator.Register(() => new EmptyOperationTaskStatusEventHandler()); + activator.Register(() => new TestCommandHandlerWithProgress(bus)); + }); + + TestOperationManager.Reset(); + TestTaskManager.Reset(); + + await setup.OperationDispatcher.StartNew(); + await Task.Delay(1000); + Assert.Single(TestOperationManager.Progress); + var progressData = (JsonElement) TestOperationManager.Progress + .First().Value.First(); + Assert.Equal("progressData",progressData.GetString() ); + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task SingleStep_Operation_failure_is_reported(bool throws) + { + + using var setup = await SetupRebus(configureActivator: (activator, wf, bus) => + { + activator.Register(() => new IncomingTaskMessageHandler(bus, + NullLogger>.Instance, new DefaultMessageEnricher())); + activator.Register(() => new TestCommandHandlerWithError(bus, throws)); + activator.Register(() => new EmptyOperationStatusEventHandler()); + activator.Register(() => new EmptyOperationTaskStatusEventHandler()); + activator.Register(() => + new FailedOperationHandler>( + NullLogger>>.Instance, + wf.Messaging)); + }); + TestOperationManager.Reset(); + TestTaskManager.Reset(); + + await setup.OperationDispatcher.StartNew(); + await Task.Delay(throws ? 2000: 1000); + Assert.Equal(OperationStatus.Failed ,TestOperationManager.Operations.First().Value.Status); + + } +} \ No newline at end of file diff --git a/test/Rebus.SimpleInjector.Tests/Rebus.SimpleInjector.Tests.csproj b/test/Rebus.SimpleInjector.Tests/Rebus.SimpleInjector.Tests.csproj new file mode 100644 index 0000000..9b516b8 --- /dev/null +++ b/test/Rebus.SimpleInjector.Tests/Rebus.SimpleInjector.Tests.csproj @@ -0,0 +1,20 @@ + + + + net6.0 + enable + enable + + + + + + + + + + + + + + diff --git a/test/Rebus.SimpleInjector.Tests/SimpleInjectorTestBase.cs b/test/Rebus.SimpleInjector.Tests/SimpleInjectorTestBase.cs new file mode 100644 index 0000000..38e6451 --- /dev/null +++ b/test/Rebus.SimpleInjector.Tests/SimpleInjectorTestBase.cs @@ -0,0 +1,40 @@ +using Dbosoft.Rebus.Operations; +using Dbosoft.Rebus.Operations.Tests; +using Rebus.Persistence.InMem; +using Rebus.Transport.InMem; +using SimpleInjector; +using Xunit.Abstractions; + +namespace Dbosoft.Rebus.SimpleInjector.Tests; + +public abstract class SimpleInjectorTestBase +{ + private readonly ITestOutputHelper _output; + + protected SimpleInjectorTestBase(ITestOutputHelper output) + { + _output = output; + } + + public void SetupRebus(Container container) + { + var rebusNetwork = new InMemNetwork(); + + container.Register(Lifestyle.Scoped); + container.AddRebusOperationsHandlers(); + container.ConfigureRebus(configurer => + { + return configurer + .Options(o => o.EnableSimpleInjectorUnitOfWork()) + .Transport(cfg => cfg.UseInMemoryTransport(rebusNetwork, "main")) + .AddOperations("main") + .Sagas(x => x.StoreInMemory()) + .Subscriptions(x=>x.StoreInMemory()) + .Logging(x=>x.Use(new RebusTestLogging(_output))) + + .Start(); + }); + + } + +} \ No newline at end of file diff --git a/test/Rebus.SimpleInjector.Tests/SimpleInjectorWorkflowTests.cs b/test/Rebus.SimpleInjector.Tests/SimpleInjectorWorkflowTests.cs new file mode 100644 index 0000000..ab54493 --- /dev/null +++ b/test/Rebus.SimpleInjector.Tests/SimpleInjectorWorkflowTests.cs @@ -0,0 +1,73 @@ +using Dbosoft.Rebus.Operations; +using Dbosoft.Rebus.Operations.Tests; +using Microsoft.Extensions.DependencyInjection; +using Rebus.Bus; +using Rebus.Handlers; +using SimpleInjector; +using SimpleInjector.Lifestyles; +using Xunit; +using Xunit.Abstractions; + +namespace Dbosoft.Rebus.SimpleInjector.Tests; + +public class SimpleInjectorWorkflowTests : SimpleInjectorTestBase +{ + + public SimpleInjectorWorkflowTests(ITestOutputHelper output) + : base(output) + { + } + + + [Fact] + public async Task MultiStep_Operation_is_processed() + { + var sc = new ServiceCollection(); + sc.AddLogging(); + + var container = new Container(); + container.Options.DefaultScopedLifestyle = new AsyncScopedLifestyle(); + sc.AddSimpleInjector(container, cfg => cfg.AddLogging()); + + SetupRebus(container); + container.Collection.Append(typeof(IHandleMessages<>), typeof(MultiStepSaga)); + container.Collection.Append(typeof(IHandleMessages<>), typeof(StepOneCommandHandler), Lifestyle.Scoped); + container.Collection.Append(typeof(IHandleMessages<>), typeof(StepTwoCommandHandler), Lifestyle.Scoped); + + container.Register(Lifestyle.Scoped); + container.Register(Lifestyle.Scoped); + container.Register(Lifestyle.Scoped); + container.Register(Lifestyle.Scoped); + + var sp = sc.BuildServiceProvider(); + sp.UseSimpleInjector(container); + + container.Verify(); + + TestOperationManager.Reset(); + TestTaskManager.Reset(); + StepOneCommandHandler.Called = false; + StepTwoCommandHandler.Called = false; + + await using var scope = AsyncScopedLifestyle.BeginScope(container); + var bus = scope.GetInstance(); + await OperationsSetup.SubscribeEvents(bus); + + var dispatcher = scope.GetInstance(); + + await dispatcher.StartNew(); + await Task.Delay(1000); + Assert.True(StepOneCommandHandler.Called); + Assert.True(StepTwoCommandHandler.Called); + + Assert.Single(TestOperationManager.Operations); + Assert.Equal(3, TestTaskManager.Tasks.Count); + Assert.Equal(OperationStatus.Completed, TestOperationManager.Operations.First().Value.Status); + + foreach (var taskModel in TestTaskManager.Tasks) + { + Assert.Equal(OperationTaskStatus.Completed, taskModel.Value.Status); + } + } + +} \ No newline at end of file diff --git a/test/Rebus.SimpleInjector.Tests/TestRebusUnitOfWork.cs b/test/Rebus.SimpleInjector.Tests/TestRebusUnitOfWork.cs new file mode 100644 index 0000000..3f68378 --- /dev/null +++ b/test/Rebus.SimpleInjector.Tests/TestRebusUnitOfWork.cs @@ -0,0 +1,19 @@ +namespace Dbosoft.Rebus.SimpleInjector.Tests; + +public class TestRebusUnitOfWork : IRebusUnitOfWork +{ + public ValueTask DisposeAsync() + { + return ValueTask.CompletedTask; + } + + public void Dispose() + { + + } + + public Task Commit() + { + return Task.CompletedTask; + } +} \ No newline at end of file