diff --git a/rebus-extensions.sln b/rebus-extensions.sln index ba074fe..164b32e 100644 --- a/rebus-extensions.sln +++ b/rebus-extensions.sln @@ -27,6 +27,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.SimpleInjector", "src EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.SimpleInjector.Tests", "test\Rebus.SimpleInjector.Tests\Rebus.SimpleInjector.Tests.csproj", "{539BED78-F196-4FA2-9E48-2185FE82444E}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rebus.OperationsDB.Tests", "test\Rebus.OperationsDB.Tests\Rebus.OperationsDB.Tests.csproj", "{73DED33B-5E78-4843-B3B6-2C9BB0766E42}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -81,6 +83,10 @@ Global {539BED78-F196-4FA2-9E48-2185FE82444E}.Debug|Any CPU.Build.0 = Debug|Any CPU {539BED78-F196-4FA2-9E48-2185FE82444E}.Release|Any CPU.ActiveCfg = Release|Any CPU {539BED78-F196-4FA2-9E48-2185FE82444E}.Release|Any CPU.Build.0 = Release|Any CPU + {73DED33B-5E78-4843-B3B6-2C9BB0766E42}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {73DED33B-5E78-4843-B3B6-2C9BB0766E42}.Debug|Any CPU.Build.0 = Debug|Any CPU + {73DED33B-5E78-4843-B3B6-2C9BB0766E42}.Release|Any CPU.ActiveCfg = Release|Any CPU + {73DED33B-5E78-4843-B3B6-2C9BB0766E42}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTask.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTask.cs index 0dbbaef..2604412 100644 --- a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTask.cs +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTask.cs @@ -11,6 +11,6 @@ public interface IOperationTask public Guid InitiatingTaskId { get; } - OperationTaskStatus Status { get; } + OperationTaskStatus Status { get; } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs index f915dba..36d5664 100644 --- a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationManager.cs @@ -9,9 +9,11 @@ public interface IOperationManager { ValueTask GetByIdAsync(Guid operationId); ValueTask GetOrCreateAsync(Guid operationId, object command, + DateTimeOffset timestamp, object? additionalData, IDictionary? additionalHeaders); - ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, object? additionalData, + ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, + DateTimeOffset timestamp, object? additionalData, IDictionary? messageHeaders); ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task, object? data, IDictionary? messageHeaders); diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs index 54a0e75..31002ae 100644 --- a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Threading.Tasks; using Dbosoft.Rebus.Operations.Events; @@ -13,4 +14,8 @@ public interface IOperationMessaging IOperationDispatcher OperationDispatcher { get; } IOperationTaskDispatcher TaskDispatcher { get; } + + Task SendDeferredMessage(object message, TimeSpan defer); + Task DeferredCurrentMessage(TimeSpan defer); + } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationTaskManager.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationTaskManager.cs index fb171ff..54367c4 100644 --- a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationTaskManager.cs +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationTaskManager.cs @@ -8,8 +8,12 @@ namespace Dbosoft.Rebus.Operations.Workflow; public interface IOperationTaskManager { ValueTask GetByIdAsync(Guid taskId); - ValueTask GetOrCreateAsync(IOperation operation, object command, Guid taskId, Guid parentTaskId); - ValueTask TryChangeStatusAsync(IOperationTask task, OperationTaskStatus newStatus, object? additionalData); + ValueTask GetOrCreateAsync(IOperation operation, object command, + DateTimeOffset created, + Guid taskId, Guid parentTaskId); + ValueTask TryChangeStatusAsync(IOperationTask task, OperationTaskStatus newStatus, + DateTimeOffset timestamp, + object? additionalData); } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs index 9737d80..8e01970 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs @@ -22,10 +22,13 @@ public DefaultOperationDispatcher( _operationManager = operationManager; } - protected override async ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData, + protected override async ValueTask<(IOperation, object)> CreateOperation(object command, + DateTimeOffset created, + object? additionalData, IDictionary? additionalHeaders) { return (await _operationManager.GetOrCreateAsync(Guid.NewGuid(), command, - additionalData,additionalHeaders), command); + created, + additionalData,additionalHeaders).ConfigureAwait(false), command); } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs index ac74f81..550bb82 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs @@ -24,14 +24,14 @@ public DefaultOperationTaskDispatcher( } protected override async ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, - object command, object? additionalData, IDictionary? additionalHeaders) + object command, DateTimeOffset created, object? additionalData, IDictionary? additionalHeaders) { - var op = await _operationManager.GetByIdAsync(operationId); + var op = await _operationManager.GetByIdAsync(operationId).ConfigureAwait(false); if (op == null) { throw new ArgumentException($"Operation {operationId} not found", nameof(operationId)); } - return (await _operationTaskManager.GetOrCreateAsync(op, command, Guid.NewGuid(), initiatingTaskId), command); + return (await _operationTaskManager.GetOrCreateAsync(op, command, created, Guid.NewGuid(), initiatingTaskId).ConfigureAwait(false), command); } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs index 0c48fa0..954c013 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs @@ -43,15 +43,18 @@ protected OperationDispatcherBase(IBus bus, WorkflowOptions options, ILogger CreateOperation(object command, object? additionalData, IDictionary? additionalHeaders); + protected abstract ValueTask<(IOperation, object)> CreateOperation(object command, DateTimeOffset created, object? additionalData, IDictionary? additionalHeaders); protected async ValueTask StartOperation(object command, object? additionalData, IDictionary? additionalHeaders = null) { + if (command == null) throw new ArgumentNullException(nameof(command)); - var(operation, taskCommand) = await CreateOperation(command, additionalData, additionalHeaders); - + var created = DateTimeOffset.Now; + var (operation, taskCommand) = + await CreateOperation(command, created, additionalData, additionalHeaders).ConfigureAwait(false); + var commandJson = JsonSerializer.Serialize(taskCommand, _options.JsonSerializerOptions); var taskMessage = new CreateNewOperationTaskCommand( @@ -59,18 +62,19 @@ protected OperationDispatcherBase(IBus bus, WorkflowOptions options, ILogger : IOperationTaskMessage where T : class, new() { - public OperationTask(T command, Guid operationId, Guid initiatingTaskId, Guid taskId) + + public OperationTask(T command, Guid operationId, Guid initiatingTaskId, Guid taskId, + DateTimeOffset created) { Command = command; OperationId = operationId; InitiatingTaskId = initiatingTaskId; TaskId = taskId; + Created = created; } public T Command { get; } public Guid OperationId { get; } public Guid InitiatingTaskId { get; } public Guid TaskId { get; } + public DateTimeOffset Created { get; } } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs index af9c2c8..c565e38 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs @@ -42,7 +42,7 @@ protected OperationTaskDispatcherBase(IBus bus, WorkflowOptions options, ILogger return StartTask(operationId, initiatingTaskId, commandType, additionalData, additionalHeaders); } - protected abstract ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData, IDictionary? additionalHeaders); + protected abstract ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, object command, DateTimeOffset created, object? additionalData, IDictionary? additionalHeaders); protected async ValueTask StartTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData, IDictionary? additionalHeaders = null) @@ -50,7 +50,8 @@ protected OperationTaskDispatcherBase(IBus bus, WorkflowOptions options, ILogger if (command == null) throw new ArgumentNullException(nameof(command)); - var (task, taskCommand) = await CreateTask(operationId, initiatingTaskId, command, additionalData, additionalHeaders); + var created = DateTimeOffset.UtcNow; + var (task, taskCommand) = await CreateTask(operationId, initiatingTaskId, command, created, additionalData, additionalHeaders).ConfigureAwait(false); var commandJson = JsonSerializer.Serialize(taskCommand, _options.JsonSerializerOptions); var taskMessage = new CreateNewOperationTaskCommand( @@ -58,11 +59,11 @@ protected OperationTaskDispatcherBase(IBus bus, WorkflowOptions options, ILogger commandJson, operationId, initiatingTaskId, - task.Id); + task.Id, created); await (string.IsNullOrWhiteSpace(_options.OperationsDestination) ? _bus.Send(taskMessage, additionalHeaders) - : _bus.Advanced.Routing.Send(_options.OperationsDestination, taskMessage, additionalHeaders)) ; + : _bus.Advanced.Routing.Send(_options.OperationsDestination, taskMessage, additionalHeaders)).ConfigureAwait(false) ; _logger.LogDebug("Send new command of type {commandType}. Id: {operationId}, ParentTaskId: {parentTaskId}", taskCommand.GetType().Name, operationId, initiatingTaskId); diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationsSetup.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationsSetup.cs index 63656b5..6754fe3 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/OperationsSetup.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationsSetup.cs @@ -14,15 +14,15 @@ public static async Task SubscribeEvents(IBus bus, WorkflowOptions options if (!string.IsNullOrWhiteSpace(options.EventDestination)) { - await bus.Advanced.Topics.Subscribe(options.EventDestination); + await bus.Advanced.Topics.Subscribe(options.EventDestination).ConfigureAwait(false); return bus; } - await bus.Subscribe(); - await bus.Subscribe(); - await bus.Subscribe(); - await bus.Subscribe(); - await bus.Subscribe(); + await bus.Subscribe().ConfigureAwait(false); + await bus.Subscribe().ConfigureAwait(false); + await bus.Subscribe().ConfigureAwait(false); + await bus.Subscribe().ConfigureAwait(false); + await bus.Subscribe().ConfigureAwait(false); return bus; } diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Rebus.Operations.Core.csproj.DotSettings b/src/Rebus.Operations/Rebus.Operations.Core/Rebus.Operations.Core.csproj.DotSettings new file mode 100644 index 0000000..89316e4 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/Rebus.Operations.Core.csproj.DotSettings @@ -0,0 +1,2 @@ + + Library \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/FailedOperationHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/FailedOperationHandler.cs index bd59d3b..306a175 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/FailedOperationHandler.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/FailedOperationHandler.cs @@ -43,7 +43,7 @@ await _operationMessaging.DispatchTaskStatusEventAsync( OperationTaskStatusEvent.Failed( failedMessage.Message.OperationId, failedMessage.Message.InitiatingTaskId, failedTaskId, new ErrorData() { ErrorMessage = failedMessage.ErrorDescription }, - _workflowOptions.JsonSerializerOptions)); + _workflowOptions.JsonSerializerOptions)).ConfigureAwait(false); } diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs index bac9cc6..0231680 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using Dbosoft.Rebus.Operations.Commands; using Dbosoft.Rebus.Operations.Events; @@ -6,6 +7,7 @@ using Rebus.Bus; using Rebus.Handlers; using Rebus.Pipeline; +using Rebus.Transport; namespace Dbosoft.Rebus.Operations.Workflow @@ -22,27 +24,39 @@ public IncomingTaskMessageHandler(IBus bus, ILogger taskMessage, IDictionary? headers) + { + using var scope = new RebusTransactionScope(); + await bus.SendLocal(new OperationTask(taskMessage.Message, + taskMessage.OperationId, taskMessage.InitiatingTaskId, + taskMessage.TaskId, taskMessage.Created) + , headers + ).ConfigureAwait(false); + + await scope.CompleteAsync().ConfigureAwait(false); + } + public async Task Handle(OperationTaskSystemMessage taskMessage) { if(taskMessage.Message==null) throw new InvalidOperationException($"Operation Workflow {taskMessage.OperationId}/{taskMessage.TaskId}: missing command message"); var headers = _messageEnricher.EnrichHeadersFromIncomingSystemMessage(taskMessage, MessageContext.Current.Headers); - await _bus.SendLocal(new OperationTask(taskMessage.Message, taskMessage.OperationId, taskMessage.InitiatingTaskId, taskMessage.TaskId) - , headers - ).ConfigureAwait(false); - - _logger.LogTrace($"Accepted incoming operation message. Operation id: '{taskMessage.OperationId}'"); - var reply = new OperationTaskAcceptedEvent { OperationId = taskMessage.OperationId, InitiatingTaskId = taskMessage.InitiatingTaskId, TaskId = taskMessage.TaskId, - AdditionalData = _messageEnricher.EnrichTaskAcceptedReply(taskMessage) + AdditionalData = _messageEnricher.EnrichTaskAcceptedReply(taskMessage), + Created = taskMessage.Created }; await _bus.Reply(reply).ConfigureAwait(false); + _logger.LogTrace($"Accepted incoming operation message. Operation id: '{taskMessage.OperationId}'"); + + Resubmit(_bus, taskMessage, headers); } } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationManagerBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationManagerBase.cs index 3d04b31..2aaab91 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationManagerBase.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationManagerBase.cs @@ -10,7 +10,8 @@ public abstract class OperationManagerBase : IOperationManager public abstract ValueTask GetByIdAsync(Guid operationId); - public abstract ValueTask GetOrCreateAsync(Guid operationId, object command, + public abstract ValueTask GetOrCreateAsync(Guid operationId, object command, + DateTimeOffset timestamp, object? additionalData,IDictionary? additionalHeaders); @@ -18,7 +19,9 @@ public abstract ValueTask AddProgressAsync(Guid progressId, DateTimeOffset times IOperationTask task, object? data, IDictionary? messageHeaders); - public abstract ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, + public abstract ValueTask TryChangeStatusAsync(IOperation operation, + OperationStatus newStatus, + DateTimeOffset timestamp, object? additionalData, IDictionary? messageHeaders); diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskManagerBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskManagerBase.cs index 0f407e7..714e7f9 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskManagerBase.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskManagerBase.cs @@ -9,9 +9,12 @@ public abstract class OperationTaskManagerBase : IOperationTaskManager public abstract ValueTask GetByIdAsync(Guid taskId); - public abstract ValueTask GetOrCreateAsync(IOperation operation, object command, Guid taskId, Guid parentTaskId); + public abstract ValueTask GetOrCreateAsync(IOperation operation, object command, + DateTimeOffset created, Guid taskId, Guid parentTaskId); - public abstract ValueTask TryChangeStatusAsync(IOperationTask task, OperationTaskStatus newStatus, + public abstract ValueTask TryChangeStatusAsync(IOperationTask task, + OperationTaskStatus newStatus, + DateTimeOffset timestamp, object? additionalData); diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs index 2d4f074..3cd5222 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskProgressEventHandler.cs @@ -1,8 +1,11 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; using Dbosoft.Rebus.Operations.Events; using JetBrains.Annotations; using Microsoft.Extensions.Logging; using Rebus.Handlers; +using Rebus.Logging; +using Rebus.Messages; using Rebus.Pipeline; namespace Dbosoft.Rebus.Operations.Workflow @@ -35,12 +38,30 @@ public async Task Handle(OperationTaskProgressEvent message) if (operation != null && task!=null) { + if (task.Status == OperationTaskStatus.Queued) + { + var deferCount = 0; + if (MessageContext.Current.Headers.TryGetValue(Headers.DeferCount, + out var deferCountString)) + { + deferCount = int.Parse(deferCountString); + } + + if (deferCount < 5) + { + _logger.LogDebug("Operation Workflow {operationId}, Task {taskId}: Progress event received for queued task, deferred {deferCount} times, deferring for {deferTime} ms", + message.OperationId, message.TaskId, deferCount, 100 * (deferCount + 1)); + await _workflow.Messaging.DeferredCurrentMessage(TimeSpan.FromMilliseconds(100 * (deferCount + 1))).ConfigureAwait(false); + return; + } + } + await _workflow.Operations.AddProgressAsync( message.Id, message.Timestamp, operation, task, - message.Data, MessageContext.Current.Headers); + message.Data, MessageContext.Current.Headers).ConfigureAwait(false); } else { diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskWorkflowSaga.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskWorkflowSaga.cs index 66770d2..1ebf0fd 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskWorkflowSaga.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/OperationTaskWorkflowSaga.cs @@ -70,24 +70,32 @@ protected Task Complete(object? message = null) WorkflowEngine.WorkflowOptions.JsonSerializerOptions)); } - protected Task FailOrRun(OperationTaskStatusEvent message, Func completedFunc) + protected async Task FailOrRun(OperationTaskStatusEvent message, Func completedFunc) where T : class, new() { + if (message.InitiatingTaskId != Data.SagaTaskId) + return; + if (message.OperationFailed) - return Fail(message.GetMessage(WorkflowEngine.WorkflowOptions.JsonSerializerOptions)); + await Fail(message.GetMessage(WorkflowEngine.WorkflowOptions.JsonSerializerOptions)).ConfigureAwait(false); - return completedFunc(); + await completedFunc().ConfigureAwait(false); } - protected Task FailOrRun(OperationTaskStatusEvent message, Func completedFunc) + protected async Task FailOrRun(OperationTaskStatusEvent message, Func completedFunc) where T : class, new() where TOpMessage : class { - return message.OperationFailed - ? Fail(message.GetMessage(WorkflowEngine.WorkflowOptions.JsonSerializerOptions)) - : completedFunc(message.GetMessage(WorkflowEngine.WorkflowOptions.JsonSerializerOptions) as TOpMessage - ?? throw new InvalidOperationException( - $"Message {typeof(T)} has not returned a result of type {typeof(TOpMessage)}.")); + if (message.InitiatingTaskId != Data.SagaTaskId) + return; + + if (message.OperationFailed) + await Fail(message.GetMessage(WorkflowEngine.WorkflowOptions.JsonSerializerOptions)).ConfigureAwait(false); + else + await completedFunc( + message.GetMessage(WorkflowEngine.WorkflowOptions.JsonSerializerOptions) as TOpMessage + ?? throw new InvalidOperationException( + $"Message {typeof(T)} has not returned a result of type {typeof(TOpMessage)}.")).ConfigureAwait(false); } diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs index 640242f..ed1cebd 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs @@ -1,6 +1,4 @@ -#nullable enable - -using System; +using System; using System.Text.Json; using System.Threading.Tasks; using Dbosoft.Rebus.Operations.Commands; @@ -8,6 +6,7 @@ using JetBrains.Annotations; using Microsoft.Extensions.Logging; using Rebus.Handlers; +using Rebus.Messages; using Rebus.Pipeline; using Rebus.Sagas; @@ -19,7 +18,8 @@ public class ProcessOperationSaga : Saga, IHandleMessages, IHandleMessages, IHandleMessages, - IHandleMessages + IHandleMessages, + IHandleMessages { private readonly IWorkflow _workflow; private readonly ILogger _log; @@ -46,11 +46,21 @@ public Task Handle(OperationTimeoutEvent? message) return Task.CompletedTask; } + public Task Handle(OperationCompleteEvent? message) + { + _log.LogDebug("Operation Workflow {operationId}: Completing workflow", + Data.OperationId); + + MarkAsComplete(); + return Task.CompletedTask; + } + protected override void CorrelateMessages(ICorrelationConfig config) { config.Correlate(m => m.TaskMessage?.OperationId, d => d.OperationId); config.Correlate(m => m.OperationId, d => d.OperationId); config.Correlate(m => m.OperationId, d => d.OperationId); + config.Correlate(m => m.OperationId, d => d.OperationId); config.Correlate(m => m.OperationId, d => d.OperationId); config.Correlate(m => m.OperationId, d => d.OperationId); } @@ -83,12 +93,12 @@ public async Task Handle(CreateNewOperationTaskCommand message) { _log.LogWarning("Operation Workflow {operationId}: Operation not found - cancelling workflow", message.OperationId); - MarkAsComplete(); + Complete(); return; } var task = await _workflow.Tasks - .GetOrCreateAsync(op, command, message.TaskId, message.InitiatingTaskId) + .GetOrCreateAsync(op, command, message.Created, message.TaskId, message.InitiatingTaskId) .ConfigureAwait(false); @@ -96,8 +106,8 @@ public async Task Handle(CreateNewOperationTaskCommand message) if (messageType == null) throw new InvalidOperationException($"unknown command type '{message.CommandType}'"); - Data.Tasks.Add(message.TaskId, messageType.AssemblyQualifiedName!); - await _workflow.Messaging.DispatchTaskMessage(command,task); + Data.Tasks.TryAdd(message.TaskId, messageType.AssemblyQualifiedName!); + await _workflow.Messaging.DispatchTaskMessage(command,task).ConfigureAwait(false); } @@ -119,32 +129,73 @@ public async Task Handle(OperationTaskAcceptedEvent message) } var opOldStatus = op.Status; - if (await _workflow.Operations.TryChangeStatusAsync(op, OperationStatus.Running, null, - MessageContext.Current.Headers)) + if (await _workflow.Operations.TryChangeStatusAsync(op, OperationStatus.Running, + message.Created, + null, + MessageContext.Current.Headers).ConfigureAwait(false)) { - _log.LogDebug("Operation Workflow {operationId}: Status changed: {oldStatus} -> {newStatus}", - message.OperationId, opOldStatus, op.Status); - + if(opOldStatus != OperationStatus.Running) + _log.LogDebug("Operation Workflow {operationId}: Operation status change: {oldStatus} -> {newStatus}", + message.OperationId, opOldStatus, OperationStatus.Running); + else + _log.LogDebug("Operation Workflow {operationId}: Operation status already {newStatus}, only updated state", + message.OperationId, OperationStatus.Running); + await _workflow.Messaging.DispatchOperationStatusEventAsync(new OperationStatusEvent { OperationId = op.Id, NewStatus = OperationStatus.Running - }); + }).ConfigureAwait(false); + } + else + { + _log.LogDebug("Operation Workflow {operationId}: Status NOT changed to {newStatus}, keeping {oldStatus}. Message: {messageType}", + message.OperationId, OperationStatus.Running, opOldStatus, nameof(OperationTaskAcceptedEvent)); + } - var taskOldStatus = task.Status; - if (await _workflow.Tasks.TryChangeStatusAsync(task, OperationTaskStatus.Running, - message.AdditionalData)) - { - _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: Status changed: {oldStatus} -> {newStatus}", - message.OperationId, message.TaskId, taskOldStatus, task.Status); + var taskOldStatus = task.Status; + if (await _workflow.Tasks.TryChangeStatusAsync(task, OperationTaskStatus.Running, + message.Created, + message.AdditionalData).ConfigureAwait(false)) + { + if(taskOldStatus != OperationTaskStatus.Running) + _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: Task accepted: {oldStatus} -> {newStatus}", + message.OperationId, message.TaskId, taskOldStatus, OperationTaskStatus.Running); + else + _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: Task status already {newStatus}, only updated state", + message.OperationId, message.TaskId, OperationTaskStatus.Running); + } + else + { + _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: Status NOT changed to {newStatus}, keeping {oldStatus}. Message: {messageType}", + message.OperationId, message.TaskId, OperationTaskStatus.Running, opOldStatus, nameof(OperationTaskAcceptedEvent)); - } } } + private void Complete() + { + if (_workflow.WorkflowOptions.DeferCompletion == TimeSpan.Zero) + { + _log.LogDebug("Operation Workflow {operationId}: Completing workflow", + Data.OperationId); + + MarkAsComplete(); + return; + } + + _log.LogDebug("Operation Workflow {operationId}: workflow can be completed, completion deferred for {deferred} seconds", + Data.OperationId, _workflow.WorkflowOptions.DeferCompletion.TotalSeconds); + + _workflow.Messaging.SendDeferredMessage(new OperationCompleteEvent + { + OperationId = Data.OperationId + }, _workflow.WorkflowOptions.DeferCompletion); + } + public async Task Handle(OperationTaskStatusEvent message) { var op = await _workflow.Operations @@ -162,6 +213,24 @@ public async Task Handle(OperationTaskStatusEvent message) return; } + if (task.Status == OperationTaskStatus.Queued) + { + var deferCount = 0; + if (MessageContext.Current.Headers.TryGetValue(Headers.DeferCount, + out var deferCountString)) + { + deferCount = int.Parse(deferCountString); + } + + if (deferCount < 5) + { + _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: Status change event received for queued task, deferred {deferCount} times, deferring for {deferTime} ms", + message.OperationId, message.TaskId, deferCount, 100*(deferCount+1)); + await _workflow.Messaging.DeferredCurrentMessage(TimeSpan.FromMilliseconds(100*(deferCount+1))).ConfigureAwait(false); + return; + } + } + if (task.Status is OperationTaskStatus.Queued or OperationTaskStatus.Running) { if(!Data.Tasks.ContainsKey(message.TaskId)) @@ -170,20 +239,24 @@ public async Task Handle(OperationTaskStatusEvent message) else { var taskCommandTypeName = Data.Tasks[message.TaskId]; - await _workflow.Messaging.DispatchTaskStatusEventAsync(taskCommandTypeName, message); + await _workflow.Messaging.DispatchTaskStatusEventAsync(taskCommandTypeName, message).ConfigureAwait(false); } } var taskOldStatus = task.Status; if(await _workflow.Tasks.TryChangeStatusAsync(task, - message.OperationFailed - ? OperationTaskStatus.Failed - : OperationTaskStatus.Completed - , message.GetMessage(_workflow.WorkflowOptions.JsonSerializerOptions))) - - _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: Status changed: {oldStatus} -> {newStatus}", - message.OperationId, message.TaskId, taskOldStatus, task.Status); - + message.OperationFailed + ? OperationTaskStatus.Failed + : OperationTaskStatus.Completed + ,message.Created, + message.GetMessage(_workflow.WorkflowOptions.JsonSerializerOptions)).ConfigureAwait(false)) + + if(taskOldStatus != task.Status) + _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: Task status changed: {oldStatus} -> {newStatus}", + message.OperationId, message.TaskId, taskOldStatus, task.Status); + else + _log.LogDebug("Operation Workflow {operationId}, Task {taskId}: Task status already {newStatus}, only updated state", + message.OperationId, message.TaskId, task.Status); if (message.TaskId == Data.PrimaryTaskId) @@ -198,19 +271,18 @@ public async Task Handle(OperationTaskStatusEvent message) if (await _workflow.Operations.TryChangeStatusAsync(op, - newStatus, message.GetMessage(_workflow.WorkflowOptions.JsonSerializerOptions), MessageContext.Current.Headers)) + newStatus, + message.Created, + message.GetMessage(_workflow.WorkflowOptions.JsonSerializerOptions), MessageContext.Current.Headers).ConfigureAwait(false)) { await _workflow.Messaging.DispatchOperationStatusEventAsync(new OperationStatusEvent { OperationId = op.Id, NewStatus = newStatus - }); + }).ConfigureAwait(false); } - _log.LogDebug("Operation Workflow {operationId}: Completing workflow", - message.OperationId); - - MarkAsComplete(); + Complete(); } else { @@ -225,7 +297,7 @@ await _workflow.Messaging.DispatchOperationStatusEventAsync(new OperationStatusE { message.InitiatingTaskId = initiatingTask.InitiatingTaskId; message.TaskId = initiatingTask.Id; - await _workflow.Messaging.DispatchTaskStatusEventAsync(taskCommandTypeName, message); + await _workflow.Messaging.DispatchTaskStatusEventAsync(taskCommandTypeName, message).ConfigureAwait(false); } } } diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs index f879ff1..6affc95 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs @@ -5,6 +5,7 @@ using Dbosoft.Rebus.Operations.Commands; using Dbosoft.Rebus.Operations.Events; using Rebus.Bus; +using Rebus.Messages; using Rebus.Pipeline; namespace Dbosoft.Rebus.Operations.Workflow; @@ -49,7 +50,8 @@ public virtual Task DispatchTaskMessage(object command, IOperationTask task, IDi var messageType = command.GetType(); var outboundMessage = Activator.CreateInstance( typeof(OperationTaskSystemMessage<>).MakeGenericType(messageType), - command, task.OperationId, task.InitiatingTaskId, task.Id); + command, task.OperationId, task.InitiatingTaskId, task.Id, + DateTimeOffset.UtcNow); var taskHeaders = _messageEnricher.EnrichHeadersOfOutgoingSystemMessage(command, JoinHeaders(additionalHeaders, MessageContext.Current.Headers)); @@ -86,6 +88,13 @@ public Task DispatchOperationStatusEventAsync(OperationStatusEvent message, IDic public IOperationDispatcher OperationDispatcher { get; } public IOperationTaskDispatcher TaskDispatcher { get; } - - + public Task SendDeferredMessage(object message, TimeSpan defer) + { + return _bus.DeferLocal(defer, message); + } + + public Task DeferredCurrentMessage(TimeSpan defer) + { + return _bus.Advanced.TransportMessage.Defer(defer); + } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateNewOperationTaskCommand.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateNewOperationTaskCommand.cs index 6a35a2b..78b4e67 100644 --- a/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateNewOperationTaskCommand.cs +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/CreateNewOperationTaskCommand.cs @@ -10,13 +10,14 @@ public CreateNewOperationTaskCommand() } public CreateNewOperationTaskCommand(string commandType, string commandData, - Guid operationId, Guid initiatingTaskId, Guid taskId) + Guid operationId, Guid initiatingTaskId, Guid taskId, DateTimeOffset created) { CommandType = commandType; CommandData = commandData; OperationId = operationId; InitiatingTaskId = initiatingTaskId; TaskId = taskId; + Created = created; } public string? CommandData { get; set; } @@ -25,5 +26,6 @@ public CreateNewOperationTaskCommand(string commandType, string commandData, public Guid OperationId { get; set; } public Guid InitiatingTaskId { get; set; } public Guid TaskId { get; set; } + public DateTimeOffset Created { get; set; } } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/OperationTaskSystemMessage.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/OperationTaskSystemMessage.cs index 600c5c7..aab953a 100644 --- a/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/OperationTaskSystemMessage.cs +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Commands/OperationTaskSystemMessage.cs @@ -14,12 +14,14 @@ public OperationTaskSystemMessage() { } - public OperationTaskSystemMessage(TMessage message, Guid operationId, Guid initiatingTaskId, Guid taskId) + public OperationTaskSystemMessage( + TMessage message, Guid operationId, Guid initiatingTaskId, Guid taskId, DateTimeOffset created) { Message = message; OperationId = operationId; InitiatingTaskId = initiatingTaskId; TaskId = taskId; + Created = created; } public TMessage? Message { get; set; } @@ -29,5 +31,6 @@ public OperationTaskSystemMessage(TMessage message, Guid operationId, Guid initi public Guid TaskId { get; set; } + public DateTimeOffset Created { get; set; } } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskAcceptedEvent.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskAcceptedEvent.cs index 3e2af39..5c36fe8 100644 --- a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskAcceptedEvent.cs +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskAcceptedEvent.cs @@ -9,5 +9,6 @@ public class OperationTaskAcceptedEvent : IOperationTaskMessage public Guid TaskId { get; set; } public object? AdditionalData { get; set; } + public DateTimeOffset Created { get; set; } } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEvent.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEvent.cs index 6cea1d4..7b05889 100644 --- a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEvent.cs +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEvent.cs @@ -13,7 +13,7 @@ public OperationTaskStatusEvent() protected OperationTaskStatusEvent(Guid operationId, Guid initiatingTaskId, Guid taskId, bool failed, string? messageType, - string? messageData) : base(operationId, initiatingTaskId, taskId, failed, messageType, messageData) + string? messageData) : base(operationId, initiatingTaskId, taskId, DateTimeOffset.Now, failed, messageType, messageData) { } @@ -52,7 +52,7 @@ public OperationTaskStatusEvent() } public OperationTaskStatusEvent(OperationTaskStatusEvent message) : - base(message.OperationId, message.InitiatingTaskId, message.TaskId, message.OperationFailed, message.MessageType, message.MessageData) + base(message.OperationId, message.InitiatingTaskId, message.TaskId, message.Created, message.OperationFailed, message.MessageType, message.MessageData) { } } diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEventBase.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEventBase.cs index 10f16ee..8d43e67 100644 --- a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEventBase.cs +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTaskStatusEventBase.cs @@ -8,7 +8,9 @@ public class OperationTaskStatusEventBase : IOperationTaskStatusEvent { public OperationTaskStatusEventBase() {} - protected OperationTaskStatusEventBase(Guid operationId, Guid initiatingTaskId, Guid taskId, bool failed, string? messageType, + protected OperationTaskStatusEventBase(Guid operationId, Guid initiatingTaskId, Guid taskId, + DateTimeOffset created, + bool failed, string? messageType, string? messageData) { OperationId = operationId; @@ -17,6 +19,7 @@ protected OperationTaskStatusEventBase(Guid operationId, Guid initiatingTaskId, OperationFailed = failed; MessageData = messageData; MessageType = messageType; + Created = created; } public string? MessageData { get; set; } @@ -24,6 +27,7 @@ protected OperationTaskStatusEventBase(Guid operationId, Guid initiatingTaskId, public bool OperationFailed { get; set; } public Guid OperationId { get; set; } public Guid TaskId { get; set; } + public DateTimeOffset Created { get; set; } public Guid InitiatingTaskId { get; set; } protected static (string? data, string? type) SerializeMessage(object? message, JsonSerializerOptions serializerOptions) diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTimeoutEvent.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTimeoutEvent.cs index 2302360..f717d75 100644 --- a/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTimeoutEvent.cs +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/Events/OperationTimeoutEvent.cs @@ -6,4 +6,9 @@ public class OperationTimeoutEvent { public Guid OperationId { get; set; } } + + public class OperationCompleteEvent + { + public Guid OperationId { get; set; } + } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/IOperationTaskMessage.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/IOperationTaskMessage.cs index 8d2e24c..63e870d 100644 --- a/src/Rebus.Operations/Rebus.Operations.Primitives/IOperationTaskMessage.cs +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/IOperationTaskMessage.cs @@ -12,5 +12,6 @@ public interface IOperationTaskMessage Guid InitiatingTaskId { get; } Guid TaskId { get; } + DateTimeOffset Created { get; } } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowOptions.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowOptions.cs index e3ee2e6..e8eb301 100644 --- a/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowOptions.cs +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowOptions.cs @@ -1,3 +1,4 @@ +using System; using System.Text.Json; namespace Dbosoft.Rebus.Operations; @@ -14,4 +15,6 @@ public WorkflowOptions() public string? OperationsDestination { get; set; } public JsonSerializerOptions JsonSerializerOptions { get; set; } + + public TimeSpan DeferCompletion { get; set; } = TimeSpan.Zero; } \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestOperationManager.cs b/test/Rebus.Operations.Tests/TestOperationManager.cs index 7ef81da..c1e5d48 100644 --- a/test/Rebus.Operations.Tests/TestOperationManager.cs +++ b/test/Rebus.Operations.Tests/TestOperationManager.cs @@ -22,6 +22,7 @@ public static void Reset() } public override ValueTask GetOrCreateAsync(Guid operationId, object command, + DateTimeOffset created, object? additionalData, IDictionary? additionalHeaders) { if (Operations.ContainsKey(operationId)) @@ -36,7 +37,9 @@ public override ValueTask GetOrCreateAsync(Guid operationId, object return new ValueTask(op); } - public override ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, + public override ValueTask TryChangeStatusAsync(IOperation operation, + OperationStatus newStatus, + DateTimeOffset timestamp, object? additionalData, IDictionary? messageHeaders) { if (!Operations.ContainsKey(operation.Id)) diff --git a/test/Rebus.Operations.Tests/TestOperationTaskModel.cs b/test/Rebus.Operations.Tests/TestOperationTaskModel.cs index 06c2638..9e1fe47 100644 --- a/test/Rebus.Operations.Tests/TestOperationTaskModel.cs +++ b/test/Rebus.Operations.Tests/TestOperationTaskModel.cs @@ -6,4 +6,5 @@ public class TestOperationTaskModel : IOperationTask public Guid OperationId { get; set; } public Guid InitiatingTaskId { get; set; } public OperationTaskStatus Status { get; set; } + public DateTimeOffset Created { get; set; } } \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestTaskManager.cs b/test/Rebus.Operations.Tests/TestTaskManager.cs index 5d71a51..bfb099f 100644 --- a/test/Rebus.Operations.Tests/TestTaskManager.cs +++ b/test/Rebus.Operations.Tests/TestTaskManager.cs @@ -19,7 +19,8 @@ public static void Reset() } - public override ValueTask GetOrCreateAsync(IOperation operation, object command, Guid taskId, Guid parentTaskId) + public override ValueTask GetOrCreateAsync(IOperation operation, + object command, DateTimeOffset created, Guid taskId, Guid parentTaskId) { if (Tasks.ContainsKey(taskId)) return GetByIdAsync(taskId)!; @@ -35,7 +36,7 @@ public override ValueTask GetOrCreateAsync(IOperation operation, return new ValueTask(task); } - public override ValueTask TryChangeStatusAsync(IOperationTask task, OperationTaskStatus newStatus, object? additionalData) + public override ValueTask TryChangeStatusAsync(IOperationTask task, OperationTaskStatus newStatus, DateTimeOffset created, object? additionalData) { if (!Tasks.ContainsKey(task.Id)) return new ValueTask(false); diff --git a/test/Rebus.OperationsDB.Tests/Commands/InitialSagaCommand.cs b/test/Rebus.OperationsDB.Tests/Commands/InitialSagaCommand.cs new file mode 100644 index 0000000..be29229 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Commands/InitialSagaCommand.cs @@ -0,0 +1,6 @@ +namespace Dbosoft.Rebus.OperationsDB.Tests.Commands; + +public class InitialSagaCommand +{ + +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Commands/NestedNestedSagaCommand.cs b/test/Rebus.OperationsDB.Tests/Commands/NestedNestedSagaCommand.cs new file mode 100644 index 0000000..f4cbd07 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Commands/NestedNestedSagaCommand.cs @@ -0,0 +1,6 @@ +namespace Dbosoft.Rebus.OperationsDB.Tests.Commands; + +public class NestedNestedSagaCommand +{ + +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Commands/NestedSagaCommand.cs b/test/Rebus.OperationsDB.Tests/Commands/NestedSagaCommand.cs new file mode 100644 index 0000000..5a3de08 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Commands/NestedSagaCommand.cs @@ -0,0 +1,6 @@ +namespace Dbosoft.Rebus.OperationsDB.Tests.Commands; + +public class NestedSagaCommand +{ + +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Commands/SimpleCommand.cs b/test/Rebus.OperationsDB.Tests/Commands/SimpleCommand.cs new file mode 100644 index 0000000..219a01d --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Commands/SimpleCommand.cs @@ -0,0 +1,6 @@ +namespace Dbosoft.Rebus.OperationsDB.Tests.Commands; + +public class SimpleCommand +{ + +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Commands/SubCommand1.cs b/test/Rebus.OperationsDB.Tests/Commands/SubCommand1.cs new file mode 100644 index 0000000..8b3bfe4 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Commands/SubCommand1.cs @@ -0,0 +1,6 @@ +namespace Dbosoft.Rebus.OperationsDB.Tests.Commands; + +public class SubCommand1 +{ + +} diff --git a/test/Rebus.OperationsDB.Tests/Commands/SubCommand2.cs b/test/Rebus.OperationsDB.Tests/Commands/SubCommand2.cs new file mode 100644 index 0000000..50dff91 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Commands/SubCommand2.cs @@ -0,0 +1,5 @@ +namespace Dbosoft.Rebus.OperationsDB.Tests.Commands; + +public class SubCommand2 +{ +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Commands/SubCommand3.cs b/test/Rebus.OperationsDB.Tests/Commands/SubCommand3.cs new file mode 100644 index 0000000..e5daa75 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Commands/SubCommand3.cs @@ -0,0 +1,6 @@ +namespace Dbosoft.Rebus.OperationsDB.Tests.Commands; + +public class SubCommand3 +{ + +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/DatabaseTests.cs b/test/Rebus.OperationsDB.Tests/DatabaseTests.cs new file mode 100644 index 0000000..462f792 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/DatabaseTests.cs @@ -0,0 +1,259 @@ +using Dbosoft.Rebus.Operations; +using Dbosoft.Rebus.OperationsDB.Tests.Models; +using JetBrains.Annotations; +using MartinCostello.Logging.XUnit; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Rebus.Bus; +using Rebus.Config; +using Rebus.Handlers; +using Rebus.Persistence.InMem; +using Rebus.Retry.Simple; +using Rebus.Routing.TypeBased; +using Rebus.Sagas.Exclusive; +using Rebus.Transport.InMem; +using SimpleInjector; +using SimpleInjector.Lifestyles; +using System.Transactions; +using Microsoft.EntityFrameworkCore.Diagnostics; +using Rebus.TransactionScopes; +using Xunit; +using Xunit.Abstractions; +using Dbosoft.Rebus.OperationsDB.Tests.Commands; + +namespace Dbosoft.Rebus.OperationsDB.Tests; + +public class DatabaseTests : IClassFixture +{ + private readonly ITestOutputHelper _outputHelper; + + public DatabaseTests(ITestOutputHelper outputHelper) + { + _outputHelper = outputHelper; + + } + + private async Task SetupAndRunWorkflow( + int workers, + int timeout, + Func>> starter, + Func? validator = null) + + { + var inMemNetwork = new InMemNetwork(); + var workflowOptions = new WorkflowOptions + { + DispatchMode = WorkflowEventDispatchMode.Publish, + OperationsDestination = "workflow", + DeferCompletion = TimeSpan.FromMinutes(1) + }; + + var container = new Container(); + container.Options.DefaultScopedLifestyle = new AsyncScopedLifestyle(); + + container.Collection.Register(typeof(IHandleMessages<>), typeof(DatabaseTests).Assembly); + + container.RegisterInstance(workflowOptions); + container.AddRebusOperationsHandlers(); + + container.Register(typeof(IStateStoreRepository<>), typeof(StateStoreRepository<>), Lifestyle.Scoped); + + var contextOptions = new DbContextOptionsBuilder() + .UseSqlite("Data Source=state.db") + .ConfigureWarnings(x => x.Ignore(RelationalEventId.AmbientTransactionWarning)) + .Options; + + await using (var setupContext = new StateStoreContext(contextOptions)) + { + await setupContext.Database.EnsureCreatedAsync().ConfigureAwait(false); + } + + container.Register(() => new StateStoreContext(contextOptions), Lifestyle.Scoped); + container.Register(Lifestyle.Scoped); + + container.ConfigureRebus(configurer => configurer + .Transport(t => t.UseInMemoryTransport(inMemNetwork, "workflow")) + .Routing(r => r.TypeBased().AddOperations("workflow")) + .Options(x => + { + x.SimpleRetryStrategy(secondLevelRetriesEnabled: true, errorDetailsHeaderMaxLength: 5, maxDeliveryAttempts: 5); + x.SetNumberOfWorkers(workers); + x.EnableSimpleInjectorUnitOfWork(); + }) + .Logging(x=>x.MicrosoftExtensionsLogging(new XUnitLogger("rebus", _outputHelper, + new XUnitLoggerOptions()))) + .Subscriptions(c => c.StoreInMemory()) + .Timeouts(x=>x.StoreInMemory()) + .Sagas(s => + { + s.StoreInMemory(); + s.EnforceExclusiveAccess(); + }) + .Start()); + + _ = new HostBuilder() + .ConfigureLogging(l => + { + l.AddXUnit(_outputHelper); + l.SetMinimumLevel(LogLevel.Debug); + }) + .ConfigureServices(s=>s.AddSimpleInjector(container, + cfg => cfg.AddLogging())) + .Build() + .UseSimpleInjector(container); + + (IOperation?, OperationStatus)[] operations; + await using (var startScope = AsyncScopedLifestyle.BeginScope(container)) + { + // starts the bus + var bus = startScope.GetInstance(); + await OperationsSetup.SubscribeEvents(bus, workflowOptions).ConfigureAwait(false); + + var context = startScope.GetInstance(); + context.Operations?.RemoveRange(await context.Operations.ToListAsync().ConfigureAwait(false)); + context.OperationTasks?.RemoveRange(await context.OperationTasks.ToListAsync().ConfigureAwait(false)); + context.OperationLogs?.RemoveRange(await context.OperationLogs.ToListAsync().ConfigureAwait(false)); + await context.SaveChangesAsync().ConfigureAwait(false); + operations = (await starter(startScope).ConfigureAwait(false)).ToArray(); + + } + + var cancelTokenSource = new CancellationTokenSource(timeout); + var pendingOperations = operations.Select(x=>x.Item1?.Id).ToList(); + + while (!cancelTokenSource.IsCancellationRequested) + { + await Task.Delay(500, CancellationToken.None).ConfigureAwait(false); + await using var scope = AsyncScopedLifestyle.BeginScope(container); + var repository = scope.GetInstance>(); + var taskRepository = scope.GetInstance>(); + + var allOperations = await repository.ListAsync(CancellationToken.None).ConfigureAwait(false); + var totalCount = allOperations.Count; + var completedCount = allOperations.Count(x => x.Status == OperationStatus.Completed); + + var allTasks = await taskRepository + .ListAsync(CancellationToken.None).ConfigureAwait(false); + var totalTasksCount = allTasks.Count; + var completedTasksCount = allTasks.Count(x => x.Status == OperationTaskStatus.Completed); + + + _outputHelper.WriteLine($"Operations Total: {totalCount}, Completed: {completedCount}"); + _outputHelper.WriteLine($"Tasks Total: {totalTasksCount}, Completed: {completedTasksCount}"); + + for (var index = 0; index < allOperations.Count; index++) + { + var operation = allOperations[index]; + _outputHelper.WriteLine($"Operation {index} {operation.Id} Status: {operation.Status}"); + } + + foreach (var id in pendingOperations.ToArray()) + { + if(id == null) + pendingOperations.Remove(id); + else + { + var currentOperation = allOperations.FirstOrDefault(x => x.Id == id); + if (currentOperation == null) + throw new NullReferenceException($"Operation {id} is null"); + + if (currentOperation.Status is OperationStatus.Completed or OperationStatus.Failed ) + { + pendingOperations.Remove(id); + } + } + } + + if (pendingOperations.Count == 0) + break; + } + await using var validatorScope = AsyncScopedLifestyle.BeginScope(container); + + foreach (var (operation, expectedStatus) in operations) + { + var repository = validatorScope.GetInstance>(); + if (operation == null) + throw new NullReferenceException($"Operation is null is null"); + + var currentOperation = await repository.GetByIdAsync(operation.Id, CancellationToken.None).ConfigureAwait(false); + if (currentOperation == null) + throw new NullReferenceException($"Operation {operation.Id} is null"); + + Assert.Equal(expectedStatus, currentOperation.Status); + validator?.Invoke(validatorScope, currentOperation); + + } + } + + [Theory] + [InlineData(1, 1, 5000)] + [InlineData(1, 5, 5000)] + [InlineData(3, 5, 10000)] + [InlineData(2, 10, 10000)] + public async Task Runs_and_reports_a_simple_Workflow(int workers, int commands, int timeout) + { + await SetupAndRunWorkflow(workers,timeout, async sp => + { + await using var startContext = sp.GetRequiredService(); + var dispatcher = sp.GetRequiredService(); + + var result = new List<(IOperation?, OperationStatus)>(); + foreach (var _ in Enumerable.Range(0, commands)) + { + using var ta = new TransactionScope(TransactionScopeOption.Required, TransactionScopeAsyncFlowOption.Enabled); + ta.EnlistRebus(); + result.Add((await dispatcher.StartNew().ConfigureAwait(false), OperationStatus.Completed)); + await startContext.SaveChangesAsync().ConfigureAwait(false); + ta.Complete(); + await Task.Delay(10).ConfigureAwait(false); + } + + return result; + }).ConfigureAwait(false); + + + } + + [Theory] + [InlineData(1, 1, 5000)] + [InlineData(3, 5, 20000)] + [InlineData(5, 20, 40000)] + public async Task Runs_and_reports_a_complex_Workflow(int workers, int commands, int timeout) + { + await SetupAndRunWorkflow(workers,timeout, async sp => + { + var result = new List<(IOperation?, OperationStatus)>(); + await using var startContext = sp.GetRequiredService(); + var dispatcher = sp.GetRequiredService(); + + foreach(var _ in Enumerable.Range(0, commands)) + { + using var ta = new TransactionScope(TransactionScopeOption.Required, TransactionScopeAsyncFlowOption.Enabled); + ta.EnlistRebus(); + result.Add((await dispatcher.StartNew().ConfigureAwait(false), OperationStatus.Completed)); + await startContext.SaveChangesAsync().ConfigureAwait(false); + ta.Complete(); + await Task.Delay(10).ConfigureAwait(false); + + } + + return result; + }).ConfigureAwait(false); + + + } + + + [UsedImplicitly] + private class DeleteDb + { + public DeleteDb() + { + if(File.Exists("state.db")) + File.Delete("state.db"); + } + } + +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Handlers/InitialSaga.cs b/test/Rebus.OperationsDB.Tests/Handlers/InitialSaga.cs new file mode 100644 index 0000000..f38d7eb --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Handlers/InitialSaga.cs @@ -0,0 +1,58 @@ +using Dbosoft.Rebus.Operations.Events; +using Dbosoft.Rebus.Operations.Workflow; +using Dbosoft.Rebus.OperationsDB.Tests.Commands; +using Rebus.Handlers; +using Rebus.Sagas; + +namespace Dbosoft.Rebus.OperationsDB.Tests.Handlers; + +public class InitialSaga : OperationTaskWorkflowSaga, + IHandleMessages>, + IHandleMessages> + +{ + public InitialSaga(IWorkflow workflowEngine) : base(workflowEngine) + { + } + + protected override void CorrelateMessages(ICorrelationConfig config) + { + base.CorrelateMessages(config); + + config.Correlate>(m => m.InitiatingTaskId, + d => d.SagaTaskId); + config.Correlate>(m => m.InitiatingTaskId, + d => d.SagaTaskId); + } + + protected override async Task Initiated(InitialSagaCommand message) + { + await StartNewTask().ConfigureAwait(false); + await StartNewTask().ConfigureAwait(false); + } + + + public async Task Handle(OperationTaskStatusEvent message) + { + await FailOrRun(message, async () => + { + Data.SubCommand1Completed = true; + + if (Data.SubCommand1Completed && Data.SagaCompleted) + await Complete().ConfigureAwait(false); + }).ConfigureAwait(false); + } + + public async Task Handle(OperationTaskStatusEvent message) + { + await FailOrRun(message, async () => + { + Data.SagaCompleted = true; + + if (Data.SubCommand1Completed && Data.SagaCompleted) + await Complete().ConfigureAwait(false); + }).ConfigureAwait(false); + } + + +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Handlers/InitialSagaData.cs b/test/Rebus.OperationsDB.Tests/Handlers/InitialSagaData.cs new file mode 100644 index 0000000..102d00d --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Handlers/InitialSagaData.cs @@ -0,0 +1,9 @@ +using Dbosoft.Rebus.Operations.Workflow; + +namespace Dbosoft.Rebus.OperationsDB.Tests.Handlers; + +public class InitialSagaData : TaskWorkflowSagaData +{ + public bool SubCommand1Completed { get; set; } + public bool SagaCompleted { get; set; } +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Handlers/NestedNestedSaga.cs b/test/Rebus.OperationsDB.Tests/Handlers/NestedNestedSaga.cs new file mode 100644 index 0000000..ee27a30 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Handlers/NestedNestedSaga.cs @@ -0,0 +1,56 @@ +using Dbosoft.Rebus.Operations.Events; +using Dbosoft.Rebus.Operations.Workflow; +using Dbosoft.Rebus.OperationsDB.Tests.Commands; +using Rebus.Handlers; +using Rebus.Sagas; + +namespace Dbosoft.Rebus.OperationsDB.Tests.Handlers; + +public class NestedNestedSaga : OperationTaskWorkflowSaga, + IHandleMessages>, + IHandleMessages> + +{ + protected override void CorrelateMessages(ICorrelationConfig config) + { + base.CorrelateMessages(config); + + config.Correlate>(m => m.InitiatingTaskId, + d => d.SagaTaskId); + config.Correlate>(m => m.InitiatingTaskId, + d => d.SagaTaskId); + } + + protected override async Task Initiated(NestedNestedSagaCommand message) + { + await StartNewTask().ConfigureAwait(false); + await StartNewTask().ConfigureAwait(false); + } + + + public async Task Handle(OperationTaskStatusEvent message) + { + await FailOrRun(message, async () => + { + Data.SubCommand2Completed = true; + + if (Data.SubCommand2Completed && Data.SubCommand3Completed) + await Complete().ConfigureAwait(false); + }).ConfigureAwait(false); + } + + public async Task Handle(OperationTaskStatusEvent message) + { + await FailOrRun(message, async () => + { + Data.SubCommand3Completed = true; + + if (Data.SubCommand2Completed && Data.SubCommand3Completed) + await Complete().ConfigureAwait(false); + }).ConfigureAwait(false); + } + + public NestedNestedSaga(IWorkflow workflowEngine) : base(workflowEngine) + { + } +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Handlers/NestedNestedSagaData.cs b/test/Rebus.OperationsDB.Tests/Handlers/NestedNestedSagaData.cs new file mode 100644 index 0000000..3aada18 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Handlers/NestedNestedSagaData.cs @@ -0,0 +1,10 @@ +using Dbosoft.Rebus.Operations.Workflow; + +namespace Dbosoft.Rebus.OperationsDB.Tests.Handlers; + +public class NestedNestedSagaData : TaskWorkflowSagaData +{ + public bool SubCommand2Completed { get; set; } + public bool SubCommand3Completed { get; set; } + public bool SagaCompleted { get; set; } +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Handlers/NestedSaga.cs b/test/Rebus.OperationsDB.Tests/Handlers/NestedSaga.cs new file mode 100644 index 0000000..8d41eeb --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Handlers/NestedSaga.cs @@ -0,0 +1,69 @@ +using Dbosoft.Rebus.Operations.Events; +using Dbosoft.Rebus.Operations.Workflow; +using Dbosoft.Rebus.OperationsDB.Tests.Commands; +using Rebus.Handlers; +using Rebus.Sagas; + +namespace Dbosoft.Rebus.OperationsDB.Tests.Handlers; + +public class NestedSaga : OperationTaskWorkflowSaga, + IHandleMessages>, + IHandleMessages>, + IHandleMessages> +{ + protected override void CorrelateMessages(ICorrelationConfig config) + { + base.CorrelateMessages(config); + + config.Correlate>(m => m.InitiatingTaskId, + d => d.SagaTaskId); + config.Correlate>(m => m.InitiatingTaskId, + d => d.SagaTaskId); + config.Correlate>(m => m.InitiatingTaskId, + d => d.SagaTaskId); + } + + protected override async Task Initiated(NestedSagaCommand message) + { + await StartNewTask().ConfigureAwait(false); + await StartNewTask().ConfigureAwait(false); + await StartNewTask().ConfigureAwait(false); + } + + public async Task Handle(OperationTaskStatusEvent message) + { + await FailOrRun(message, async () => + { + Data.SagaCompleted = true; + + if (Data.SagaCompleted && Data.SubCommand2Completed && Data.SubCommand3Completed) + await Complete().ConfigureAwait(false); + }).ConfigureAwait(false); + } + + public async Task Handle(OperationTaskStatusEvent message) + { + await FailOrRun(message, async () => + { + Data.SubCommand2Completed = true; + + if (Data.SagaCompleted && Data.SubCommand2Completed && Data.SubCommand3Completed) + await Complete().ConfigureAwait(false); + }).ConfigureAwait(false); + } + + public async Task Handle(OperationTaskStatusEvent message) + { + await FailOrRun(message, async () => + { + Data.SubCommand3Completed = true; + + if (Data.SagaCompleted && Data.SubCommand2Completed && Data.SubCommand3Completed) + await Complete().ConfigureAwait(false); + }).ConfigureAwait(false); + } + + public NestedSaga(IWorkflow workflowEngine) : base(workflowEngine) + { + } +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Handlers/NestedSagaData.cs b/test/Rebus.OperationsDB.Tests/Handlers/NestedSagaData.cs new file mode 100644 index 0000000..82cd43a --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Handlers/NestedSagaData.cs @@ -0,0 +1,10 @@ +using Dbosoft.Rebus.Operations.Workflow; + +namespace Dbosoft.Rebus.OperationsDB.Tests.Handlers; + +public class NestedSagaData : TaskWorkflowSagaData +{ + public bool SubCommand2Completed { get; set; } + public bool SubCommand3Completed { get; set; } + public bool SagaCompleted { get; set; } +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Handlers/SimpleCommandHandler.cs b/test/Rebus.OperationsDB.Tests/Handlers/SimpleCommandHandler.cs new file mode 100644 index 0000000..f311a7b --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Handlers/SimpleCommandHandler.cs @@ -0,0 +1,24 @@ +using Dbosoft.Rebus.Operations; +using Dbosoft.Rebus.OperationsDB.Tests.Commands; +using JetBrains.Annotations; +using Rebus.Handlers; + +namespace Dbosoft.Rebus.OperationsDB.Tests.Handlers; + +[UsedImplicitly] +public class SimpleCommandHandler : IHandleMessages> +{ + private readonly ITaskMessaging _messaging; + + public SimpleCommandHandler(ITaskMessaging messaging) + { + _messaging = messaging; + } + + + public async Task Handle(OperationTask message) + { + await Task.Delay(1).ConfigureAwait(false); + await _messaging.CompleteTask(message).ConfigureAwait(false); + } +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Handlers/Subcommand1Handler.cs b/test/Rebus.OperationsDB.Tests/Handlers/Subcommand1Handler.cs new file mode 100644 index 0000000..ada96fa --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Handlers/Subcommand1Handler.cs @@ -0,0 +1,22 @@ +using Dbosoft.Rebus.Operations; +using Dbosoft.Rebus.OperationsDB.Tests.Commands; +using Rebus.Handlers; + +namespace Dbosoft.Rebus.OperationsDB.Tests.Handlers; + +public class Subcommand1Handler : IHandleMessages> +{ + private readonly ITaskMessaging _taskMessaging; + + public Subcommand1Handler(ITaskMessaging taskMessaging) + { + _taskMessaging = taskMessaging; + } + + public async Task Handle(OperationTask message) + { + await _taskMessaging.ProgressMessage(message, "started task Subcommand1").ConfigureAwait(false); + await Task.Delay(1000).ConfigureAwait(false); + await _taskMessaging.CompleteTask(message).ConfigureAwait(false); + } +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Handlers/Subcommand2Handler.cs b/test/Rebus.OperationsDB.Tests/Handlers/Subcommand2Handler.cs new file mode 100644 index 0000000..b8ad93d --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Handlers/Subcommand2Handler.cs @@ -0,0 +1,23 @@ +using Dbosoft.Rebus.Operations; +using Dbosoft.Rebus.OperationsDB.Tests.Commands; +using Rebus.Handlers; + +namespace Dbosoft.Rebus.OperationsDB.Tests.Handlers; + +public class Subcommand2Handler : IHandleMessages> +{ + private readonly ITaskMessaging _taskMessaging; + + public Subcommand2Handler(ITaskMessaging taskMessaging) + { + _taskMessaging = taskMessaging; + } + + public async Task Handle(OperationTask message) + { + await Task.Delay(1).ConfigureAwait(false); + await _taskMessaging.ProgressMessage(message, "started task Subcommand2").ConfigureAwait(false); + await Task.Delay(1).ConfigureAwait(false); + await _taskMessaging.CompleteTask(message).ConfigureAwait(false); + } +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Handlers/Subcommand3Handler.cs b/test/Rebus.OperationsDB.Tests/Handlers/Subcommand3Handler.cs new file mode 100644 index 0000000..955056e --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Handlers/Subcommand3Handler.cs @@ -0,0 +1,26 @@ +using Dbosoft.Rebus.Operations; +using Dbosoft.Rebus.OperationsDB.Tests.Commands; +using Rebus.Handlers; + +namespace Dbosoft.Rebus.OperationsDB.Tests.Handlers; + +public class Subcommand3Handler : IHandleMessages> +{ + private readonly ITaskMessaging _taskMessaging; + + public Subcommand3Handler(ITaskMessaging taskMessaging) + { + _taskMessaging = taskMessaging; + } + + public async Task Handle(OperationTask message) + { + await _taskMessaging.ProgressMessage(message, "started task Subcommand3").ConfigureAwait(false); + await Task.Delay(1).ConfigureAwait(false); + await _taskMessaging.ProgressMessage(message, 50).ConfigureAwait(false); + await Task.Delay(1).ConfigureAwait(false); + await _taskMessaging.ProgressMessage(message, 80).ConfigureAwait(false); + await Task.Delay(500).ConfigureAwait(false); + await _taskMessaging.CompleteTask(message).ConfigureAwait(false); + } +} diff --git a/test/Rebus.OperationsDB.Tests/IStateStoreRepository.cs b/test/Rebus.OperationsDB.Tests/IStateStoreRepository.cs new file mode 100644 index 0000000..5d24b97 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/IStateStoreRepository.cs @@ -0,0 +1,8 @@ +using Ardalis.Specification; + +namespace Dbosoft.Rebus.OperationsDB.Tests; + +public interface IStateStoreRepository : IRepositoryBase where T: class +{ + +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Models/OperationAndTaskModel.cs b/test/Rebus.OperationsDB.Tests/Models/OperationAndTaskModel.cs new file mode 100644 index 0000000..ced233f --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Models/OperationAndTaskModel.cs @@ -0,0 +1,10 @@ +namespace Dbosoft.Rebus.OperationsDB.Tests.Models; + +public class OperationAndTaskModel +{ + public Guid Id { get; set; } + + public DateTimeOffset Created { get; set; } + public DateTimeOffset LastUpdate { get; set; } + +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Models/OperationLogEntry.cs b/test/Rebus.OperationsDB.Tests/Models/OperationLogEntry.cs new file mode 100644 index 0000000..6183a2a --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Models/OperationLogEntry.cs @@ -0,0 +1,13 @@ +namespace Dbosoft.Rebus.OperationsDB.Tests.Models; + +public class OperationLogEntry +{ + public Guid Id { get; set; } + public Guid OperationId { get; set; } + public Guid TaskId { get; set; } + + public string Message { get; set; } + public virtual OperationModel Operation { get; set; } + public virtual OperationTaskModel Task { get; set; } + public DateTimeOffset Timestamp { get; set; } +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Models/OperationModel.cs b/test/Rebus.OperationsDB.Tests/Models/OperationModel.cs new file mode 100644 index 0000000..bb02596 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Models/OperationModel.cs @@ -0,0 +1,11 @@ +using Dbosoft.Rebus.Operations; + +namespace Dbosoft.Rebus.OperationsDB.Tests.Models; + +public class OperationModel : OperationAndTaskModel, IOperation +{ + public List? Tasks { get; set; } + public OperationStatus Status { get; set; } + + +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Models/OperationTaskModel.cs b/test/Rebus.OperationsDB.Tests/Models/OperationTaskModel.cs new file mode 100644 index 0000000..518949a --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Models/OperationTaskModel.cs @@ -0,0 +1,13 @@ +using Dbosoft.Rebus.Operations; + +namespace Dbosoft.Rebus.OperationsDB.Tests.Models; + +public class OperationTaskModel : OperationAndTaskModel, IOperationTask +{ + public OperationModel? Operation { get; set; } + public Guid OperationId { get; set; } + public Guid InitiatingTaskId { get; set; } + public OperationTaskStatus Status { get; set; } + public int Progress { get; set; } + +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/MyOperationManager.cs b/test/Rebus.OperationsDB.Tests/MyOperationManager.cs new file mode 100644 index 0000000..115eae7 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/MyOperationManager.cs @@ -0,0 +1,96 @@ +using Dbosoft.Rebus.Operations; +using Dbosoft.Rebus.Operations.Workflow; +using Dbosoft.Rebus.OperationsDB.Tests.Models; + +namespace Dbosoft.Rebus.OperationsDB.Tests; + +public class MyOperationManager : IOperationManager +{ + private readonly IStateStoreRepository _repository; + private readonly IStateStoreRepository _logRepository; + private readonly IStateStoreRepository _taskRepository; + + public MyOperationManager(IStateStoreRepository repository, + IStateStoreRepository logRepository, IStateStoreRepository taskRepository) + { + _repository = repository; + _logRepository = logRepository; + _taskRepository = taskRepository; + } + + public async ValueTask GetByIdAsync(Guid operationId) + { + return await _repository.GetByIdAsync(operationId).ConfigureAwait(false); + } + + public async ValueTask GetOrCreateAsync(Guid operationId, object command, + DateTimeOffset timestamp, object? additionalData, IDictionary? additionalHeaders) + { + var model = await _repository.GetByIdAsync(operationId).ConfigureAwait(false); + if (model != null) + return model; + + model = new OperationModel + { + Id = operationId, + Created = timestamp, + LastUpdate = timestamp, + Status = OperationStatus.Queued, + }; + + await _repository.AddAsync(model).ConfigureAwait(false); + return model; + + } + + public async ValueTask TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, + DateTimeOffset timestamp, + object? additionalData, + IDictionary? messageHeaders) + { + var model = await _repository.GetByIdAsync(operation.Id).ConfigureAwait(false); + if (model == null) + return false; + + if (model.LastUpdate > timestamp) + return false; + + model.Status = newStatus; + model.LastUpdate = timestamp; + return true; + } + + public async ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task, + object? data, IDictionary? messageHeaders) + { + var message = ""; + var progress = 0; + + if (data is string msgString) + message = msgString; + + if (data is int progressMsg) + progress = progressMsg; + + var taskEntry = await _taskRepository.GetByIdAsync(task.Id).ConfigureAwait(false); + if (taskEntry != null) + { + taskEntry.Progress = progress; + taskEntry.LastUpdate = timestamp; + } + + var opLogEntry = + new OperationLogEntry + { + Id = progressId, + OperationId = operation.Id, + TaskId = task.Id, + Message = message, + Timestamp = timestamp + }; + + await _logRepository.AddAsync(opLogEntry).ConfigureAwait(false); + + + } +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/MyOperationTaskManager.cs b/test/Rebus.OperationsDB.Tests/MyOperationTaskManager.cs new file mode 100644 index 0000000..6fac688 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/MyOperationTaskManager.cs @@ -0,0 +1,60 @@ +using Dbosoft.Rebus.Operations; +using Dbosoft.Rebus.Operations.Workflow; +using Dbosoft.Rebus.OperationsDB.Tests.Models; + +namespace Dbosoft.Rebus.OperationsDB.Tests; + +public class MyOperationTaskManager : IOperationTaskManager +{ + private readonly IStateStoreRepository _repository; + + public MyOperationTaskManager(IStateStoreRepository repository) + { + _repository = repository; + } + + public async ValueTask GetByIdAsync(Guid taskId) + { + return await _repository.GetByIdAsync(taskId).ConfigureAwait(false); + } + + public async ValueTask GetOrCreateAsync(IOperation operation, + object command, + DateTimeOffset timestamp, + Guid taskId, Guid parentTaskId) + { + var model = await _repository.GetByIdAsync(taskId).ConfigureAwait(false); + if (model != null) + return model; + + model = new OperationTaskModel + { + Id = taskId, + Created = timestamp, + LastUpdate = timestamp, + InitiatingTaskId = parentTaskId, + OperationId = operation.Id, + Status = OperationTaskStatus.Queued + }; + + await _repository.AddAsync(model).ConfigureAwait(false); + return model; + } + + public async ValueTask TryChangeStatusAsync(IOperationTask task, + OperationTaskStatus newStatus, + DateTimeOffset timestamp, + object? additionalData) + { + var model = await _repository.GetByIdAsync(task.Id).ConfigureAwait(false); + if (model == null) + return false; + + if (model.LastUpdate > timestamp) + return false; + + model.Status = newStatus; + model.LastUpdate = timestamp; + return true; + } +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/Rebus.OperationsDB.Tests.csproj b/test/Rebus.OperationsDB.Tests/Rebus.OperationsDB.Tests.csproj new file mode 100644 index 0000000..dec02b8 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Rebus.OperationsDB.Tests.csproj @@ -0,0 +1,25 @@ + + + + net6.0 + enable + enable + + + + + + + + + + + + + + + + + + + diff --git a/test/Rebus.OperationsDB.Tests/Rebus.OperationsDB.Tests.csproj.DotSettings b/test/Rebus.OperationsDB.Tests/Rebus.OperationsDB.Tests.csproj.DotSettings new file mode 100644 index 0000000..89316e4 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/Rebus.OperationsDB.Tests.csproj.DotSettings @@ -0,0 +1,2 @@ + + Library \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/StateStoreContext.cs b/test/Rebus.OperationsDB.Tests/StateStoreContext.cs new file mode 100644 index 0000000..b3ab32f --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/StateStoreContext.cs @@ -0,0 +1,46 @@ +using Dbosoft.Rebus.OperationsDB.Tests.Models; +using Microsoft.EntityFrameworkCore; + +namespace Dbosoft.Rebus.OperationsDB.Tests; + +public class StateStoreContext : DbContext +{ + + public StateStoreContext(DbContextOptions options) + : base(options) + { + } + + public DbSet? Operations { get; set; } + public DbSet? OperationTasks { get; set; } + public DbSet? OperationLogs{ get; set; } + + + protected override void OnModelCreating(ModelBuilder modelBuilder) + { + + modelBuilder.Entity().HasKey(x => x.Id); + modelBuilder.Entity().Property(x => x.Id).IsRequired().ValueGeneratedNever(); + + // modelBuilder.Entity() + // .Property(x => x.Timestamp) + // .IsRowVersion(); + + modelBuilder.Entity() + .HasMany(x => x.Tasks) + .WithOne(x => x.Operation) + .HasForeignKey(x => x.OperationId) + .OnDelete(DeleteBehavior.Cascade); + + modelBuilder.Entity() + .Property(x => x.LastUpdate) + .IsConcurrencyToken(); + + modelBuilder.Entity().HasKey(x => x.Id); + modelBuilder.Entity().Property(x => x.Id).IsRequired().ValueGeneratedNever(); + + modelBuilder.Entity() + .Property(x => x.LastUpdate) + .IsConcurrencyToken(); + } +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/StateStoreDbUnitOfWork.cs b/test/Rebus.OperationsDB.Tests/StateStoreDbUnitOfWork.cs new file mode 100644 index 0000000..ecf9841 --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/StateStoreDbUnitOfWork.cs @@ -0,0 +1,29 @@ +using Microsoft.Extensions.Logging; + +namespace Dbosoft.Rebus.OperationsDB.Tests; + +public sealed class StateStoreDbUnitOfWork : IRebusUnitOfWork +{ + private readonly StateStoreContext _dbContext; + private readonly ILogger _logger; + public StateStoreDbUnitOfWork(StateStoreContext dbContext, ILogger logger) + { + _dbContext = dbContext; + _logger = logger; + } + + public ValueTask DisposeAsync() + { + return default; + } + + public Task Commit() + { + _logger.LogInformation("COMMIT of State Store"); + return _dbContext.SaveChangesAsync(); + } + + public void Dispose() + { + } +} \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/StateStoreRepository.cs b/test/Rebus.OperationsDB.Tests/StateStoreRepository.cs new file mode 100644 index 0000000..ef69eae --- /dev/null +++ b/test/Rebus.OperationsDB.Tests/StateStoreRepository.cs @@ -0,0 +1,17 @@ +using Ardalis.Specification.EntityFrameworkCore; + +namespace Dbosoft.Rebus.OperationsDB.Tests; + +public class StateStoreRepository : RepositoryBase, IStateStoreRepository where T : class +{ + + public StateStoreRepository(StateStoreContext dbContext) : base(dbContext) + { + } + + public override Task SaveChangesAsync(CancellationToken cancellationToken = new CancellationToken()) + { + return Task.FromResult(0); + // return base.SaveChangesAsync(cancellationToken); + } +} \ No newline at end of file