diff --git a/azure-piplines.yml b/azure-piplines.yml
index 6fe0395..2faee4b 100644
--- a/azure-piplines.yml
+++ b/azure-piplines.yml
@@ -31,8 +31,23 @@ steps:
inputs:
command: test
projects: '**/*Tests/*.csproj'
- arguments: '--configuration $(buildConfiguration) --collect "Code coverage" --no-build'
-
+ arguments: '--configuration $(buildConfiguration) --collect "XPlat Code Coverage" --no-build'
+ publishTestResults: true
+
+- task: reportgenerator@5
+ displayName: 'Merge code coverage results'
+ inputs:
+ reports: '$(Agent.WorkFolder)/**/coverage.cobertura.xml'
+ targetdir: '$(Build.SourcesDirectory)/CoverageResults'
+ reporttypes: 'Cobertura'
+
+# Publish the combined code coverage to the pipeline
+- task: PublishCodeCoverageResults@1
+ displayName: 'Publish code coverage report'
+ inputs:
+ codeCoverageTool: 'Cobertura'
+ summaryFileLocation: '$(Build.SourcesDirectory)/CoverageResults/Cobertura.xml'
+
- task: DotNetCoreCLI@2
displayName: dotnet pack
inputs:
diff --git a/rebus-extensions.sln b/rebus-extensions.sln
index ac40168..ba074fe 100644
--- a/rebus-extensions.sln
+++ b/rebus-extensions.sln
@@ -13,6 +13,20 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.Configuration.MsmqSel
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Rebus.Configuration.SqlServerSelectors", "src\Rebus.Configuration\Rebus.Configuration.SqlServerSelectors\Rebus.Configuration.SqlServerSelectors.csproj", "{7750D5E7-41C2-4D0E-8E84-78D51C8E67FC}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.Operations.Core", "src\Rebus.Operations\Rebus.Operations.Core\Rebus.Operations.Core.csproj", "{9C8C26F8-E2B5-4D3D-B046-03CF2BF6BA3D}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.Operations.Primitives", "src\Rebus.Operations\Rebus.Operations.Primitives\Rebus.Operations.Primitives.csproj", "{1F378946-2CD9-4A63-BB5D-29A5001679FB}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.Operations.Abstractions", "src\Rebus.Operations\Rebus.Operations.Abstractions\Rebus.Operations.Abstractions.csproj", "{1A83C61F-396A-46A5-A718-06121D705C2D}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.Operations.Tests", "test\Rebus.Operations.Tests\Rebus.Operations.Tests.csproj", "{D292704B-A6D1-4577-807C-5E58F5C593BE}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.Operations.SimpleInjector", "src\Rebus.Operations\Rebus.Operations.SimpleInjector\Rebus.Operations.SimpleInjector.csproj", "{8E8CF6AA-9025-460A-B4E8-CA8A317F758D}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.SimpleInjector", "src\Rebus.SimpleInjector\Rebus.SimpleInjector.csproj", "{7A693524-32D1-4D4F-B355-7DC1BC717BEC}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.SimpleInjector.Tests", "test\Rebus.SimpleInjector.Tests\Rebus.SimpleInjector.Tests.csproj", "{539BED78-F196-4FA2-9E48-2185FE82444E}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -39,6 +53,34 @@ Global
{7750D5E7-41C2-4D0E-8E84-78D51C8E67FC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7750D5E7-41C2-4D0E-8E84-78D51C8E67FC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7750D5E7-41C2-4D0E-8E84-78D51C8E67FC}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9C8C26F8-E2B5-4D3D-B046-03CF2BF6BA3D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9C8C26F8-E2B5-4D3D-B046-03CF2BF6BA3D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9C8C26F8-E2B5-4D3D-B046-03CF2BF6BA3D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9C8C26F8-E2B5-4D3D-B046-03CF2BF6BA3D}.Release|Any CPU.Build.0 = Release|Any CPU
+ {1F378946-2CD9-4A63-BB5D-29A5001679FB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {1F378946-2CD9-4A63-BB5D-29A5001679FB}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {1F378946-2CD9-4A63-BB5D-29A5001679FB}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {1F378946-2CD9-4A63-BB5D-29A5001679FB}.Release|Any CPU.Build.0 = Release|Any CPU
+ {1A83C61F-396A-46A5-A718-06121D705C2D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {1A83C61F-396A-46A5-A718-06121D705C2D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {1A83C61F-396A-46A5-A718-06121D705C2D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {1A83C61F-396A-46A5-A718-06121D705C2D}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D292704B-A6D1-4577-807C-5E58F5C593BE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D292704B-A6D1-4577-807C-5E58F5C593BE}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D292704B-A6D1-4577-807C-5E58F5C593BE}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D292704B-A6D1-4577-807C-5E58F5C593BE}.Release|Any CPU.Build.0 = Release|Any CPU
+ {8E8CF6AA-9025-460A-B4E8-CA8A317F758D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {8E8CF6AA-9025-460A-B4E8-CA8A317F758D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {8E8CF6AA-9025-460A-B4E8-CA8A317F758D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {8E8CF6AA-9025-460A-B4E8-CA8A317F758D}.Release|Any CPU.Build.0 = Release|Any CPU
+ {7A693524-32D1-4D4F-B355-7DC1BC717BEC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {7A693524-32D1-4D4F-B355-7DC1BC717BEC}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {7A693524-32D1-4D4F-B355-7DC1BC717BEC}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {7A693524-32D1-4D4F-B355-7DC1BC717BEC}.Release|Any CPU.Build.0 = Release|Any CPU
+ {539BED78-F196-4FA2-9E48-2185FE82444E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {539BED78-F196-4FA2-9E48-2185FE82444E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {539BED78-F196-4FA2-9E48-2185FE82444E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {539BED78-F196-4FA2-9E48-2185FE82444E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/rebus-extensions.sln.DotSettings b/rebus-extensions.sln.DotSettings
index 36fb221..4ad39df 100644
--- a/rebus-extensions.sln.DotSettings
+++ b/rebus-extensions.sln.DotSettings
@@ -3,5 +3,6 @@
True
True
True
+ True
True
True
\ No newline at end of file
diff --git a/src/Rebus.Operations/Directory.Build.props b/src/Rebus.Operations/Directory.Build.props
new file mode 100644
index 0000000..6d03a26
--- /dev/null
+++ b/src/Rebus.Operations/Directory.Build.props
@@ -0,0 +1,7 @@
+
+
+
+ Dbosoft.Rebus.Operations
+
+
+
diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperation.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperation.cs
new file mode 100644
index 0000000..8595585
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperation.cs
@@ -0,0 +1,12 @@
+using System;
+
+namespace Dbosoft.Rebus.Operations;
+
+public interface IOperation
+{
+ public Guid Id { get; }
+
+ public OperationStatus Status { get; }
+
+
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationDispatcher.cs
new file mode 100644
index 0000000..22e6de0
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationDispatcher.cs
@@ -0,0 +1,15 @@
+#nullable enable
+
+using System;
+using System.Threading.Tasks;
+
+namespace Dbosoft.Rebus.Operations
+{
+ public interface IOperationDispatcher
+ {
+ ValueTask StartNew(object? additionalData = default) where T : class, new();
+ ValueTask StartNew(Type commandType, object? additionalData = default);
+ ValueTask StartNew(object operationCommand);
+
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationLogEntry.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationLogEntry.cs
new file mode 100644
index 0000000..f5753d5
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationLogEntry.cs
@@ -0,0 +1,10 @@
+using System;
+
+namespace Dbosoft.Rebus.Operations;
+
+public interface IOperationLogEntry
+{
+ string Message { get; }
+ public DateTimeOffset Timestamp { get; }
+
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTask.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTask.cs
new file mode 100644
index 0000000..0dbbaef
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTask.cs
@@ -0,0 +1,16 @@
+using System;
+
+namespace Dbosoft.Rebus.Operations;
+
+public interface IOperationTask
+{
+ public Guid Id { get; }
+
+ public Guid OperationId { get; }
+
+ public Guid InitiatingTaskId { get; }
+
+
+ OperationTaskStatus Status { get; }
+
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTaskDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTaskDispatcher.cs
new file mode 100644
index 0000000..9c18c43
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTaskDispatcher.cs
@@ -0,0 +1,21 @@
+#nullable enable
+
+using System;
+using System.Threading.Tasks;
+
+namespace Dbosoft.Rebus.Operations
+{
+ public interface IOperationTaskDispatcher
+ {
+ ValueTask StartNew(Guid operationId, Guid initiatingTaskId,
+ object? additionalData = default) where T : class, new();
+
+ ValueTask StartNew(Guid operationId, Guid initiatingTaskId, Type operationCommandType,
+ object? additionalData = default);
+
+ ValueTask StartNew(Guid operationId, Guid initiatingTaskId, object command
+ , object? additionalData = default);
+
+ }
+
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Rebus.Operations.Abstractions.csproj b/src/Rebus.Operations/Rebus.Operations.Abstractions/Rebus.Operations.Abstractions.csproj
new file mode 100644
index 0000000..45cd955
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Rebus.Operations.Abstractions.csproj
@@ -0,0 +1,13 @@
+
+
+
+ netstandard2.1
+ enable
+ latest
+
+
+
+
+
+
+
diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IMessageEnricher.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IMessageEnricher.cs
new file mode 100644
index 0000000..0a4e318
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IMessageEnricher.cs
@@ -0,0 +1,8 @@
+using Dbosoft.Rebus.Operations.Commands;
+
+namespace Dbosoft.Rebus.Operations.Workflow;
+
+public interface IMessageEnricher
+{
+ object? EnrichTaskAcceptedReply(OperationTaskSystemMessage taskMessage) where T : class, new();
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs
new file mode 100644
index 0000000..fbac907
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs
@@ -0,0 +1,14 @@
+using System;
+using System.Threading.Tasks;
+
+namespace Dbosoft.Rebus.Operations.Workflow;
+
+#nullable enable
+public interface IOperationManager
+{
+ ValueTask GetByIdAsync(Guid operationId);
+ ValueTask GetOrCreateAsync(Guid operationId, object command);
+
+ ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, object? additionalData);
+ ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task, object? data);
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs
new file mode 100644
index 0000000..7ffe067
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs
@@ -0,0 +1,17 @@
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Events;
+
+namespace Dbosoft.Rebus.Operations.Workflow;
+
+public interface IOperationMessaging
+{
+ void DispatchTaskMessage(object command, IOperationTask task);
+ Task DispatchTaskStatusEventAsync(string commandType, OperationTaskStatusEvent message);
+ Task DispatchTaskStatusEventAsync(OperationTaskStatusEvent message);
+ Task DispatchOperationStatusEventAsync(OperationStatusEvent operationStatusEvent);
+
+ IOperationDispatcher OperationDispatcher { get; }
+ IOperationTaskDispatcher TaskDispatcher { get; }
+
+
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationTaskManager.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationTaskManager.cs
new file mode 100644
index 0000000..fb171ff
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationTaskManager.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Threading.Tasks;
+
+namespace Dbosoft.Rebus.Operations.Workflow;
+
+#nullable enable
+
+public interface IOperationTaskManager
+{
+ ValueTask GetByIdAsync(Guid taskId);
+ ValueTask GetOrCreateAsync(IOperation operation, object command, Guid taskId, Guid parentTaskId);
+ ValueTask TryChangeStatusAsync(IOperationTask task, OperationTaskStatus newStatus, object? additionalData);
+
+
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IWorkflow.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IWorkflow.cs
new file mode 100644
index 0000000..e8acd67
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IWorkflow.cs
@@ -0,0 +1,10 @@
+#nullable enable
+namespace Dbosoft.Rebus.Operations.Workflow;
+
+public interface IWorkflow
+{
+ IOperationManager Operations { get; }
+ IOperationTaskManager Tasks { get; }
+ IOperationMessaging Messaging { get; }
+
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/BusOperationTaskExtensions.cs b/src/Rebus.Operations/Rebus.Operations.Core/BusOperationTaskExtensions.cs
new file mode 100644
index 0000000..9eb777a
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/BusOperationTaskExtensions.cs
@@ -0,0 +1,57 @@
+using System;
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Events;
+using Rebus.Bus;
+using Rebus.Transport;
+
+namespace Dbosoft.Rebus.Operations;
+
+public static class BusOperationTaskExtensions
+{
+ public static Task FailTask(this IBus bus, IOperationTaskMessage message, string errorMessage)
+ {
+ return FailTask(bus, message, new ErrorData { ErrorMessage = errorMessage });
+ }
+
+ public static Task FailTask(this IBus bus, IOperationTaskMessage message, ErrorData error)
+ {
+ return bus.Publish(
+ OperationTaskStatusEvent.Failed(
+ message.OperationId, message.InitiatingTaskId,
+ message.TaskId, error));
+ }
+
+
+ public static Task CompleteTask(this IBus bus, IOperationTaskMessage message)
+ {
+ return bus.Publish(
+ OperationTaskStatusEvent.Completed(
+ message.OperationId, message.InitiatingTaskId, message.TaskId));
+ }
+
+ public static Task CompleteTask(this IBus bus, IOperationTaskMessage message, object responseMessage)
+ {
+ return bus.Publish(
+ OperationTaskStatusEvent.Completed(
+ message.OperationId, message.InitiatingTaskId, message.TaskId, responseMessage));
+ }
+
+
+ public static async Task ProgressMessage(this IBus bus, IOperationTaskMessage message, object data)
+ {
+ using var scope = new RebusTransactionScope();
+
+
+ await bus.Publish(new OperationTaskProgressEvent
+ {
+ Id = Guid.NewGuid(),
+ OperationId = message.OperationId,
+ TaskId = message.TaskId,
+ Data = data,
+ Timestamp = DateTimeOffset.UtcNow
+ }).ConfigureAwait(false);
+
+ // commit it like this
+ await scope.CompleteAsync().ConfigureAwait(false);
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs
new file mode 100644
index 0000000..eec08fc
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs
@@ -0,0 +1,27 @@
+using System;
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Workflow;
+using Microsoft.Extensions.Logging;
+using Rebus.Bus;
+
+namespace Dbosoft.Rebus.Operations;
+
+#nullable enable
+
+public class DefaultOperationDispatcher : OperationDispatcherBase
+{
+ private readonly IOperationManager _operationManager;
+
+ public DefaultOperationDispatcher(
+ IBus bus,
+ ILogger logger,
+ IOperationManager operationManager) : base(bus, logger)
+ {
+ _operationManager = operationManager;
+ }
+
+ protected override async ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData)
+ {
+ return (await _operationManager.GetOrCreateAsync(Guid.NewGuid(), command), command);
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs
new file mode 100644
index 0000000..fe87e11
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs
@@ -0,0 +1,34 @@
+using System;
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Workflow;
+using Microsoft.Extensions.Logging;
+using Rebus.Bus;
+
+namespace Dbosoft.Rebus.Operations;
+
+public class DefaultOperationTaskDispatcher : OperationTaskDispatcherBase
+{
+ private readonly IOperationManager _operationManager;
+ private readonly IOperationTaskManager _operationTaskManager;
+
+
+ public DefaultOperationTaskDispatcher(
+ IBus bus,
+ ILogger logger,
+ IOperationManager operationManager, IOperationTaskManager operationTaskManager) : base(bus, logger)
+ {
+ _operationManager = operationManager;
+ _operationTaskManager = operationTaskManager;
+ }
+
+ protected override async ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData)
+ {
+ var op = await _operationManager.GetByIdAsync(operationId);
+ if (op == null)
+ {
+ throw new ArgumentException($"Operation {operationId} not found", nameof(operationId));
+ }
+
+ return (await _operationTaskManager.GetOrCreateAsync(op, command, Guid.NewGuid(), initiatingTaskId), command);
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs
new file mode 100644
index 0000000..bb1a183
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs
@@ -0,0 +1,72 @@
+
+
+#nullable enable
+
+using System;
+using System.Text.Json;
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Commands;
+using Microsoft.Extensions.Logging;
+using Rebus.Bus;
+
+namespace Dbosoft.Rebus.Operations
+{
+
+ public abstract class OperationDispatcherBase : IOperationDispatcher
+ {
+ private readonly IBus _bus;
+ private readonly ILogger _logger;
+
+ protected OperationDispatcherBase(IBus bus, ILogger logger)
+ {
+ _bus = bus;
+ _logger = logger;
+ }
+
+ public ValueTask StartNew(object command)
+ {
+ return StartOperation(command, null);
+ }
+
+ public ValueTask StartNew(object? additionalData = default)
+ where T : class, new()
+ {
+ return StartOperation( Activator.CreateInstance(),additionalData);
+ }
+
+
+ public ValueTask StartNew(Type commandType, object? additionalData = default)
+ {
+ return StartOperation(commandType,additionalData);
+ }
+
+ protected abstract ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData);
+
+ protected async ValueTask StartOperation(object command, object? additionalData)
+ {
+ if (command == null)
+ throw new ArgumentNullException(nameof(command));
+
+ var(operation, taskCommand) = await CreateOperation(command, additionalData);
+
+ var commandJson = JsonSerializer.Serialize(taskCommand);
+
+ var taskMessage = new CreateNewOperationTaskCommand(
+ taskCommand.GetType().AssemblyQualifiedName,
+ commandJson,
+ operation.Id,
+ operation.Id,
+ Guid.NewGuid());
+
+ var message = new CreateOperationCommand { TaskMessage = taskMessage };
+ await _bus.Send(message);
+
+ _logger.LogDebug("Send new command of type {commandType}. Id: {operationId}",
+ taskCommand.GetType().Name, operation.Id);
+
+ return operation;
+
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationTask.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationTask.cs
new file mode 100644
index 0000000..3894c01
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationTask.cs
@@ -0,0 +1,20 @@
+using System;
+
+namespace Dbosoft.Rebus.Operations
+{
+ public class OperationTask : IOperationTaskMessage where T : class, new()
+ {
+ public OperationTask(T command, Guid operationId, Guid initiatingTaskId, Guid taskId)
+ {
+ Command = command;
+ OperationId = operationId;
+ InitiatingTaskId = initiatingTaskId;
+ TaskId = taskId;
+ }
+
+ public T Command { get; }
+ public Guid OperationId { get; }
+ public Guid InitiatingTaskId { get; }
+ public Guid TaskId { get; }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs
new file mode 100644
index 0000000..2e692b8
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs
@@ -0,0 +1,68 @@
+
+
+#nullable enable
+
+using System;
+using System.Text.Json;
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Commands;
+using Microsoft.Extensions.Logging;
+using Rebus.Bus;
+
+namespace Dbosoft.Rebus.Operations;
+public abstract class OperationTaskDispatcherBase : IOperationTaskDispatcher
+{
+ private readonly IBus _bus;
+ private readonly ILogger _logger;
+
+ protected OperationTaskDispatcherBase(IBus bus, ILogger logger)
+ {
+ _bus = bus;
+ _logger = logger;
+ }
+
+ public ValueTask StartNew(Guid operationId, Guid initiatingTaskId, object command, object? additionalData = null)
+ {
+ return StartTask(operationId, initiatingTaskId, command, additionalData);
+ }
+
+ public ValueTask StartNew(Guid operationId, Guid initiatingTaskId, object? additionalData = default)
+ where T : class, new()
+ {
+ return StartTask(operationId, initiatingTaskId, Activator.CreateInstance(),
+ additionalData);
+ }
+
+ public ValueTask StartNew(Guid operationId, Guid initiatingTaskId, Type commandType, object? additionalData = default)
+ {
+ return StartTask(operationId, initiatingTaskId, commandType, additionalData);
+ }
+
+ protected abstract ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData);
+
+ protected async ValueTask StartTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData)
+ {
+ if (command == null)
+ throw new ArgumentNullException(nameof(command));
+
+ var (task, taskCommand) = await CreateTask(operationId, initiatingTaskId, command, additionalData);
+ var commandJson = JsonSerializer.Serialize(taskCommand);
+
+ var taskMessage = new CreateNewOperationTaskCommand(
+ taskCommand.GetType().AssemblyQualifiedName,
+ commandJson,
+ operationId,
+ initiatingTaskId,
+ task.Id);
+
+ await _bus.Send(taskMessage);
+
+ _logger.LogDebug("Send new command of type {commandType}. Id: {operationId}, ParentTaskId: {parentTaskId}",
+ taskCommand.GetType().Name, operationId, initiatingTaskId);
+
+ return task;
+
+ }
+
+
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationsSetup.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationsSetup.cs
new file mode 100644
index 0000000..30b7b00
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationsSetup.cs
@@ -0,0 +1,19 @@
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Events;
+using Rebus.Bus;
+
+namespace Dbosoft.Rebus.Operations;
+
+public static class OperationsSetup
+{
+ public static async Task SubscribeEvents(IBus bus)
+ {
+ await bus.Subscribe();
+ await bus.Subscribe();
+ await bus.Subscribe();
+ await bus.Subscribe();
+ await bus.Subscribe();
+
+ return bus;
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Rebus.Operations.Core.csproj b/src/Rebus.Operations/Rebus.Operations.Core/Rebus.Operations.Core.csproj
new file mode 100644
index 0000000..9b99b6f
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Rebus.Operations.Core.csproj
@@ -0,0 +1,18 @@
+
+
+
+ netstandard2.1
+ enable
+ latest
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultMessageEnricher.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultMessageEnricher.cs
new file mode 100644
index 0000000..b478117
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultMessageEnricher.cs
@@ -0,0 +1,11 @@
+using Dbosoft.Rebus.Operations.Commands;
+
+namespace Dbosoft.Rebus.Operations.Workflow;
+
+public class DefaultMessageEnricher : IMessageEnricher
+{
+ public object? EnrichTaskAcceptedReply(OperationTaskSystemMessage taskMessage) where T : class, new()
+ {
+ return null;
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultWorkflow.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultWorkflow.cs
new file mode 100644
index 0000000..bf523fe
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultWorkflow.cs
@@ -0,0 +1,16 @@
+#nullable enable
+namespace Dbosoft.Rebus.Operations.Workflow;
+
+public class DefaultWorkflow : IWorkflow
+{
+ public DefaultWorkflow(IOperationManager operation, IOperationTaskManager tasks, IOperationMessaging messaging)
+ {
+ Operations = operation;
+ Tasks = tasks;
+ Messaging = messaging;
+ }
+
+ public IOperationManager Operations { get; }
+ public IOperationTaskManager Tasks { get; }
+ public IOperationMessaging Messaging { get; }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationStatusEventHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationStatusEventHandler.cs
new file mode 100644
index 0000000..94ebfc6
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationStatusEventHandler.cs
@@ -0,0 +1,14 @@
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Events;
+using Rebus.Handlers;
+
+namespace Dbosoft.Rebus.Operations.Workflow;
+
+public class EmptyOperationStatusEventHandler : IHandleMessages
+
+{
+ public Task Handle(OperationStatusEvent message)
+ {
+ return Task.CompletedTask;
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationTaskStatusEventHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationTaskStatusEventHandler.cs
new file mode 100644
index 0000000..2895dda
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/EmptyOperationTaskStatusEventHandler.cs
@@ -0,0 +1,14 @@
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Events;
+using Rebus.Handlers;
+
+namespace Dbosoft.Rebus.Operations.Workflow;
+
+public class EmptyOperationTaskStatusEventHandler : IHandleMessages>
+ where TMessage : class, new()
+{
+ public Task Handle(OperationTaskStatusEvent message)
+ {
+ return Task.CompletedTask;
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/FailedOperationHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/FailedOperationHandler.cs
new file mode 100644
index 0000000..f5e7c04
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/FailedOperationHandler.cs
@@ -0,0 +1,40 @@
+
+
+#nullable enable
+
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Events;
+using Microsoft.Extensions.Logging;
+using Rebus.Handlers;
+using Rebus.Retry.Simple;
+
+namespace Dbosoft.Rebus.Operations.Workflow
+{
+ public class FailedOperationHandler : IHandleMessages> where T: IOperationTaskMessage
+ {
+ private readonly IOperationMessaging _operationMessaging;
+
+ private readonly ILogger> _logger;
+
+ public FailedOperationHandler(ILogger> logger, IOperationMessaging operationMessaging)
+ {
+ _logger = logger;
+ _operationMessaging = operationMessaging;
+ }
+
+ public async Task Handle(IFailed failedMessage)
+ {
+
+ _logger.LogError("Task {taskId} failed with message: {failedMessage}",
+ failedMessage.Message.TaskId, failedMessage.ErrorDescription
+ );
+
+ await _operationMessaging.DispatchTaskStatusEventAsync(
+ OperationTaskStatusEvent.Failed(
+ failedMessage.Message.OperationId, failedMessage.Message.InitiatingTaskId,
+ failedMessage.Message.TaskId, new ErrorData() { ErrorMessage = failedMessage.ErrorDescription }));
+
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs
new file mode 100644
index 0000000..01ea7e7
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs
@@ -0,0 +1,39 @@
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Commands;
+using Dbosoft.Rebus.Operations.Events;
+using Microsoft.Extensions.Logging;
+using Rebus.Bus;
+using Rebus.Handlers;
+
+namespace Dbosoft.Rebus.Operations.Workflow
+{
+ public class IncomingTaskMessageHandler : IHandleMessages> where T: class, new()
+ {
+ private readonly IBus _bus;
+ private readonly ILogger> _logger;
+ private readonly IMessageEnricher _messageEnricher;
+ public IncomingTaskMessageHandler(IBus bus, ILogger> logger, IMessageEnricher messageEnricher)
+ {
+ _bus = bus;
+ _logger = logger;
+ _messageEnricher = messageEnricher;
+ }
+
+ public async Task Handle(OperationTaskSystemMessage taskMessage)
+ {
+ await _bus.SendLocal(new OperationTask(taskMessage.Message, taskMessage.OperationId, taskMessage.InitiatingTaskId, taskMessage.TaskId)).ConfigureAwait(false);
+
+ _logger.LogTrace($"Accepted incoming operation message. Operation id: '{taskMessage.OperationId}'");
+
+ var reply = new OperationTaskAcceptedEvent
+ {
+ OperationId = taskMessage.OperationId,
+ InitiatingTaskId = taskMessage.InitiatingTaskId,
+ TaskId = taskMessage.TaskId,
+ AdditionalData = _messageEnricher.EnrichTaskAcceptedReply(taskMessage)
+ };
+
+ await _bus.Reply(reply).ConfigureAwait(false);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationManagerBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationManagerBase.cs
new file mode 100644
index 0000000..822d234
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationManagerBase.cs
@@ -0,0 +1,26 @@
+#nullable enable
+using System;
+using System.Threading.Tasks;
+
+namespace Dbosoft.Rebus.Operations.Workflow;
+
+public abstract class OperationManagerBase : IOperationManager
+{
+
+ public abstract ValueTask GetByIdAsync(Guid operationId);
+
+ public abstract ValueTask GetOrCreateAsync(Guid operationId, object command);
+
+
+ public abstract ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation,
+ IOperationTask task,
+ object? data);
+
+ public abstract ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus,
+ object? additionalData);
+
+
+
+
+
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationSagaData.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationSagaData.cs
new file mode 100644
index 0000000..c1e4063
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationSagaData.cs
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using Rebus.Sagas;
+
+namespace Dbosoft.Rebus.Operations.Workflow
+{
+
+ public class OperationSagaData : ISagaData
+ {
+ public Guid OperationId { get; set; }
+ public Guid PrimaryTaskId { get; set; }
+
+ // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global
+ public Dictionary Tasks { get; set; } = new();
+ public Guid Id { get; set; }
+ public int Revision { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskManagerBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskManagerBase.cs
new file mode 100644
index 0000000..0f407e7
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskManagerBase.cs
@@ -0,0 +1,18 @@
+using System;
+using System.Threading.Tasks;
+
+namespace Dbosoft.Rebus.Operations.Workflow;
+
+#nullable enable
+public abstract class OperationTaskManagerBase : IOperationTaskManager
+{
+
+ public abstract ValueTask GetByIdAsync(Guid taskId);
+
+ public abstract ValueTask GetOrCreateAsync(IOperation operation, object command, Guid taskId, Guid parentTaskId);
+
+ public abstract ValueTask TryChangeStatusAsync(IOperationTask task, OperationTaskStatus newStatus,
+ object? additionalData);
+
+
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs
new file mode 100644
index 0000000..ddc5fbf
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs
@@ -0,0 +1,58 @@
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Events;
+using JetBrains.Annotations;
+using Microsoft.Extensions.Logging;
+using Rebus.Handlers;
+
+namespace Dbosoft.Rebus.Operations.Workflow
+{
+ [UsedImplicitly]
+ public class OperationTaskProgressEventHandler : IHandleMessages
+ {
+ private readonly IWorkflow _workflow;
+ private readonly ILogger _logger;
+
+ public OperationTaskProgressEventHandler(IWorkflow workflow, ILogger logger)
+ {
+ _workflow = workflow;
+ _logger = logger;
+ }
+
+
+ public async Task Handle(OperationTaskProgressEvent message)
+ {
+ _logger.LogDebug($"Received operation task progress event. Id : '{message.OperationId}/{message.TaskId}'");
+
+ var operation = await _workflow.Operations
+ .GetByIdAsync(message.OperationId)
+ .ConfigureAwait(false);
+
+ var task = await _workflow.Tasks
+ .GetByIdAsync(message.TaskId)
+ .ConfigureAwait(false);
+
+
+ if (operation != null && task!=null)
+ {
+ await _workflow.Operations.AddProgressAsync(
+ message.Id,
+ message.Timestamp,
+ operation,
+ task,
+ message.Data);
+ }
+ else
+ {
+ _logger.LogWarning($"Received operation task progress event for a unknown operation task. Id : '{message.OperationId}/{message.TaskId}'", new
+ {
+ message.OperationId,
+ message.TaskId,
+ message.Data,
+ message.Timestamp,
+ });
+
+ }
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskWorkflowSaga.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskWorkflowSaga.cs
new file mode 100644
index 0000000..1f0f2ca
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskWorkflowSaga.cs
@@ -0,0 +1,116 @@
+#nullable enable
+
+using System;
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Events;
+using Rebus.Handlers;
+using Rebus.Sagas;
+
+namespace Dbosoft.Rebus.Operations.Workflow
+{
+ public abstract class OperationTaskWorkflowSaga : Saga,
+ IAmInitiatedBy>,
+ IHandleMessages>
+ where TSagaData : TaskWorkflowSagaData, new()
+ where TMessage : class, new()
+ {
+ protected readonly IWorkflow WorkflowEngine;
+
+ protected OperationTaskWorkflowSaga(IWorkflow workflowEngine)
+ {
+ WorkflowEngine = workflowEngine;
+ }
+
+
+ public virtual Task Handle(OperationTask message)
+ {
+ Data.OperationId = message.OperationId;
+ Data.SagaTaskId = message.TaskId;
+ Data.ParentTaskId = message.InitiatingTaskId;
+ return Initiated(message.Command);
+ }
+
+ public Task Handle(OperationTaskStatusEvent message)
+ {
+ return message.OperationFailed ? InitiatingTaskFailed() : InitiatingTaskCompleted();
+ }
+
+ protected override void CorrelateMessages(ICorrelationConfig config)
+ {
+ config.Correlate>(m => m.TaskId, d => d.SagaTaskId);
+ config.Correlate>(m => m.TaskId, d => d.SagaTaskId);
+ }
+
+ protected abstract Task Initiated(TMessage message);
+
+ private Task InitiatingTaskCompleted()
+ {
+ MarkAsComplete();
+ return Task.CompletedTask;
+ }
+
+ private Task InitiatingTaskFailed()
+ {
+ MarkAsComplete();
+ return Task.CompletedTask;
+ }
+
+ protected Task Fail(object? message = null)
+ {
+ return WorkflowEngine.Messaging.DispatchTaskStatusEventAsync(OperationTaskStatusEvent.Failed(
+ Data.OperationId, Data.ParentTaskId, Data.SagaTaskId, message));
+ }
+
+
+ protected Task Complete(object? message = null)
+ {
+ return WorkflowEngine.Messaging.DispatchTaskStatusEventAsync(OperationTaskStatusEvent.Completed(
+ Data.OperationId, Data.ParentTaskId, Data.SagaTaskId, message));
+ }
+
+ protected Task FailOrRun(OperationTaskStatusEvent message, Func completedFunc)
+ where T : class, new()
+ {
+ if (message.OperationFailed)
+ return Fail(message.GetMessage());
+
+ return completedFunc();
+ }
+
+ protected Task FailOrRun(OperationTaskStatusEvent message, Func completedFunc)
+ where T : class, new()
+ where TOpMessage : class
+ {
+ return message.OperationFailed
+ ? Fail(message.GetMessage())
+ : completedFunc(message.GetMessage() as TOpMessage
+ ?? throw new InvalidOperationException(
+ $"Message {typeof(T)} has not returned a result of type {typeof(TOpMessage)}."));
+ }
+
+
+ protected ValueTask StartNewTask(object? additionalData = default) where T : class, new()
+ {
+
+ return WorkflowEngine.Messaging.TaskDispatcher.StartNew(Data.OperationId, Data.SagaTaskId, additionalData);
+ }
+
+
+ protected ValueTask StartNewTask(Type taskCommandType,
+ object? additionalData = default)
+ {
+ return WorkflowEngine.Messaging.TaskDispatcher.StartNew(Data.OperationId, Data.SagaTaskId, taskCommandType, additionalData);
+
+ }
+
+
+ protected ValueTask StartNewTask(object command, object? additionalData = default)
+ {
+ return WorkflowEngine.Messaging.TaskDispatcher.StartNew(Data.OperationId, Data.SagaTaskId, command, additionalData);
+
+ }
+
+
+
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs
new file mode 100644
index 0000000..3ed3d1d
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs
@@ -0,0 +1,207 @@
+#nullable enable
+
+using System;
+using System.Text.Json;
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Commands;
+using Dbosoft.Rebus.Operations.Events;
+using JetBrains.Annotations;
+using Microsoft.Extensions.Logging;
+using Rebus.Handlers;
+using Rebus.Sagas;
+
+namespace Dbosoft.Rebus.Operations.Workflow
+{
+ [UsedImplicitly]
+ public class ProcessOperationSaga : Saga,
+ IAmInitiatedBy,
+ IHandleMessages,
+ IHandleMessages,
+ IHandleMessages,
+ IHandleMessages
+ {
+ private readonly IWorkflow _workflow;
+ private readonly ILogger _log;
+
+ public ProcessOperationSaga(IWorkflow workflow, ILogger log)
+ {
+ _workflow = workflow;
+ _log = log;
+ }
+
+ public Task Handle(CreateOperationCommand message)
+ {
+ Data.OperationId = message.TaskMessage.OperationId;
+ Data.PrimaryTaskId = message.TaskMessage.TaskId;
+ return Handle(message.TaskMessage);
+ }
+
+ public Task Handle(OperationTimeoutEvent message)
+ {
+ return Task.CompletedTask;
+ }
+
+ protected override void CorrelateMessages(ICorrelationConfig config)
+ {
+ config.Correlate(m => m.TaskMessage.OperationId, d => d.OperationId);
+ config.Correlate(m => m.OperationId, d => d.OperationId);
+ config.Correlate(m => m.OperationId, d => d.OperationId);
+ config.Correlate(m => m.OperationId, d => d.OperationId);
+ config.Correlate(m => m.OperationId, d => d.OperationId);
+ }
+
+ public async Task Handle(CreateNewOperationTaskCommand message)
+ {
+
+ var command = JsonSerializer.Deserialize(message.CommandData,
+ Type.GetType(message.CommandType) ??
+ throw new InvalidOperationException($"Operation Workflow {message.OperationId}: unknown command type '{message.CommandType}'"));
+
+ if(command == null)
+ throw new InvalidOperationException($"Operation Workflow {message.OperationId}: invalid command data in message '{message.CommandType}'");
+
+ _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: creating new task command '{commandType}'",
+ message.OperationId, message.TaskId, command.GetType());
+
+
+ var op = await _workflow.Operations
+ .GetByIdAsync(message.OperationId)
+ .ConfigureAwait(false);
+
+
+ if (op == null)
+ {
+ _log.LogWarning("Operation Workflow {operationId}: Operation not found - cancelling workflow",
+ message.OperationId);
+ MarkAsComplete();
+ return;
+ }
+
+ var task = await _workflow.Tasks
+ .GetOrCreateAsync(op, command, message.TaskId, message.InitiatingTaskId)
+ .ConfigureAwait(false);
+
+
+ var messageType = Type.GetType(message.CommandType);
+ if (messageType == null)
+ throw new InvalidOperationException($"unknown command type '{message.CommandType}'");
+
+ Data.Tasks.Add(message.TaskId, messageType.AssemblyQualifiedName!);
+ _workflow.Messaging.DispatchTaskMessage(command,task);
+ }
+
+
+ public async Task Handle(OperationTaskAcceptedEvent message)
+ {
+ var op = await _workflow.Operations
+ .GetByIdAsync(message.OperationId)
+ .ConfigureAwait(false);
+
+ var task = await _workflow.Tasks
+ .GetByIdAsync(message.TaskId)
+ .ConfigureAwait(false);
+
+ if (op == null || task == null)
+ {
+ _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: could not accept task as it was not found.",
+ message.OperationId, message.TaskId);
+ return;
+ }
+
+ var opOldStatus = op.Status;
+ if (await _workflow.Operations.TryChangeStatusAsync(op, OperationStatus.Running, null))
+ {
+ _log.LogDebug("Operation Workflow {operationId}: Status changed: {oldStatus} -> {newStatus}",
+ message.OperationId, opOldStatus, op.Status);
+
+
+ await _workflow.Messaging.DispatchOperationStatusEventAsync(new OperationStatusEvent
+ {
+ OperationId = op.Id,
+ NewStatus = OperationStatus.Running
+ });
+
+
+ var taskOldStatus = task.Status;
+ if (await _workflow.Tasks.TryChangeStatusAsync(task, OperationTaskStatus.Running,
+ message.AdditionalData))
+ {
+ _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: Status changed: {oldStatus} -> {newStatus}",
+ message.OperationId, message.TaskId, taskOldStatus, task.Status);
+
+ }
+ }
+
+ }
+
+ public async Task Handle(OperationTaskStatusEvent message)
+ {
+ var op = await _workflow.Operations
+ .GetByIdAsync(message.OperationId)
+ .ConfigureAwait(false);
+
+ var task = await _workflow.Tasks
+ .GetByIdAsync(message.TaskId)
+ .ConfigureAwait(false);
+
+ if (op == null || task == null)
+ {
+ _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: could not update task status as it was not found",
+ message.OperationId, message.TaskId);
+ return;
+ }
+
+ if (task.Status is OperationTaskStatus.Queued or OperationTaskStatus.Running)
+ {
+ if(!Data.Tasks.ContainsKey(message.TaskId))
+ _log.LogWarning("Operation Workflow {operationId}, Task {taskId}: could not update task status as it was not found in workflow.",
+ message.OperationId, message.TaskId);
+ else
+ {
+ var taskCommandTypeName = Data.Tasks[message.TaskId];
+ await _workflow.Messaging.DispatchTaskStatusEventAsync(taskCommandTypeName, message);
+ }
+ }
+
+ var taskOldStatus = task.Status;
+ if(await _workflow.Tasks.TryChangeStatusAsync(task,
+ message.OperationFailed
+ ? OperationTaskStatus.Failed
+ : OperationTaskStatus.Completed
+ , message.GetMessage()))
+
+ _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: Status changed: {oldStatus} -> {newStatus}",
+ message.OperationId, message.TaskId, taskOldStatus, task.Status);
+
+
+
+ if (message.TaskId == Data.PrimaryTaskId)
+ {
+
+ var newStatus = message.OperationFailed
+ ? OperationStatus.Failed
+ : OperationStatus.Completed;
+
+ _log.LogDebug("Operation Workflow {operationId}: Primary task changed, updating operation status to {newStatus}",
+ message.OperationId, newStatus);
+
+
+ if (await _workflow.Operations.TryChangeStatusAsync(op,
+ newStatus, message.GetMessage()))
+ {
+ await _workflow.Messaging.DispatchOperationStatusEventAsync(new OperationStatusEvent
+ {
+ OperationId = op.Id,
+ NewStatus = newStatus
+ });
+ }
+
+ _log.LogDebug("Operation Workflow {operationId}: Completing workflow",
+ message.OperationId);
+
+ MarkAsComplete();
+ }
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs
new file mode 100644
index 0000000..245a923
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs
@@ -0,0 +1,54 @@
+using System;
+using System.Threading.Tasks;
+using Dbosoft.Rebus.Operations.Commands;
+using Dbosoft.Rebus.Operations.Events;
+using Rebus.Bus;
+
+namespace Dbosoft.Rebus.Operations.Workflow;
+
+public class RebusOperationMessaging : IOperationMessaging
+{
+ private readonly IBus _bus;
+
+ public RebusOperationMessaging(IBus bus,
+ IOperationDispatcher operationDispatcher, IOperationTaskDispatcher taskDispatcher)
+ {
+ _bus = bus;
+ OperationDispatcher = operationDispatcher;
+ TaskDispatcher = taskDispatcher;
+ }
+
+ public virtual void DispatchTaskMessage(object command, IOperationTask task)
+ {
+ var messageType = command.GetType();
+ var outboundMessage = Activator.CreateInstance(
+ typeof(OperationTaskSystemMessage<>).MakeGenericType(messageType),
+ command, task.OperationId, task.InitiatingTaskId, task.Id);
+
+ _bus.SendLocal(outboundMessage);
+ }
+
+ public Task DispatchTaskStatusEventAsync(string commandType, OperationTaskStatusEvent message)
+ {
+ var genericType = typeof(OperationTaskStatusEvent<>);
+ var wrappedCommandType = genericType.MakeGenericType(Type.GetType(commandType)
+ ?? throw new InvalidOperationException(
+ $"Unknown task command type '{commandType}'."));
+
+ var commandInstance = Activator.CreateInstance(wrappedCommandType, message);
+ return _bus.SendLocal(commandInstance);
+ }
+
+ public Task DispatchTaskStatusEventAsync(OperationTaskStatusEvent message)
+ {
+ return _bus.Publish(message);
+ }
+
+ public Task DispatchOperationStatusEventAsync(OperationStatusEvent message)
+ {
+ return _bus.Publish(message);
+ }
+
+ public IOperationDispatcher OperationDispatcher { get; }
+ public IOperationTaskDispatcher TaskDispatcher { get; }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/TaskWorkflowSagaData.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/TaskWorkflowSagaData.cs
new file mode 100644
index 0000000..134aa66
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/TaskWorkflowSagaData.cs
@@ -0,0 +1,18 @@
+using System;
+using Rebus.Sagas;
+
+namespace Dbosoft.Rebus.Operations.Workflow
+{
+ public class TaskWorkflowSagaData : ISagaData
+ {
+ public Guid OperationId { get; set; }
+
+ public Guid SagaTaskId { get; set; }
+ public Guid ParentTaskId { get; set; }
+
+
+ // these two are required by Rebus
+ public Guid Id { get; set; }
+ public int Revision { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateNewOperationTaskCommand.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateNewOperationTaskCommand.cs
new file mode 100644
index 0000000..6a35a2b
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateNewOperationTaskCommand.cs
@@ -0,0 +1,29 @@
+using System;
+
+namespace Dbosoft.Rebus.Operations.Commands
+{
+ public class CreateNewOperationTaskCommand : IOperationTaskMessage
+ {
+ // ReSharper disable once UnusedMember.Global
+ public CreateNewOperationTaskCommand()
+ {
+ }
+
+ public CreateNewOperationTaskCommand(string commandType, string commandData,
+ Guid operationId, Guid initiatingTaskId, Guid taskId)
+ {
+ CommandType = commandType;
+ CommandData = commandData;
+ OperationId = operationId;
+ InitiatingTaskId = initiatingTaskId;
+ TaskId = taskId;
+ }
+
+ public string? CommandData { get; set; }
+ public string? CommandType { get; set; }
+
+ public Guid OperationId { get; set; }
+ public Guid InitiatingTaskId { get; set; }
+ public Guid TaskId { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateOperationCommand.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateOperationCommand.cs
new file mode 100644
index 0000000..af1af56
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateOperationCommand.cs
@@ -0,0 +1,7 @@
+namespace Dbosoft.Rebus.Operations.Commands
+{
+ public class CreateOperationCommand
+ {
+ public CreateNewOperationTaskCommand? TaskMessage { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/OperationTaskSystemMessage.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/OperationTaskSystemMessage.cs
new file mode 100644
index 0000000..600c5c7
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/OperationTaskSystemMessage.cs
@@ -0,0 +1,33 @@
+using System;
+
+namespace Dbosoft.Rebus.Operations.Commands
+{
+
+ ///
+ /// Generic message that wraps a task message
+ ///
+ public class OperationTaskSystemMessage : IOperationTaskMessage
+ {
+
+ // ReSharper disable once UnusedMember.Global
+ public OperationTaskSystemMessage()
+ {
+ }
+
+ public OperationTaskSystemMessage(TMessage message, Guid operationId, Guid initiatingTaskId, Guid taskId)
+ {
+ Message = message;
+ OperationId = operationId;
+ InitiatingTaskId = initiatingTaskId;
+ TaskId = taskId;
+ }
+
+ public TMessage? Message { get; set; }
+
+ public Guid OperationId { get; set; }
+ public Guid InitiatingTaskId { get; set; }
+
+
+ public Guid TaskId { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/ErrorData.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/ErrorData.cs
new file mode 100644
index 0000000..923b8e8
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/ErrorData.cs
@@ -0,0 +1,10 @@
+using JetBrains.Annotations;
+
+namespace Dbosoft.Rebus.Operations;
+
+[PublicAPI]
+public class ErrorData
+{
+ public string? ErrorMessage { get; set; }
+ public object? AdditionalData { get; set; }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationStatusEvent.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationStatusEvent.cs
new file mode 100644
index 0000000..6d7df1b
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationStatusEvent.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace Dbosoft.Rebus.Operations.Events;
+
+public class OperationStatusEvent
+{
+ public Guid OperationId { get; set; }
+ public OperationStatus NewStatus { get; set; }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskAcceptedEvent.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskAcceptedEvent.cs
new file mode 100644
index 0000000..3e2af39
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskAcceptedEvent.cs
@@ -0,0 +1,13 @@
+using System;
+
+namespace Dbosoft.Rebus.Operations.Events
+{
+ public class OperationTaskAcceptedEvent : IOperationTaskMessage
+ {
+ public Guid OperationId { get; set; }
+ public Guid InitiatingTaskId { get; set; }
+ public Guid TaskId { get; set; }
+
+ public object? AdditionalData { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskProgressEvent.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskProgressEvent.cs
new file mode 100644
index 0000000..d95d3e8
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskProgressEvent.cs
@@ -0,0 +1,14 @@
+using System;
+
+namespace Dbosoft.Rebus.Operations.Events
+{
+ public class OperationTaskProgressEvent
+ {
+ public Guid Id { get; set; }
+
+ public Guid OperationId { get; set; }
+ public Guid TaskId { get; set; }
+ public object? Data { get; set; }
+ public DateTimeOffset Timestamp { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEvent.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEvent.cs
new file mode 100644
index 0000000..11db0c2
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEvent.cs
@@ -0,0 +1,58 @@
+using System;
+
+namespace Dbosoft.Rebus.Operations.Events
+{
+ public class OperationTaskStatusEvent : OperationTaskStatusEventBase
+ {
+ // ReSharper disable once UnusedMember.Global
+ // required for serialization
+ public OperationTaskStatusEvent()
+ {
+ }
+
+ protected OperationTaskStatusEvent(Guid operationId, Guid initiatingTaskId, Guid taskId, bool failed,
+ string? messageType,
+ string? messageData) : base(operationId, initiatingTaskId, taskId, failed, messageType, messageData)
+ {
+
+ }
+
+ public static OperationTaskStatusEvent Failed(Guid operationId, Guid initiatingTaskId, Guid taskId)
+ {
+ return new OperationTaskStatusEvent(operationId, initiatingTaskId, taskId, true, null, null);
+ }
+
+ public static OperationTaskStatusEvent Failed(Guid operationId, Guid initiatingTaskId, Guid taskId, object? message)
+ {
+ var (data, typeName) = SerializeMessage(message);
+ return new OperationTaskStatusEvent(operationId, initiatingTaskId, taskId, true, typeName, data);
+ }
+
+ public static OperationTaskStatusEvent Completed(Guid operationId, Guid initiatingTaskId, Guid taskId)
+ {
+ return new OperationTaskStatusEvent(operationId, initiatingTaskId, taskId, false, null, null);
+ }
+
+ public static OperationTaskStatusEvent Completed(Guid operationId, Guid initiatingTaskId, Guid taskId, object? message)
+ {
+ var (data, typeName) = SerializeMessage(message);
+ return new OperationTaskStatusEvent(operationId, initiatingTaskId,taskId, false, typeName, data);
+ }
+
+ }
+
+
+ // ReSharper disable once UnusedTypeParameter
+ public class OperationTaskStatusEvent : OperationTaskStatusEventBase where T : class, new()
+ {
+
+ public OperationTaskStatusEvent()
+ {
+ }
+
+ public OperationTaskStatusEvent(OperationTaskStatusEvent message) :
+ base(message.OperationId, message.InitiatingTaskId, message.TaskId, message.OperationFailed, message.MessageType, message.MessageData)
+ {
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEventBase.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEventBase.cs
new file mode 100644
index 0000000..b60ff65
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEventBase.cs
@@ -0,0 +1,71 @@
+using System;
+using System.Text.Json;
+
+namespace Dbosoft.Rebus.Operations.Events;
+
+#nullable enable
+public class OperationTaskStatusEventBase : IOperationTaskMessage
+{
+ public OperationTaskStatusEventBase() {}
+
+ protected OperationTaskStatusEventBase(Guid operationId, Guid initiatingTaskId, Guid taskId, bool failed, string? messageType,
+ string? messageData)
+ {
+ OperationId = operationId;
+ InitiatingTaskId = initiatingTaskId;
+ TaskId = taskId;
+ OperationFailed = failed;
+ MessageData = messageData;
+ MessageType = messageType;
+ }
+
+ public string? MessageData { get; set; }
+ public string? MessageType { get; set; }
+ public bool OperationFailed { get; set; }
+ public Guid OperationId { get; set; }
+ public Guid TaskId { get; set; }
+ public Guid InitiatingTaskId { get; set; }
+
+ protected static (string? data, string? type) SerializeMessage(object? message)
+ {
+ if (message == null)
+ return (null, null);
+
+
+ return (JsonSerializer.Serialize(message), message.GetType().AssemblyQualifiedName);
+ }
+
+ public object? GetMessage()
+ {
+ if (MessageData == null || MessageType == null)
+ return null;
+
+ var type = Type.GetType(MessageType);
+
+ return type == null
+ ? null
+ : JsonSerializer.Deserialize(MessageData, type);
+ }
+
+ public T? GetErrorDetails()
+ {
+ if (MessageData == null || MessageType == null)
+ return default;
+
+ var type = Type.GetType(MessageType);
+
+ if (type == null) return default;
+
+ if (type != typeof(ErrorData) && !type.IsSubclassOf(typeof(ErrorData))) return default;
+
+ var data = JsonSerializer.Deserialize(MessageData, type) as ErrorData;
+
+ if ( data?.AdditionalData is JsonElement element)
+ {
+ return element.Deserialize();
+ }
+
+ return default;
+
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTimeoutEvent.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTimeoutEvent.cs
new file mode 100644
index 0000000..2302360
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTimeoutEvent.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace Dbosoft.Rebus.Operations.Events
+{
+ public class OperationTimeoutEvent
+ {
+ public Guid OperationId { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/IOperationTaskMessage.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/IOperationTaskMessage.cs
new file mode 100644
index 0000000..e7af6ac
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/IOperationTaskMessage.cs
@@ -0,0 +1,17 @@
+using System;
+
+namespace Dbosoft.Rebus.Operations
+{
+ ///
+ /// Interface for all Messages for operation tasks
+ ///
+ public interface IOperationTaskMessage
+ {
+ Guid OperationId { get; }
+
+ Guid InitiatingTaskId { get; }
+
+ Guid TaskId { get; }
+ }
+
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/OperationStatus.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/OperationStatus.cs
new file mode 100644
index 0000000..00eb5bd
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/OperationStatus.cs
@@ -0,0 +1,9 @@
+namespace Dbosoft.Rebus.Operations;
+
+public enum OperationStatus
+{
+ Queued,
+ Running,
+ Failed,
+ Completed
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/OperationTaskStatus.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/OperationTaskStatus.cs
new file mode 100644
index 0000000..ac905c0
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/OperationTaskStatus.cs
@@ -0,0 +1,9 @@
+namespace Dbosoft.Rebus.Operations;
+
+public enum OperationTaskStatus
+{
+ Queued,
+ Running,
+ Failed,
+ Completed
+}
\ No newline at end of file
diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Rebus.Operations.Primitives.csproj b/src/Rebus.Operations/Rebus.Operations.Primitives/Rebus.Operations.Primitives.csproj
new file mode 100644
index 0000000..003815b
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Rebus.Operations.Primitives.csproj
@@ -0,0 +1,14 @@
+
+
+
+ netstandard2.1
+ enable
+ latest
+
+
+
+
+
+
+
+
diff --git a/src/Rebus.Operations/Rebus.Operations.SimpleInjector/Rebus.Operations.SimpleInjector.csproj b/src/Rebus.Operations/Rebus.Operations.SimpleInjector/Rebus.Operations.SimpleInjector.csproj
new file mode 100644
index 0000000..0458ec1
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.SimpleInjector/Rebus.Operations.SimpleInjector.csproj
@@ -0,0 +1,18 @@
+
+
+
+ netstandard2.1
+ enable
+ latest
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Rebus.Operations/Rebus.Operations.SimpleInjector/SimpleInjectorExtensions.cs b/src/Rebus.Operations/Rebus.Operations.SimpleInjector/SimpleInjectorExtensions.cs
new file mode 100644
index 0000000..03a7dad
--- /dev/null
+++ b/src/Rebus.Operations/Rebus.Operations.SimpleInjector/SimpleInjectorExtensions.cs
@@ -0,0 +1,56 @@
+using System;
+using Dbosoft.Rebus.Operations.Commands;
+using Dbosoft.Rebus.Operations.Events;
+using Dbosoft.Rebus.Operations.Workflow;
+using Rebus.Config;
+using Rebus.Handlers;
+using Rebus.Routing.TypeBased;
+using SimpleInjector;
+
+namespace Dbosoft.Rebus.Operations;
+
+public static class SimpleInjectorExtensions
+{
+ public static RebusConfigurer AddOperations(this RebusConfigurer configurer, string queueName)
+ {
+ return configurer.Routing(x => x.TypeBased()
+ .Map(queueName)
+ .Map(queueName)
+ .Map(queueName)
+ .Map(queueName)
+ .Map(queueName)
+ .Map(queueName)
+ .Map(queueName));
+ }
+
+ public static Container AddRebusOperationsHandlers(this Container container)
+ where TOpManager : IOperationManager
+ where TTaskManager: IOperationTaskManager
+ {
+ return container.AddRebusOperationsHandlers(typeof(TOpManager), typeof(TTaskManager));
+ }
+
+ public static Container AddRebusOperationsHandlers(this Container container,
+ Type operationManagerType,
+ Type taskManagerType )
+ {
+ //workflow engine types
+ container.Register(typeof(IOperationManager), operationManagerType, Lifestyle.Scoped);
+ container.Register(typeof(IOperationTaskManager), taskManagerType, Lifestyle.Scoped);
+ container.RegisterConditional(Lifestyle.Scoped,c=> !c.Handled);
+ container.RegisterConditional(Lifestyle.Scoped,c=> !c.Handled);
+ container.RegisterConditional(Lifestyle.Scoped,c=> !c.Handled);
+
+ container.Register(Lifestyle.Scoped);
+ container.RegisterConditional(Lifestyle.Scoped, c=> !c.Handled);
+
+ container.Collection.Append(typeof(IHandleMessages<>), typeof(ProcessOperationSaga), Lifestyle.Scoped);
+ container.Collection.Append(typeof(IHandleMessages<>), typeof(OperationTaskProgressEventHandler), Lifestyle.Scoped);
+ container.Collection.Append(typeof(IHandleMessages<>), typeof(FailedOperationHandler<>), Lifestyle.Scoped);
+ container.Collection.Append(typeof(IHandleMessages<>), typeof(IncomingTaskMessageHandler<>), Lifestyle.Scoped);
+ container.Collection.Append(typeof(IHandleMessages<>), typeof(EmptyOperationStatusEventHandler), Lifestyle.Scoped);
+ container.Collection.Append(typeof(IHandleMessages<>), typeof(EmptyOperationTaskStatusEventHandler<>), Lifestyle.Scoped);
+
+ return container;
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.SimpleInjector/IRebusUnitOfWork.cs b/src/Rebus.SimpleInjector/IRebusUnitOfWork.cs
new file mode 100644
index 0000000..4025e78
--- /dev/null
+++ b/src/Rebus.SimpleInjector/IRebusUnitOfWork.cs
@@ -0,0 +1,7 @@
+namespace Dbosoft.Rebus
+{
+ public interface IRebusUnitOfWork : IAsyncDisposable, IDisposable
+ {
+ public Task Commit();
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.SimpleInjector/Rebus.SimpleInjector.csproj b/src/Rebus.SimpleInjector/Rebus.SimpleInjector.csproj
new file mode 100644
index 0000000..6b167f9
--- /dev/null
+++ b/src/Rebus.SimpleInjector/Rebus.SimpleInjector.csproj
@@ -0,0 +1,17 @@
+
+
+
+ netstandard2.1
+ enable
+ enable
+ Dbosoft.Rebus
+ latest
+
+
+
+
+
+
+
+
+
diff --git a/src/Rebus.SimpleInjector/RebusUnitOfWorkAdapter.cs b/src/Rebus.SimpleInjector/RebusUnitOfWorkAdapter.cs
new file mode 100644
index 0000000..8abe81f
--- /dev/null
+++ b/src/Rebus.SimpleInjector/RebusUnitOfWorkAdapter.cs
@@ -0,0 +1,25 @@
+using System.Diagnostics;
+using Rebus.Pipeline;
+using SimpleInjector;
+
+namespace Dbosoft.Rebus
+{
+ public class RebusUnitOfWorkAdapter
+ {
+ public async Task Dispose(IMessageContext context)
+ {
+ var scope = context.TransactionContext.Items["SI_scope"] as Scope;
+ Debug.Assert(scope != null);
+
+ await scope.GetInstance().DisposeAsync().ConfigureAwait(false);
+ }
+
+ public async Task Commit(IMessageContext context)
+ {
+ var scope = context.TransactionContext.Items["SI_scope"] as Scope;
+ Debug.Assert(scope != null);
+
+ await scope.GetInstance().Commit().ConfigureAwait(false);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.SimpleInjector/SimpleInjectorConfigurationExtensions.cs b/src/Rebus.SimpleInjector/SimpleInjectorConfigurationExtensions.cs
new file mode 100644
index 0000000..e1c4053
--- /dev/null
+++ b/src/Rebus.SimpleInjector/SimpleInjectorConfigurationExtensions.cs
@@ -0,0 +1,163 @@
+using Rebus.Bus;
+using Rebus.Bus.Advanced;
+using Rebus.Config;
+using Rebus.Messages;
+using Rebus.Pipeline;
+using Rebus.Transport;
+using SimpleInjector;
+
+namespace Dbosoft.Rebus
+{
+ ///
+ /// Configuration extensions for configuring a Rebus endpoint in your SimpleInjector container
+ ///
+ public static class SimpleInjectorConfigurationExtensions
+ {
+ ///
+ /// Makes the necessary registrations in the container, registering the passed-in
+ ///
+ /// as a configuration callback. The callback is invoked with a which must have
+ /// its method called at the end, returning the resulting
+ /// instance.
+ /// The configuration callback is called the first time the bus is resolved, which may be done manually or simply by
+ /// calling
+ ///
+ ///
+ public static void ConfigureRebus(this Container container, Func configurationCallback)
+ {
+ if (container.GetCurrentRegistrations().Any(r => r.ServiceType == typeof(IBus)))
+ throw new InvalidOperationException(
+ "Cannot register IBus in the container because it has already been registered. If you want to host multiple Rebus instances in a single process, please use separate container instances for them.");
+
+ container.Register(() =>
+ {
+ if (container.IsVerifying) return new FakeSyncBus();
+
+ return container.GetInstance().Advanced.SyncBus;
+ });
+
+ container.Register(() =>
+ {
+ var currentMessageContext = MessageContext.Current;
+
+ if (currentMessageContext != null) return currentMessageContext;
+
+ if (container.IsVerifying) return new FakeMessageContext();
+
+ throw new InvalidOperationException(
+ "Attempted to inject the current message context from MessageContext.Current, but it was null! Did you attempt to resolve IMessageContext from outside of a Rebus message handler?");
+ });
+
+ container.Register(() =>
+ {
+ var containerAdapter = new SimpleInjectorContainerAdapter(container);
+ var rebusConfigurer = Configure.With(containerAdapter);
+ return configurationCallback(rebusConfigurer);
+ }, Lifestyle.Singleton);
+ }
+
+ ///
+ /// After having configured the bus with the bus may be started by calling this method
+ ///
+ public static void StartBus(this Container container)
+ {
+ container.GetInstance();
+ }
+
+ private class FakeSyncBus : ISyncBus
+ {
+ public void SendLocal(object commandMessage, IDictionary optionalHeaders = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Send(object commandMessage, IDictionary optionalHeaders = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Reply(object replyMessage, IDictionary optionalHeaders = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Defer(TimeSpan delay, object message, IDictionary optionalHeaders = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void DeferLocal(TimeSpan delay, object message, IDictionary optionalHeaders = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Subscribe()
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Subscribe(Type eventType)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Unsubscribe()
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Unsubscribe(Type eventType)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Publish(object eventMessage, IDictionary optionalHeaders = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void SendLocal(object commandMessage, Dictionary optionalHeaders = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Send(object commandMessage, Dictionary optionalHeaders = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Reply(object replyMessage, Dictionary optionalHeaders = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Defer(TimeSpan delay, object message, Dictionary optionalHeaders = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void DeferLocal(TimeSpan delay, object message, Dictionary optionalHeaders = null)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Publish(object eventMessage, Dictionary optionalHeaders = null)
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ ///
+ /// Fake implementation of that can be returned by SimpleInjector while verifying the
+ /// configuration
+ ///
+ private class FakeMessageContext : IMessageContext
+ {
+ public ITransactionContext TransactionContext { get; }
+ public IncomingStepContext IncomingStepContext { get; }
+ public TransportMessage TransportMessage { get; }
+ public Message Message { get; }
+ public Dictionary Headers { get; }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.SimpleInjector/SimpleInjectorContainerAdapter.cs b/src/Rebus.SimpleInjector/SimpleInjectorContainerAdapter.cs
new file mode 100644
index 0000000..c952d4b
--- /dev/null
+++ b/src/Rebus.SimpleInjector/SimpleInjectorContainerAdapter.cs
@@ -0,0 +1,82 @@
+using Rebus.Activation;
+using Rebus.Bus;
+using Rebus.Handlers;
+using Rebus.Transport;
+using SimpleInjector;
+using SimpleInjector.Lifestyles;
+
+// ReSharper disable ArgumentsStyleLiteral
+#pragma warning disable 1998
+
+namespace Dbosoft.Rebus
+{
+ internal class SimpleInjectorContainerAdapter : IContainerAdapter
+ {
+ private readonly Container _container;
+
+ private bool _busWasSet;
+
+ ///
+ /// Constructs the container adapter
+ ///
+ public SimpleInjectorContainerAdapter(Container container)
+ {
+ _container = container ?? throw new ArgumentNullException(nameof(container));
+ }
+
+ ///
+ /// Resolves all handlers for the given message type
+ ///
+ public async Task>> GetHandlers(TMessage message,
+ ITransactionContext transactionContext)
+ {
+ var scope = AsyncScopedLifestyle.BeginScope(_container);
+ if (TryGetInstance>>(_container, out var handlerInstances))
+ {
+ var handlerList = handlerInstances.ToList();
+ transactionContext.Items["SI_scope"] = scope;
+
+ transactionContext.OnDisposed(ctx =>
+ {
+ foreach (var disposable in handlerList.OfType()) disposable.Dispose();
+
+ scope.Dispose();
+ });
+
+ return handlerList;
+ }
+
+ scope.Dispose();
+
+ return Array.Empty>();
+ }
+
+ public void SetBus(IBus bus)
+ {
+ // hack: this is just to satisfy the contract test... we are pretty sure that
+ // 1. the SetBus method is not called twice, becauwe
+ // 2. we are the only ones who create the instance of the container adapter, because
+ // 3. the container adapter class is internal
+ if (_busWasSet)
+ throw new InvalidOperationException(
+ "SetBus was called twice on the container adapter. This is a sign that something has gone wrong during the configuration process.");
+ _busWasSet = true;
+
+ // 2nd hack:
+ // again: we control calls to SetBus in this container adapter, because we create it...
+ // so, to make the contract tests happy, we need to do this:
+ var actualBusType = bus.GetType();
+
+ if (actualBusType.Name == "FakeBus" && actualBusType.DeclaringType?.Name == "ContainerTests`1")
+ bus.Dispose();
+ }
+
+ private static bool TryGetInstance(Container container, out TService instance)
+ where TService : class
+ {
+ IServiceProvider provider = container;
+ instance = (TService) provider.GetService(typeof(TService));
+ return instance != null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Rebus.SimpleInjector/SimpleInjectorUnitOfWork.cs b/src/Rebus.SimpleInjector/SimpleInjectorUnitOfWork.cs
new file mode 100644
index 0000000..bd0878d
--- /dev/null
+++ b/src/Rebus.SimpleInjector/SimpleInjectorUnitOfWork.cs
@@ -0,0 +1,33 @@
+using Rebus.Config;
+using Rebus.Pipeline;
+
+namespace Dbosoft.Rebus
+{
+ public static class SimpleInjectorUnitOfWork
+ {
+ public static void EnableSimpleInjectorUnitOfWork(
+ this OptionsConfigurer configurer)
+ {
+ configurer.EnableAsyncUnitOfWork(Create, Commit, dispose: Dispose);
+ }
+
+ private static Task Create(IMessageContext context)
+ {
+ var unitOfWork = new RebusUnitOfWorkAdapter();
+ // stash current unit of work in the transaction context's items
+ context.TransactionContext.Items["uow"] = unitOfWork;
+
+ return Task.FromResult(unitOfWork);
+ }
+
+ private static Task Commit(IMessageContext context, RebusUnitOfWorkAdapter uow)
+ {
+ return uow.Commit(context);
+ }
+
+ private static Task Dispose(IMessageContext context, RebusUnitOfWorkAdapter uow)
+ {
+ return uow.Dispose(context);
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/MultiStepCommand.cs b/test/Rebus.Operations.Tests/MultiStepCommand.cs
new file mode 100644
index 0000000..2ee95fb
--- /dev/null
+++ b/test/Rebus.Operations.Tests/MultiStepCommand.cs
@@ -0,0 +1,6 @@
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public class MultiStepCommand
+{
+
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/MultiStepSaga.cs b/test/Rebus.Operations.Tests/MultiStepSaga.cs
new file mode 100644
index 0000000..87aee27
--- /dev/null
+++ b/test/Rebus.Operations.Tests/MultiStepSaga.cs
@@ -0,0 +1,41 @@
+using System.Runtime.InteropServices.ComTypes;
+using Dbosoft.Rebus.Operations.Events;
+using Dbosoft.Rebus.Operations.Workflow;
+using Rebus.Handlers;
+using Rebus.Sagas;
+
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public class MultiStepSaga :
+ OperationTaskWorkflowSaga,
+ IHandleMessages>,
+ IHandleMessages>
+{
+ public MultiStepSaga(IWorkflow workflowEngine) : base(workflowEngine)
+ {
+ }
+
+ protected override void CorrelateMessages(ICorrelationConfig config)
+ {
+ config.Correlate>(m => m.InitiatingTaskId, d => d.SagaTaskId);
+ config.Correlate>(m => m.InitiatingTaskId, d => d.SagaTaskId);
+
+ base.CorrelateMessages(config);
+ }
+
+ protected override Task Initiated(MultiStepCommand message)
+ {
+ return StartNewTask().AsTask();
+ }
+
+ public Task Handle(OperationTaskStatusEvent message)
+ {
+ return FailOrRun(message,
+ () => StartNewTask().AsTask());
+ }
+
+ public Task Handle(OperationTaskStatusEvent message)
+ {
+ return FailOrRun(message, () => Complete());
+ }
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/MultiStepSagaData.cs b/test/Rebus.Operations.Tests/MultiStepSagaData.cs
new file mode 100644
index 0000000..2eea4ce
--- /dev/null
+++ b/test/Rebus.Operations.Tests/MultiStepSagaData.cs
@@ -0,0 +1,8 @@
+using Dbosoft.Rebus.Operations.Workflow;
+
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public class MultiStepSagaData : TaskWorkflowSagaData
+{
+
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/Rebus.Operations.Tests.csproj b/test/Rebus.Operations.Tests/Rebus.Operations.Tests.csproj
new file mode 100644
index 0000000..2623b76
--- /dev/null
+++ b/test/Rebus.Operations.Tests/Rebus.Operations.Tests.csproj
@@ -0,0 +1,17 @@
+
+
+
+ net6.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
diff --git a/test/Rebus.Operations.Tests/RebusTestBase.cs b/test/Rebus.Operations.Tests/RebusTestBase.cs
new file mode 100644
index 0000000..394d74d
--- /dev/null
+++ b/test/Rebus.Operations.Tests/RebusTestBase.cs
@@ -0,0 +1,79 @@
+using Dbosoft.Rebus.Operations.Commands;
+using Dbosoft.Rebus.Operations.Events;
+using Dbosoft.Rebus.Operations.Workflow;
+using Microsoft.Extensions.Logging.Abstractions;
+using Rebus.Activation;
+using Rebus.Bus;
+using Rebus.Config;
+using Rebus.Persistence.InMem;
+using Rebus.Retry.Simple;
+using Rebus.Routing.TypeBased;
+using Rebus.Transport.InMem;
+using Xunit.Abstractions;
+
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public abstract class RebusTestBase
+{
+ private readonly ITestOutputHelper _output;
+
+ protected RebusTestBase(ITestOutputHelper output)
+ {
+ _output = output;
+ }
+
+ public async Task SetupRebus(
+
+ Action configureActivator)
+ {
+ var rebusNetwork = new InMemNetwork();
+
+ var opManager = new TestOperationManager();
+ var taskManager = new TestTaskManager();
+
+ var activator = new BuiltinHandlerActivator();
+ var busStarter =
+ Configure.With(activator)
+ .Options(o =>
+ o.SimpleRetryStrategy(maxDeliveryAttempts:1, secondLevelRetriesEnabled:true))
+ .Transport(cfg => cfg.UseInMemoryTransport(rebusNetwork, "main"))
+ .Routing(r => r.TypeBased()
+ .Map("main")
+ .Map("main")
+ .Map("main")
+ .Map("main")
+ .Map("main")
+ .Map("main")
+ .Map("main")
+ )
+ .Sagas(x => x.StoreInMemory())
+ .Subscriptions(x=>x.StoreInMemory())
+ .Logging(x=>x.Use(new RebusTestLogging(_output)))
+ .Create();
+
+
+ var opDispatcher = new DefaultOperationDispatcher(busStarter.Bus,
+ NullLogger.Instance, opManager);
+
+ var taskDispatcher = new DefaultOperationTaskDispatcher(busStarter.Bus,
+ NullLogger.Instance,
+ opManager, taskManager);
+
+ var workflow = new DefaultWorkflow(
+ opManager, taskManager, new RebusOperationMessaging(busStarter.Bus,
+ opDispatcher, taskDispatcher));
+
+ activator.Register(() => new ProcessOperationSaga(workflow, NullLogger.Instance));
+ activator.Register(() =>
+ new OperationTaskProgressEventHandler(workflow, NullLogger.Instance));
+
+ configureActivator(activator,workflow, busStarter.Bus);
+
+ var bus = busStarter.Start();
+ await OperationsSetup.SubscribeEvents(busStarter.Bus);
+
+ return new TestRebusSetup(bus, opDispatcher, opManager, taskManager);
+
+ }
+
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/RebusTestLogging.cs b/test/Rebus.Operations.Tests/RebusTestLogging.cs
new file mode 100644
index 0000000..60e7767
--- /dev/null
+++ b/test/Rebus.Operations.Tests/RebusTestLogging.cs
@@ -0,0 +1,62 @@
+using Rebus.Logging;
+using Xunit.Abstractions;
+
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public class RebusTestLogging : AbstractRebusLoggerFactory
+{
+ private readonly ITestOutputHelper _testOutputHelper;
+
+ public RebusTestLogging(ITestOutputHelper testOutputHelper)
+ {
+ _testOutputHelper = testOutputHelper;
+ }
+ protected override ILog GetLogger(Type type)
+ {
+ return new Log(_testOutputHelper, this);
+ }
+
+ public class Log : ILog
+ {
+ private readonly ITestOutputHelper _testOutputHelper;
+ private readonly RebusTestLogging _factory;
+
+ public Log(ITestOutputHelper testOutputHelper, RebusTestLogging factory)
+ {
+ _testOutputHelper = testOutputHelper;
+ _factory = factory;
+ }
+
+ public void Debug(string message, params object[] objs)
+ {
+ _testOutputHelper.WriteLine("DBG: "+ _factory.RenderString(message, objs));
+ }
+
+ public void Info(string message, params object[] objs)
+ {
+ _testOutputHelper.WriteLine("INF: "+ _factory.RenderString(message, objs));
+ }
+
+ public void Warn(string message, params object[] objs)
+ {
+ _testOutputHelper.WriteLine("WRN: "+ _factory.RenderString(message, objs));
+ }
+
+ public void Warn(Exception exception, string message, params object[] objs)
+ {
+ _testOutputHelper.WriteLine("WRN: "+ _factory.RenderString(message, objs) + exception);
+ }
+
+ public void Error(string message, params object[] objs)
+ {
+ _testOutputHelper.WriteLine("ERR: "+ _factory.RenderString(message, objs));
+ }
+
+ public void Error(Exception exception, string message, params object[] objs)
+ {
+ _testOutputHelper.WriteLine("ERR: "+ _factory.RenderString(message, objs) + exception);
+ }
+
+
+ }
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/StepOneCommand.cs b/test/Rebus.Operations.Tests/StepOneCommand.cs
new file mode 100644
index 0000000..c3cee58
--- /dev/null
+++ b/test/Rebus.Operations.Tests/StepOneCommand.cs
@@ -0,0 +1,6 @@
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public class StepOneCommand
+{
+
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/StepOneCommandHandler.cs b/test/Rebus.Operations.Tests/StepOneCommandHandler.cs
new file mode 100644
index 0000000..5f78dcb
--- /dev/null
+++ b/test/Rebus.Operations.Tests/StepOneCommandHandler.cs
@@ -0,0 +1,22 @@
+using Rebus.Bus;
+using Rebus.Handlers;
+
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public class StepOneCommandHandler : IHandleMessages>
+{
+ private readonly IBus _bus;
+
+ public StepOneCommandHandler(IBus bus)
+ {
+ _bus = bus;
+ }
+
+ public static bool Called { get; set; }
+
+ public Task Handle(OperationTask message)
+ {
+ Called = true;
+ return _bus.CompleteTask(message);
+ }
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/StepTwoCommand.cs b/test/Rebus.Operations.Tests/StepTwoCommand.cs
new file mode 100644
index 0000000..7deb9bc
--- /dev/null
+++ b/test/Rebus.Operations.Tests/StepTwoCommand.cs
@@ -0,0 +1,6 @@
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public class StepTwoCommand
+{
+
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs b/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs
new file mode 100644
index 0000000..ebfaa49
--- /dev/null
+++ b/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs
@@ -0,0 +1,22 @@
+using Rebus.Bus;
+using Rebus.Handlers;
+
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public class StepTwoCommandHandler : IHandleMessages>
+{
+ private readonly IBus _bus;
+
+ public StepTwoCommandHandler(IBus bus)
+ {
+ _bus = bus;
+ }
+
+ public static bool Called { get; set; }
+
+ public Task Handle(OperationTask message)
+ {
+ Called = true;
+ return _bus.CompleteTask(message);
+ }
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/TestCommand.cs b/test/Rebus.Operations.Tests/TestCommand.cs
new file mode 100644
index 0000000..5248379
--- /dev/null
+++ b/test/Rebus.Operations.Tests/TestCommand.cs
@@ -0,0 +1,6 @@
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public class TestCommand
+{
+
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/TestCommandHandler.cs b/test/Rebus.Operations.Tests/TestCommandHandler.cs
new file mode 100644
index 0000000..c4a830a
--- /dev/null
+++ b/test/Rebus.Operations.Tests/TestCommandHandler.cs
@@ -0,0 +1,22 @@
+using Dbosoft.Rebus.Operations.Workflow;
+using Rebus.Bus;
+using Rebus.Handlers;
+
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public class TestCommandHandler : IHandleMessages>
+{
+ private readonly IBus _bus;
+ public bool Called { get; private set; }
+
+ public TestCommandHandler(IBus bus)
+ {
+ _bus = bus;
+ }
+
+ public Task Handle(OperationTask message)
+ {
+ Called = true;
+ return _bus.CompleteTask(message);
+ }
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs b/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs
new file mode 100644
index 0000000..7400f34
--- /dev/null
+++ b/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs
@@ -0,0 +1,24 @@
+using Rebus.Bus;
+using Rebus.Handlers;
+
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public class TestCommandHandlerWithError : IHandleMessages>
+{
+ private readonly IBus _bus;
+ private readonly bool _throws;
+
+ public TestCommandHandlerWithError(IBus bus, bool throws)
+ {
+ _bus = bus;
+ _throws = throws;
+ }
+
+ public async Task Handle(OperationTask message)
+ {
+ if (_throws)
+ throw new InvalidOperationException();
+
+ await _bus.FailTask(message, "error");
+ }
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/TestCommandHandlerWithProgress.cs b/test/Rebus.Operations.Tests/TestCommandHandlerWithProgress.cs
new file mode 100644
index 0000000..e14e2d6
--- /dev/null
+++ b/test/Rebus.Operations.Tests/TestCommandHandlerWithProgress.cs
@@ -0,0 +1,21 @@
+using Rebus.Bus;
+using Rebus.Handlers;
+
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public class TestCommandHandlerWithProgress : IHandleMessages>
+{
+ private readonly IBus _bus;
+ public TestCommandHandlerWithProgress(IBus bus)
+ {
+ _bus = bus;
+ }
+
+ public async Task Handle(OperationTask message)
+ {
+ await _bus.ProgressMessage(message, "progressData");
+ await Task.Delay(500);
+
+ await _bus.CompleteTask(message);
+ }
+}
\ No newline at end of file
diff --git a/test/Rebus.Operations.Tests/TestOperationManager.cs b/test/Rebus.Operations.Tests/TestOperationManager.cs
new file mode 100644
index 0000000..4951536
--- /dev/null
+++ b/test/Rebus.Operations.Tests/TestOperationManager.cs
@@ -0,0 +1,58 @@
+using Dbosoft.Rebus.Operations.Workflow;
+
+namespace Dbosoft.Rebus.Operations.Tests;
+
+public class TestOperationManager : OperationManagerBase
+{
+ public static readonly Dictionary Operations = new();
+ public static readonly Dictionary> Progress = new();
+
+ public static void Reset()
+ {
+ Operations.Clear();
+ Progress.Clear();
+ }
+
+ public override ValueTask GetByIdAsync(Guid operationId)
+ {
+ return ValueTask.FromResult(
+ Operations.ContainsKey(operationId)
+ ? Operations[operationId] as IOperation : null);
+
+ }
+
+ public override ValueTask GetOrCreateAsync(Guid operationId, object command)
+ {
+ if (Operations.ContainsKey(operationId))
+ return GetByIdAsync(operationId)!;
+
+ var op = new TestOperationModel
+ {
+ Id = operationId
+ };
+ Operations.Add(operationId, op);
+
+ return new ValueTask(op);
+ }
+
+ public override ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, object? additionalData)
+ {
+ if (!Operations.ContainsKey(operation.Id))
+ return new ValueTask(false);
+
+ Operations[operation.Id].Status = newStatus;
+
+ return new ValueTask(true);
+ }
+
+ public override ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task,
+ object? data)
+ {
+ if(!Progress.ContainsKey(progressId))
+ Progress.Add(operation.Id, new List