diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationDispatcher.cs index 22e6de0..5393d54 100644 --- a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationDispatcher.cs +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationDispatcher.cs @@ -1,15 +1,16 @@ #nullable enable using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace Dbosoft.Rebus.Operations { public interface IOperationDispatcher { - ValueTask StartNew(object? additionalData = default) where T : class, new(); - ValueTask StartNew(Type commandType, object? additionalData = default); - ValueTask StartNew(object operationCommand); + ValueTask StartNew(object? additionalData = default, IDictionary? additionalHeaders = null) where T : class, new(); + ValueTask StartNew(Type commandType, object? additionalData = default, IDictionary? additionalHeaders = null); + ValueTask StartNew(object operationCommand, IDictionary? additionalHeaders = null); } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTaskDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTaskDispatcher.cs index 9c18c43..167a57a 100644 --- a/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTaskDispatcher.cs +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/IOperationTaskDispatcher.cs @@ -1,6 +1,7 @@ #nullable enable using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace Dbosoft.Rebus.Operations @@ -8,13 +9,13 @@ namespace Dbosoft.Rebus.Operations public interface IOperationTaskDispatcher { ValueTask StartNew(Guid operationId, Guid initiatingTaskId, - object? additionalData = default) where T : class, new(); + object? additionalData = default, IDictionary? additionalHeaders = null) where T : class, new(); ValueTask StartNew(Guid operationId, Guid initiatingTaskId, Type operationCommandType, - object? additionalData = default); + object? additionalData = default, IDictionary? additionalHeaders = null); ValueTask StartNew(Guid operationId, Guid initiatingTaskId, object command - , object? additionalData = default); + , object? additionalData = default, IDictionary? additionalHeaders = null); } diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/ITaskMessaging.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/ITaskMessaging.cs new file mode 100644 index 0000000..01de2a6 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/ITaskMessaging.cs @@ -0,0 +1,21 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Dbosoft.Rebus.Operations; + +public interface ITaskMessaging +{ + Task FailTask(IOperationTaskMessage message, string errorMessage, + IDictionary? additionalHeaders = null); + + Task FailTask(IOperationTaskMessage message, ErrorData error, + IDictionary? additionalHeaders = null); + + Task CompleteTask(IOperationTaskMessage message, IDictionary? additionalHeaders = null); + + Task CompleteTask(IOperationTaskMessage message, object responseMessage, + IDictionary? additionalHeaders = null); + + Task ProgressMessage(IOperationTaskMessage message, object data, + IDictionary? additionalHeaders = null); +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IMessageEnricher.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IMessageEnricher.cs index 0a4e318..19e9fa1 100644 --- a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IMessageEnricher.cs +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IMessageEnricher.cs @@ -1,8 +1,22 @@ -using Dbosoft.Rebus.Operations.Commands; +using System.Collections.Generic; +using Dbosoft.Rebus.Operations.Commands; +using Dbosoft.Rebus.Operations.Events; namespace Dbosoft.Rebus.Operations.Workflow; public interface IMessageEnricher { object? EnrichTaskAcceptedReply(OperationTaskSystemMessage taskMessage) where T : class, new(); -} \ No newline at end of file + + IDictionary? EnrichHeadersFromIncomingSystemMessage(OperationTaskSystemMessage taskMessage, + IDictionary systemMessageHeaders); + + IDictionary? EnrichHeadersOfOutgoingSystemMessage(object taskMessage, + IDictionary? previousHeaders); + + IDictionary? EnrichHeadersOfStatusEvent(OperationStatusEvent operationStatusEvent, + IDictionary? previousHeaders); + + IDictionary? EnrichHeadersOfTaskStatusEvent(OperationTaskStatusEvent operationStatusEvent, + IDictionary? previousHeaders); +} diff --git a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs index 8844aaf..54a0e75 100644 --- a/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs +++ b/src/Rebus.Operations/Rebus.Operations.Abstractions/Workflow/IOperationMessaging.cs @@ -1,17 +1,16 @@ -using System.Threading.Tasks; +using System.Collections.Generic; +using System.Threading.Tasks; using Dbosoft.Rebus.Operations.Events; namespace Dbosoft.Rebus.Operations.Workflow; public interface IOperationMessaging { - Task DispatchTaskMessage(object command, IOperationTask task); - Task DispatchTaskStatusEventAsync(string commandType, OperationTaskStatusEvent message); - Task DispatchTaskStatusEventAsync(OperationTaskStatusEvent message); - Task DispatchOperationStatusEventAsync(OperationStatusEvent operationStatusEvent); + Task DispatchTaskMessage(object command, IOperationTask task, IDictionary? additionalHeaders = null); + Task DispatchTaskStatusEventAsync(string commandType, OperationTaskStatusEvent message, IDictionary? additionalHeaders = null); + Task DispatchTaskStatusEventAsync(OperationTaskStatusEvent message, IDictionary? additionalHeaders = null); + Task DispatchOperationStatusEventAsync(OperationStatusEvent operationStatusEvent, IDictionary? additionalHeaders = null); IOperationDispatcher OperationDispatcher { get; } IOperationTaskDispatcher TaskDispatcher { get; } - - } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/BusOperationTaskExtensions.cs b/src/Rebus.Operations/Rebus.Operations.Core/BusOperationTaskExtensions.cs index 9eb777a..165beef 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/BusOperationTaskExtensions.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/BusOperationTaskExtensions.cs @@ -1,57 +1,32 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; -using Dbosoft.Rebus.Operations.Events; +using Dbosoft.Rebus.Operations.Workflow; using Rebus.Bus; -using Rebus.Transport; namespace Dbosoft.Rebus.Operations; public static class BusOperationTaskExtensions { - public static Task FailTask(this IBus bus, IOperationTaskMessage message, string errorMessage) + public static Task SendWorkflowEvent(this IBus bus, WorkflowOptions options, object eventMessage, + IDictionary? additionalHeaders = null) { - return FailTask(bus, message, new ErrorData { ErrorMessage = errorMessage }); - } - - public static Task FailTask(this IBus bus, IOperationTaskMessage message, ErrorData error) - { - return bus.Publish( - OperationTaskStatusEvent.Failed( - message.OperationId, message.InitiatingTaskId, - message.TaskId, error)); - } - - - public static Task CompleteTask(this IBus bus, IOperationTaskMessage message) - { - return bus.Publish( - OperationTaskStatusEvent.Completed( - message.OperationId, message.InitiatingTaskId, message.TaskId)); - } - - public static Task CompleteTask(this IBus bus, IOperationTaskMessage message, object responseMessage) - { - return bus.Publish( - OperationTaskStatusEvent.Completed( - message.OperationId, message.InitiatingTaskId, message.TaskId, responseMessage)); - } - - - public static async Task ProgressMessage(this IBus bus, IOperationTaskMessage message, object data) - { - using var scope = new RebusTransactionScope(); - - - await bus.Publish(new OperationTaskProgressEvent + if (string.IsNullOrWhiteSpace(options.EventDestination)) { - Id = Guid.NewGuid(), - OperationId = message.OperationId, - TaskId = message.TaskId, - Data = data, - Timestamp = DateTimeOffset.UtcNow - }).ConfigureAwait(false); - - // commit it like this - await scope.CompleteAsync().ConfigureAwait(false); + return options.DispatchMode switch + { + WorkflowEventDispatchMode.Publish => bus.Publish(eventMessage, additionalHeaders), + WorkflowEventDispatchMode.Send => bus.Send(eventMessage, additionalHeaders), + _ => throw new ArgumentOutOfRangeException(nameof(options)) + }; + } + + return options.DispatchMode switch + { + WorkflowEventDispatchMode.Publish => bus.Advanced.Topics.Publish(options.EventDestination, eventMessage, additionalHeaders), + WorkflowEventDispatchMode.Send => bus.Advanced.Routing.Send(options.EventDestination, eventMessage, additionalHeaders), + _ => throw new ArgumentOutOfRangeException(nameof(options)) + }; } + } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs index eec08fc..f9fd2cb 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationDispatcher.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; using Dbosoft.Rebus.Operations.Workflow; using Microsoft.Extensions.Logging; @@ -14,8 +15,9 @@ public class DefaultOperationDispatcher : OperationDispatcherBase public DefaultOperationDispatcher( IBus bus, + WorkflowOptions workflowOptions, ILogger logger, - IOperationManager operationManager) : base(bus, logger) + IOperationManager operationManager) : base(bus, workflowOptions, logger) { _operationManager = operationManager; } diff --git a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs index fe87e11..9dfd62a 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/DefaultOperationTaskDispatcher.cs @@ -14,8 +14,9 @@ public class DefaultOperationTaskDispatcher : OperationTaskDispatcherBase public DefaultOperationTaskDispatcher( IBus bus, + WorkflowOptions workflowOptions, ILogger logger, - IOperationManager operationManager, IOperationTaskManager operationTaskManager) : base(bus, logger) + IOperationManager operationManager, IOperationTaskManager operationTaskManager) : base(bus, workflowOptions, logger) { _operationManager = operationManager; _operationTaskManager = operationTaskManager; diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs index bb1a183..b930f33 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationDispatcherBase.cs @@ -3,9 +3,11 @@ #nullable enable using System; +using System.Collections.Generic; using System.Text.Json; using System.Threading.Tasks; using Dbosoft.Rebus.Operations.Commands; +using Dbosoft.Rebus.Operations.Workflow; using Microsoft.Extensions.Logging; using Rebus.Bus; @@ -15,34 +17,36 @@ namespace Dbosoft.Rebus.Operations public abstract class OperationDispatcherBase : IOperationDispatcher { private readonly IBus _bus; + private readonly WorkflowOptions _options; private readonly ILogger _logger; - protected OperationDispatcherBase(IBus bus, ILogger logger) + protected OperationDispatcherBase(IBus bus, WorkflowOptions options, ILogger logger) { _bus = bus; + _options = options; _logger = logger; } - public ValueTask StartNew(object command) + public ValueTask StartNew(object command, IDictionary? additionalHeaders = null) { - return StartOperation(command, null); + return StartOperation(command, null, additionalHeaders); } - public ValueTask StartNew(object? additionalData = default) + public ValueTask StartNew(object? additionalData = default, IDictionary? additionalHeaders = null) where T : class, new() { - return StartOperation( Activator.CreateInstance(),additionalData); + return StartOperation( Activator.CreateInstance(),additionalData, additionalHeaders); } - public ValueTask StartNew(Type commandType, object? additionalData = default) + public ValueTask StartNew(Type commandType, object? additionalData = default, IDictionary? additionalHeaders = null) { - return StartOperation(commandType,additionalData); + return StartOperation(commandType,additionalData, additionalHeaders); } protected abstract ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData); - protected async ValueTask StartOperation(object command, object? additionalData) + protected async ValueTask StartOperation(object command, object? additionalData, IDictionary? additionalHeaders = null) { if (command == null) throw new ArgumentNullException(nameof(command)); @@ -59,7 +63,9 @@ protected OperationDispatcherBase(IBus bus, ILogger log Guid.NewGuid()); var message = new CreateOperationCommand { TaskMessage = taskMessage }; - await _bus.Send(message); + await (string.IsNullOrWhiteSpace(_options.OperationsDestination) + ? _bus.Send(message, additionalHeaders) + : _bus.Advanced.Routing.Send(_options.OperationsDestination, message, additionalHeaders)); _logger.LogDebug("Send new command of type {commandType}. Id: {operationId}", taskCommand.GetType().Name, operation.Id); diff --git a/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs b/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs index 2e692b8..3b4f3fe 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/OperationTaskDispatcherBase.cs @@ -3,9 +3,11 @@ #nullable enable using System; +using System.Collections.Generic; using System.Text.Json; using System.Threading.Tasks; using Dbosoft.Rebus.Operations.Commands; +using Dbosoft.Rebus.Operations.Workflow; using Microsoft.Extensions.Logging; using Rebus.Bus; @@ -13,34 +15,38 @@ namespace Dbosoft.Rebus.Operations; public abstract class OperationTaskDispatcherBase : IOperationTaskDispatcher { private readonly IBus _bus; + private readonly WorkflowOptions _options; private readonly ILogger _logger; - protected OperationTaskDispatcherBase(IBus bus, ILogger logger) + protected OperationTaskDispatcherBase(IBus bus, WorkflowOptions options, ILogger logger) { _bus = bus; + _options = options; _logger = logger; } - public ValueTask StartNew(Guid operationId, Guid initiatingTaskId, object command, object? additionalData = null) + public ValueTask StartNew(Guid operationId, Guid initiatingTaskId, object command, + object? additionalData = null, IDictionary? additionalHeaders = null) { - return StartTask(operationId, initiatingTaskId, command, additionalData); + return StartTask(operationId, initiatingTaskId, command, additionalData, additionalHeaders); } - public ValueTask StartNew(Guid operationId, Guid initiatingTaskId, object? additionalData = default) + public ValueTask StartNew(Guid operationId, Guid initiatingTaskId, object? additionalData = default, IDictionary? additionalHeaders = null) where T : class, new() { return StartTask(operationId, initiatingTaskId, Activator.CreateInstance(), - additionalData); + additionalData, additionalHeaders); } - public ValueTask StartNew(Guid operationId, Guid initiatingTaskId, Type commandType, object? additionalData = default) + public ValueTask StartNew(Guid operationId, Guid initiatingTaskId, Type commandType, object? additionalData = default, IDictionary? additionalHeaders = null) { - return StartTask(operationId, initiatingTaskId, commandType, additionalData); + return StartTask(operationId, initiatingTaskId, commandType, additionalData, additionalHeaders); } protected abstract ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData); - protected async ValueTask StartTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData) + protected async ValueTask StartTask(Guid operationId, Guid initiatingTaskId, + object command, object? additionalData, IDictionary? additionalHeaders = null) { if (command == null) throw new ArgumentNullException(nameof(command)); @@ -55,7 +61,9 @@ protected OperationTaskDispatcherBase(IBus bus, ILogger SubscribeEvents(IBus bus) + public static async Task SubscribeEvents(IBus bus, WorkflowOptions options) { + if (options.DispatchMode == WorkflowEventDispatchMode.Send) + return bus; + + if (!string.IsNullOrWhiteSpace(options.EventDestination)) + { + await bus.Advanced.Topics.Subscribe(options.EventDestination); + return bus; + } + await bus.Subscribe(); await bus.Subscribe(); await bus.Subscribe(); diff --git a/src/Rebus.Operations/Rebus.Operations.Core/RebusConfigurerExtensions.cs b/src/Rebus.Operations/Rebus.Operations.Core/RebusConfigurerExtensions.cs new file mode 100644 index 0000000..ebee400 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/RebusConfigurerExtensions.cs @@ -0,0 +1,25 @@ +using Dbosoft.Rebus.Operations.Commands; +using Dbosoft.Rebus.Operations.Events; +using Rebus.Routing.TypeBased; + +namespace Dbosoft.Rebus.Operations; + +public static class RebusConfigurerExtensions +{ + public static TypeBasedRouterConfigurationExtensions.TypeBasedRouterConfigurationBuilder + AddOperations(this TypeBasedRouterConfigurationExtensions.TypeBasedRouterConfigurationBuilder typeBasedRouter, + string operationsOwner, string? eventsOwner = default) + { + if (eventsOwner == default) + eventsOwner = operationsOwner; + + return typeBasedRouter + .Map(operationsOwner) + .Map(operationsOwner) + .Map(eventsOwner) + .Map(eventsOwner) + .Map(eventsOwner) + .Map(eventsOwner) + .Map(eventsOwner); + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/RebusTaskMessaging.cs b/src/Rebus.Operations/Rebus.Operations.Core/RebusTaskMessaging.cs new file mode 100644 index 0000000..c2a4f93 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Core/RebusTaskMessaging.cs @@ -0,0 +1,68 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Dbosoft.Rebus.Operations.Events; +using Dbosoft.Rebus.Operations.Workflow; +using Rebus.Bus; +using Rebus.Transport; + +namespace Dbosoft.Rebus.Operations; + +public class RebusTaskMessaging : ITaskMessaging +{ + private readonly IBus _bus; + private readonly WorkflowOptions _options; + + public RebusTaskMessaging(IBus bus, + WorkflowOptions options) + { + _bus = bus; + _options = options; + } + + public Task FailTask(IOperationTaskMessage message, string errorMessage, IDictionary? additionalHeaders = null) + { + return FailTask(message, new ErrorData { ErrorMessage = errorMessage }, additionalHeaders); + } + + public Task FailTask(IOperationTaskMessage message, ErrorData error, IDictionary? additionalHeaders = null) + { + return _bus.SendWorkflowEvent(_options, + OperationTaskStatusEvent.Failed( + message.OperationId, message.InitiatingTaskId, + message.TaskId, error),additionalHeaders ); + } + + + public Task CompleteTask(IOperationTaskMessage message, IDictionary? additionalHeaders = null) + { + return _bus.SendWorkflowEvent(_options, + OperationTaskStatusEvent.Completed( + message.OperationId, message.InitiatingTaskId, message.TaskId), additionalHeaders); + } + + public Task CompleteTask(IOperationTaskMessage message, object responseMessage, IDictionary? additionalHeaders = null) + { + return _bus.SendWorkflowEvent(_options, + OperationTaskStatusEvent.Completed( + message.OperationId, message.InitiatingTaskId, message.TaskId, responseMessage), additionalHeaders); + } + + + public async Task ProgressMessage(IOperationTaskMessage message, object data, IDictionary? additionalHeaders = null) + { + using var scope = new RebusTransactionScope(); + + await _bus.SendWorkflowEvent(_options, new OperationTaskProgressEvent + { + Id = Guid.NewGuid(), + OperationId = message.OperationId, + TaskId = message.TaskId, + Data = data, + Timestamp = DateTimeOffset.UtcNow + }, additionalHeaders).ConfigureAwait(false); + + // commit it like this + await scope.CompleteAsync().ConfigureAwait(false); + } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultMessageEnricher.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultMessageEnricher.cs index b478117..1a5847b 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultMessageEnricher.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/DefaultMessageEnricher.cs @@ -1,4 +1,6 @@ -using Dbosoft.Rebus.Operations.Commands; +using System.Collections.Generic; +using Dbosoft.Rebus.Operations.Commands; +using Dbosoft.Rebus.Operations.Events; namespace Dbosoft.Rebus.Operations.Workflow; @@ -8,4 +10,27 @@ public class DefaultMessageEnricher : IMessageEnricher { return null; } + + public IDictionary? EnrichHeadersFromIncomingSystemMessage( + OperationTaskSystemMessage taskMessage, IDictionary systemMessageHeaders) + { + return null; + } + + public IDictionary? EnrichHeadersOfOutgoingSystemMessage(object taskMessage, + IDictionary? previousHeaders) + { + return null; + } + + public IDictionary? EnrichHeadersOfStatusEvent(OperationStatusEvent operationStatusEvent, IDictionary? previousHeaders) + { + return null; + } + + public IDictionary? EnrichHeadersOfTaskStatusEvent(OperationTaskStatusEvent operationStatusEvent, + IDictionary? previousHeaders) + { + return null; + } } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs index 01ea7e7..bac9cc6 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs @@ -1,9 +1,12 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; using Dbosoft.Rebus.Operations.Commands; using Dbosoft.Rebus.Operations.Events; using Microsoft.Extensions.Logging; using Rebus.Bus; using Rebus.Handlers; +using Rebus.Pipeline; + namespace Dbosoft.Rebus.Operations.Workflow { @@ -21,7 +24,13 @@ public IncomingTaskMessageHandler(IBus bus, ILogger taskMessage) { - await _bus.SendLocal(new OperationTask(taskMessage.Message, taskMessage.OperationId, taskMessage.InitiatingTaskId, taskMessage.TaskId)).ConfigureAwait(false); + if(taskMessage.Message==null) + throw new InvalidOperationException($"Operation Workflow {taskMessage.OperationId}/{taskMessage.TaskId}: missing command message"); + + var headers = _messageEnricher.EnrichHeadersFromIncomingSystemMessage(taskMessage, MessageContext.Current.Headers); + await _bus.SendLocal(new OperationTask(taskMessage.Message, taskMessage.OperationId, taskMessage.InitiatingTaskId, taskMessage.TaskId) + , headers + ).ConfigureAwait(false); _logger.LogTrace($"Accepted incoming operation message. Operation id: '{taskMessage.OperationId}'"); diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs index d2774fa..fcb5bb1 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs @@ -31,19 +31,23 @@ public ProcessOperationSaga(IWorkflow workflow, ILogger log) public Task Handle(CreateOperationCommand message) { + if(message.TaskMessage == null) + throw new InvalidOperationException($"Operation Workflow: invalid command - missing task message"); + Data.OperationId = message.TaskMessage.OperationId; Data.PrimaryTaskId = message.TaskMessage.TaskId; + return Handle(message.TaskMessage); } - public Task Handle(OperationTimeoutEvent message) + public Task Handle(OperationTimeoutEvent? message) { return Task.CompletedTask; } protected override void CorrelateMessages(ICorrelationConfig config) { - config.Correlate(m => m.TaskMessage.OperationId, d => d.OperationId); + config.Correlate(m => m.TaskMessage?.OperationId, d => d.OperationId); config.Correlate(m => m.OperationId, d => d.OperationId); config.Correlate(m => m.OperationId, d => d.OperationId); config.Correlate(m => m.OperationId, d => d.OperationId); @@ -52,6 +56,10 @@ protected override void CorrelateMessages(ICorrelationConfig public async Task Handle(CreateNewOperationTaskCommand message) { + if(string.IsNullOrWhiteSpace(message.CommandData)) + throw new InvalidOperationException($"Operation Workflow {message.OperationId}: missing command data"); + if(string.IsNullOrWhiteSpace(message.CommandType)) + throw new InvalidOperationException($"Operation Workflow {message.OperationId}: missing command type"); var command = JsonSerializer.Deserialize(message.CommandData, Type.GetType(message.CommandType) ?? @@ -114,7 +122,7 @@ public async Task Handle(OperationTaskAcceptedEvent message) _log.LogDebug("Operation Workflow {operationId}: Status changed: {oldStatus} -> {newStatus}", message.OperationId, opOldStatus, op.Status); - + await _workflow.Messaging.DispatchOperationStatusEventAsync(new OperationStatusEvent { OperationId = op.Id, diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs index 8b46d5e..f879ff1 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/RebusOperationMessaging.cs @@ -1,34 +1,62 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using Dbosoft.Rebus.Operations.Commands; using Dbosoft.Rebus.Operations.Events; using Rebus.Bus; +using Rebus.Pipeline; namespace Dbosoft.Rebus.Operations.Workflow; public class RebusOperationMessaging : IOperationMessaging { private readonly IBus _bus; + private readonly IMessageEnricher _messageEnricher; + private readonly WorkflowOptions _options; public RebusOperationMessaging(IBus bus, - IOperationDispatcher operationDispatcher, IOperationTaskDispatcher taskDispatcher) + IOperationDispatcher operationDispatcher, + IOperationTaskDispatcher taskDispatcher, + IMessageEnricher messageEnricher, + WorkflowOptions options) { _bus = bus; + _messageEnricher = messageEnricher; + _options = options; OperationDispatcher = operationDispatcher; TaskDispatcher = taskDispatcher; } - public virtual Task DispatchTaskMessage(object command, IOperationTask task) + private IDictionary? JoinHeaders(IDictionary? one, + IDictionary? another) + { + if (one == null) + return another; + + if (another == null) + return one; + + var result = new []{one, another}.SelectMany(dict => dict) + .ToLookup(pair => pair.Key, pair => pair.Value) + .ToDictionary(group => group.Key, group => group.First()); + + return result.Count == 0 ? null : result; + } + + public virtual Task DispatchTaskMessage(object command, IOperationTask task, IDictionary? additionalHeaders = null) { var messageType = command.GetType(); var outboundMessage = Activator.CreateInstance( typeof(OperationTaskSystemMessage<>).MakeGenericType(messageType), command, task.OperationId, task.InitiatingTaskId, task.Id); - return _bus.SendLocal(outboundMessage); + var taskHeaders = _messageEnricher.EnrichHeadersOfOutgoingSystemMessage(command, + JoinHeaders(additionalHeaders, MessageContext.Current.Headers)); + return _bus.SendLocal(outboundMessage, taskHeaders); } - public Task DispatchTaskStatusEventAsync(string commandType, OperationTaskStatusEvent message) + public Task DispatchTaskStatusEventAsync(string commandType, OperationTaskStatusEvent message, IDictionary? additionalHeaders = null) { var genericType = typeof(OperationTaskStatusEvent<>); var wrappedCommandType = genericType.MakeGenericType(Type.GetType(commandType) @@ -36,19 +64,28 @@ public Task DispatchTaskStatusEventAsync(string commandType, OperationTaskStatus $"Unknown task command type '{commandType}'.")); var commandInstance = Activator.CreateInstance(wrappedCommandType, message); - return _bus.SendLocal(commandInstance); + + var eventHeaders = _messageEnricher.EnrichHeadersOfTaskStatusEvent(message, + JoinHeaders(additionalHeaders, MessageContext.Current.Headers)); + return _bus.SendLocal(commandInstance, eventHeaders); } - public Task DispatchTaskStatusEventAsync(OperationTaskStatusEvent message) + public Task DispatchTaskStatusEventAsync(OperationTaskStatusEvent message, IDictionary? additionalHeaders = null) { - return _bus.Publish(message); + var eventHeaders = _messageEnricher.EnrichHeadersOfTaskStatusEvent(message, + JoinHeaders(additionalHeaders, MessageContext.Current.Headers)); + return _bus.SendWorkflowEvent(_options, message, eventHeaders); } - public Task DispatchOperationStatusEventAsync(OperationStatusEvent message) + public Task DispatchOperationStatusEventAsync(OperationStatusEvent message, IDictionary? additionalHeaders = null) { - return _bus.Publish(message); + var eventHeaders = _messageEnricher.EnrichHeadersOfStatusEvent(message, + JoinHeaders(additionalHeaders, MessageContext.Current.Headers)); + return _bus.SendWorkflowEvent(_options, message, eventHeaders); } public IOperationDispatcher OperationDispatcher { get; } public IOperationTaskDispatcher TaskDispatcher { get; } + + } \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowEventDispatchMode.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowEventDispatchMode.cs new file mode 100644 index 0000000..1e7a430 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowEventDispatchMode.cs @@ -0,0 +1,7 @@ +namespace Dbosoft.Rebus.Operations; + +public enum WorkflowEventDispatchMode +{ + Publish, + Send +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowOptions.cs b/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowOptions.cs new file mode 100644 index 0000000..2083ee4 --- /dev/null +++ b/src/Rebus.Operations/Rebus.Operations.Primitives/WorkflowOptions.cs @@ -0,0 +1,9 @@ +namespace Dbosoft.Rebus.Operations; + +public class WorkflowOptions +{ + public WorkflowEventDispatchMode DispatchMode { get; set; } = WorkflowEventDispatchMode.Publish; + public string? EventDestination { get; set; } + + public string? OperationsDestination { get; set; } +} \ No newline at end of file diff --git a/src/Rebus.Operations/Rebus.Operations.SimpleInjector/SimpleInjectorExtensions.cs b/src/Rebus.Operations/Rebus.Operations.SimpleInjector/SimpleInjectorExtensions.cs index 9f8ea03..4aeb958 100644 --- a/src/Rebus.Operations/Rebus.Operations.SimpleInjector/SimpleInjectorExtensions.cs +++ b/src/Rebus.Operations/Rebus.Operations.SimpleInjector/SimpleInjectorExtensions.cs @@ -1,27 +1,12 @@ using System; -using Dbosoft.Rebus.Operations.Commands; -using Dbosoft.Rebus.Operations.Events; using Dbosoft.Rebus.Operations.Workflow; -using Rebus.Config; using Rebus.Handlers; -using Rebus.Routing.TypeBased; using SimpleInjector; namespace Dbosoft.Rebus.Operations; public static class SimpleInjectorExtensions { - public static RebusConfigurer AddOperations(this RebusConfigurer configurer, string queueName) - { - return configurer.Routing(x => x.TypeBased() - .Map(queueName) - .Map(queueName) - .Map(queueName) - .Map(queueName) - .Map(queueName) - .Map(queueName) - .Map(queueName)); - } public static Container AddRebusOperationsHandlers(this Container container) where TOpManager : IOperationManager @@ -42,15 +27,25 @@ public static Container AddRebusOperationsHandlers(this Container container, container.RegisterConditional(Lifestyle.Scoped,c=> !c.Handled); container.RegisterConditional(Lifestyle.Scoped, c=> !c.Handled); - container.RegisterConditional(Lifestyle.Scoped, c=> !c.Handled); - + container.Collection.Append(typeof(IHandleMessages<>), typeof(ProcessOperationSaga), Lifestyle.Scoped); container.Collection.Append(typeof(IHandleMessages<>), typeof(OperationTaskProgressEventHandler), Lifestyle.Scoped); container.Collection.Append(typeof(IHandleMessages<>), typeof(FailedOperationHandler<>), Lifestyle.Scoped); - container.Collection.Append(typeof(IHandleMessages<>), typeof(IncomingTaskMessageHandler<>), Lifestyle.Scoped); container.Collection.Append(typeof(IHandleMessages<>), typeof(EmptyOperationStatusEventHandler), Lifestyle.Scoped); container.Collection.Append(typeof(IHandleMessages<>), typeof(EmptyOperationTaskStatusEventHandler<>), Lifestyle.Scoped); + AddRebusOperationsHandlers(container); + return container; + } + + public static Container AddRebusOperationsHandlers(this Container container) + { + + container.RegisterConditional(Lifestyle.Scoped, c=> !c.Handled); + container.RegisterConditional(Lifestyle.Scoped, c=> !c.Handled); + + container.Collection.Append(typeof(IHandleMessages<>), typeof(IncomingTaskMessageHandler<>), Lifestyle.Scoped); + return container; } } \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/ExposingHeadersCommandHandler.cs b/test/Rebus.Operations.Tests/ExposingHeadersCommandHandler.cs new file mode 100644 index 0000000..6e6349c --- /dev/null +++ b/test/Rebus.Operations.Tests/ExposingHeadersCommandHandler.cs @@ -0,0 +1,24 @@ +using Rebus.Handlers; +using Rebus.Pipeline; + +namespace Dbosoft.Rebus.Operations.Tests; + +public class ExposingHeadersCommandHandler : IHandleMessages> +{ + private readonly ITaskMessaging _messaging; + + public ExposingHeadersCommandHandler(ITaskMessaging messaging) + { + _messaging = messaging; + } + + public static bool Called { get; set; } + public static IDictionary Headers; + + public Task Handle(OperationTask message) + { + Called = true; + Headers = MessageContext.Current.Headers; + return _messaging.CompleteTask(message); + } +} \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/RebusTestBase.cs b/test/Rebus.Operations.Tests/RebusTestBase.cs index 394d74d..c2458a9 100644 --- a/test/Rebus.Operations.Tests/RebusTestBase.cs +++ b/test/Rebus.Operations.Tests/RebusTestBase.cs @@ -1,5 +1,3 @@ -using Dbosoft.Rebus.Operations.Commands; -using Dbosoft.Rebus.Operations.Events; using Dbosoft.Rebus.Operations.Workflow; using Microsoft.Extensions.Logging.Abstractions; using Rebus.Activation; @@ -23,8 +21,9 @@ protected RebusTestBase(ITestOutputHelper output) } public async Task SetupRebus( - - Action configureActivator) + bool sendMode, string eventDestination, + Action configureActivator, + IMessageEnricher? messageEnricher = default) { var rebusNetwork = new InMemNetwork(); @@ -34,43 +33,55 @@ public async Task SetupRebus( var activator = new BuiltinHandlerActivator(); var busStarter = Configure.With(activator) - .Options(o => - o.SimpleRetryStrategy(maxDeliveryAttempts:1, secondLevelRetriesEnabled:true)) + .Options(o => + { + o.SimpleRetryStrategy(maxDeliveryAttempts: 1, secondLevelRetriesEnabled: true); + }) .Transport(cfg => cfg.UseInMemoryTransport(rebusNetwork, "main")) - .Routing(r => r.TypeBased() - .Map("main") - .Map("main") - .Map("main") - .Map("main") - .Map("main") - .Map("main") - .Map("main") - ) + .Routing(r => + { + if (string.IsNullOrWhiteSpace(eventDestination)) + { + r.TypeBased().AddOperations("main"); + } + }) .Sagas(x => x.StoreInMemory()) - .Subscriptions(x=>x.StoreInMemory()) + .Subscriptions(x=>x.StoreInMemory(new InMemorySubscriberStore())) .Logging(x=>x.Use(new RebusTestLogging(_output))) .Create(); + var workflowOptions = new WorkflowOptions + { + DispatchMode = sendMode ? WorkflowEventDispatchMode.Send : WorkflowEventDispatchMode.Publish, + EventDestination = eventDestination, + OperationsDestination = eventDestination + }; - var opDispatcher = new DefaultOperationDispatcher(busStarter.Bus, + var opDispatcher = new DefaultOperationDispatcher(busStarter.Bus,workflowOptions, NullLogger.Instance, opManager); var taskDispatcher = new DefaultOperationTaskDispatcher(busStarter.Bus, + workflowOptions, NullLogger.Instance, opManager, taskManager); - + + messageEnricher ??= new DefaultMessageEnricher(); var workflow = new DefaultWorkflow( opManager, taskManager, new RebusOperationMessaging(busStarter.Bus, - opDispatcher, taskDispatcher)); + opDispatcher, taskDispatcher,messageEnricher, workflowOptions )); + var taskMessaging = new RebusTaskMessaging(busStarter.Bus, workflowOptions); + activator.Register(() => new ProcessOperationSaga(workflow, NullLogger.Instance)); activator.Register(() => new OperationTaskProgressEventHandler(workflow, NullLogger.Instance)); - configureActivator(activator,workflow, busStarter.Bus); + configureActivator(activator,workflow, taskMessaging, busStarter.Bus); var bus = busStarter.Start(); - await OperationsSetup.SubscribeEvents(busStarter.Bus); + + if(!sendMode) + await OperationsSetup.SubscribeEvents(busStarter.Bus, workflowOptions); return new TestRebusSetup(bus, opDispatcher, opManager, taskManager); diff --git a/test/Rebus.Operations.Tests/StepOneCommandHandler.cs b/test/Rebus.Operations.Tests/StepOneCommandHandler.cs index 5f78dcb..74a28c4 100644 --- a/test/Rebus.Operations.Tests/StepOneCommandHandler.cs +++ b/test/Rebus.Operations.Tests/StepOneCommandHandler.cs @@ -1,3 +1,4 @@ +using Dbosoft.Rebus.Operations.Workflow; using Rebus.Bus; using Rebus.Handlers; @@ -5,11 +6,11 @@ namespace Dbosoft.Rebus.Operations.Tests; public class StepOneCommandHandler : IHandleMessages> { - private readonly IBus _bus; + private readonly ITaskMessaging _messaging; - public StepOneCommandHandler(IBus bus) + public StepOneCommandHandler(ITaskMessaging messaging) { - _bus = bus; + _messaging = messaging; } public static bool Called { get; set; } @@ -17,6 +18,6 @@ public StepOneCommandHandler(IBus bus) public Task Handle(OperationTask message) { Called = true; - return _bus.CompleteTask(message); + return _messaging.CompleteTask(message); } } \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs b/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs index ebfaa49..c6b28ba 100644 --- a/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs +++ b/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs @@ -1,3 +1,4 @@ +using Dbosoft.Rebus.Operations.Workflow; using Rebus.Bus; using Rebus.Handlers; @@ -5,11 +6,11 @@ namespace Dbosoft.Rebus.Operations.Tests; public class StepTwoCommandHandler : IHandleMessages> { - private readonly IBus _bus; + private readonly ITaskMessaging _messaging; - public StepTwoCommandHandler(IBus bus) + public StepTwoCommandHandler(ITaskMessaging messaging) { - _bus = bus; + _messaging = messaging; } public static bool Called { get; set; } @@ -17,6 +18,6 @@ public StepTwoCommandHandler(IBus bus) public Task Handle(OperationTask message) { Called = true; - return _bus.CompleteTask(message); + return _messaging.CompleteTask(message); } } \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestCommandHandler.cs b/test/Rebus.Operations.Tests/TestCommandHandler.cs index c4a830a..d0a4641 100644 --- a/test/Rebus.Operations.Tests/TestCommandHandler.cs +++ b/test/Rebus.Operations.Tests/TestCommandHandler.cs @@ -6,17 +6,17 @@ namespace Dbosoft.Rebus.Operations.Tests; public class TestCommandHandler : IHandleMessages> { - private readonly IBus _bus; + private readonly ITaskMessaging _messaging; public bool Called { get; private set; } - public TestCommandHandler(IBus bus) + public TestCommandHandler(ITaskMessaging operationMessaging) { - _bus = bus; + _messaging = operationMessaging; } public Task Handle(OperationTask message) { Called = true; - return _bus.CompleteTask(message); + return _messaging.CompleteTask(message); } } \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs b/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs index 7400f34..bd1167c 100644 --- a/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs +++ b/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs @@ -1,3 +1,4 @@ +using Dbosoft.Rebus.Operations.Workflow; using Rebus.Bus; using Rebus.Handlers; @@ -5,13 +6,13 @@ namespace Dbosoft.Rebus.Operations.Tests; public class TestCommandHandlerWithError : IHandleMessages> { - private readonly IBus _bus; + private readonly ITaskMessaging _messaging; private readonly bool _throws; - public TestCommandHandlerWithError(IBus bus, bool throws) + public TestCommandHandlerWithError(bool throws, ITaskMessaging messaging) { - _bus = bus; _throws = throws; + _messaging = messaging; } public async Task Handle(OperationTask message) @@ -19,6 +20,6 @@ public async Task Handle(OperationTask message) if (_throws) throw new InvalidOperationException(); - await _bus.FailTask(message, "error"); + await _messaging.FailTask(message, "error"); } } \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestCommandHandlerWithProgress.cs b/test/Rebus.Operations.Tests/TestCommandHandlerWithProgress.cs index e14e2d6..6d07e48 100644 --- a/test/Rebus.Operations.Tests/TestCommandHandlerWithProgress.cs +++ b/test/Rebus.Operations.Tests/TestCommandHandlerWithProgress.cs @@ -1,3 +1,4 @@ +using Dbosoft.Rebus.Operations.Workflow; using Rebus.Bus; using Rebus.Handlers; @@ -5,17 +6,17 @@ namespace Dbosoft.Rebus.Operations.Tests; public class TestCommandHandlerWithProgress : IHandleMessages> { - private readonly IBus _bus; - public TestCommandHandlerWithProgress(IBus bus) + private readonly ITaskMessaging _messaging; + public TestCommandHandlerWithProgress(ITaskMessaging messaging) { - _bus = bus; + _messaging = messaging; } public async Task Handle(OperationTask message) { - await _bus.ProgressMessage(message, "progressData"); + await _messaging.ProgressMessage(message, "progressData"); await Task.Delay(500); - await _bus.CompleteTask(message); + await _messaging.CompleteTask(message); } } \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/WorkflowTests.cs b/test/Rebus.Operations.Tests/WorkflowTests.cs index 946f272..c7653d6 100644 --- a/test/Rebus.Operations.Tests/WorkflowTests.cs +++ b/test/Rebus.Operations.Tests/WorkflowTests.cs @@ -1,4 +1,6 @@ using System.Text.Json; +using Dbosoft.Rebus.Operations.Commands; +using Dbosoft.Rebus.Operations.Events; using Dbosoft.Rebus.Operations.Workflow; using Microsoft.Extensions.Logging.Abstractions; using Xunit; @@ -14,16 +16,20 @@ public WorkflowTests(ITestOutputHelper output) { } - [Fact] - public async Task SingleStep_Operation_is_processed() + [Theory] + [InlineData(false, "")] + [InlineData(true, "")] + [InlineData(false, "main")] + [InlineData(true, "main")] + public async Task SingleStep_Operation_is_processed(bool sendMode, string eventDestination) { TestCommandHandler? taskHandler = null; - using var setup = await SetupRebus(configureActivator: (activator, _, bus) => + using var setup = await SetupRebus(sendMode, eventDestination, configureActivator: (activator,_,tasks, bus) => { activator.Register(() => new IncomingTaskMessageHandler(bus, NullLogger>.Instance, new DefaultMessageEnricher())); - taskHandler = new TestCommandHandler(bus); + taskHandler = new TestCommandHandler(tasks); activator.Register(() => taskHandler); activator.Register(() => new EmptyOperationStatusEventHandler()); activator.Register(() => new EmptyOperationTaskStatusEventHandler()); @@ -40,12 +46,16 @@ public async Task SingleStep_Operation_is_processed() } - [Fact] - public async Task MultiStep_Operation_is_processed() + [Theory] + [InlineData(false, "")] + [InlineData(true, "")] + [InlineData(false, "main")] + [InlineData(true, "main")] + public async Task MultiStep_Operation_is_processed(bool sendMode, string eventDestination) { StepOneCommandHandler? stepOneHandler; StepTwoCommandHandler? stepTwoHandler; - using var setup = await SetupRebus(configureActivator: (activator, wf, bus) => + using var setup = await SetupRebus(sendMode, eventDestination, configureActivator: (activator, wf, tasks, bus) => { activator.Register(() => new IncomingTaskMessageHandler(bus, NullLogger>.Instance, new DefaultMessageEnricher())); @@ -57,8 +67,8 @@ public async Task MultiStep_Operation_is_processed() activator.Register(() => new EmptyOperationStatusEventHandler()); activator.Register(() => new MultiStepSaga(wf)); - stepOneHandler = new StepOneCommandHandler(bus); - stepTwoHandler = new StepTwoCommandHandler(bus); + stepOneHandler = new StepOneCommandHandler(tasks); + stepTwoHandler = new StepTwoCommandHandler(tasks); activator.Register(() => stepOneHandler); activator.Register(() => stepTwoHandler); }); @@ -82,10 +92,14 @@ public async Task MultiStep_Operation_is_processed() } } - [Fact] - public async Task Progress_is_reported() + [Theory] + [InlineData(false, "")] + [InlineData(true, "")] + [InlineData(false, "main")] + [InlineData(true, "main")] + public async Task Progress_is_reported(bool sendMode, string eventDestination) { - using var setup = await SetupRebus(configureActivator: (activator, wf, bus) => + using var setup = await SetupRebus(sendMode, eventDestination, configureActivator: (activator, wf,tasks, bus) => { activator.Register(() => new IncomingTaskMessageHandler(bus, NullLogger>.Instance, new DefaultMessageEnricher())); @@ -94,7 +108,7 @@ public async Task Progress_is_reported() NullLogger.Instance)); activator.Register(() => new EmptyOperationTaskStatusEventHandler()); - activator.Register(() => new TestCommandHandlerWithProgress(bus)); + activator.Register(() => new TestCommandHandlerWithProgress(tasks)); }); TestOperationManager.Reset(); @@ -114,11 +128,11 @@ public async Task Progress_is_reported() public async Task SingleStep_Operation_failure_is_reported(bool throws) { - using var setup = await SetupRebus(configureActivator: (activator, wf, bus) => + using var setup = await SetupRebus(false, "", configureActivator: (activator, wf,tasks, bus) => { activator.Register(() => new IncomingTaskMessageHandler(bus, NullLogger>.Instance, new DefaultMessageEnricher())); - activator.Register(() => new TestCommandHandlerWithError(bus, throws)); + activator.Register(() => new TestCommandHandlerWithError(throws, tasks)); activator.Register(() => new EmptyOperationStatusEventHandler()); activator.Register(() => new EmptyOperationTaskStatusEventHandler()); activator.Register(() => @@ -134,4 +148,70 @@ public async Task SingleStep_Operation_failure_is_reported(bool throws) Assert.Equal(OperationStatus.Failed ,TestOperationManager.Operations.First().Value.Status); } + + [Fact] + public async Task Headers_are_passed_to_task() + { + var messageEnricher = new TestMessageEnricher(); + using var setup = await SetupRebus(false, "", configureActivator: (activator, wf,tasks, bus) => + { + activator.Register(() => new IncomingTaskMessageHandler(bus, + NullLogger>.Instance, messageEnricher)); + activator.Register(() => new ExposingHeadersCommandHandler(tasks)); + activator.Register(() => new EmptyOperationStatusEventHandler()); + activator.Register(() => new EmptyOperationTaskStatusEventHandler()); + activator.Register(() => + new FailedOperationHandler>( + NullLogger>>.Instance, + wf.Messaging)); + }, messageEnricher); + TestOperationManager.Reset(); + TestTaskManager.Reset(); + + await setup.OperationDispatcher.StartNew(additionalHeaders: + new Dictionary{{"custom_header", "data"}}); + await Task.Delay(1000); + Assert.True(ExposingHeadersCommandHandler.Called); + var headers = ExposingHeadersCommandHandler.Headers; + Assert.Contains(headers, x => x.Key == "custom_header"); + + } + + private class TestMessageEnricher : IMessageEnricher + { + public object? EnrichTaskAcceptedReply(OperationTaskSystemMessage taskMessage) where T : class, new() + { + return null; + } + + private static IDictionary? CopyCustomHeader(IDictionary? headers) + { + if (headers == null || !headers.ContainsKey("custom_header")) + return null; + + return new Dictionary { { "custom_header", headers["custom_header"] } }; + } + + public IDictionary? EnrichHeadersFromIncomingSystemMessage(OperationTaskSystemMessage taskMessage, + IDictionary systemMessageHeaders) + { + return CopyCustomHeader(systemMessageHeaders); + } + + public IDictionary? EnrichHeadersOfOutgoingSystemMessage(object taskMessage, IDictionary? previousHeaders) + { + return CopyCustomHeader(previousHeaders); + } + + public IDictionary? EnrichHeadersOfStatusEvent(OperationStatusEvent operationStatusEvent, IDictionary? previousHeaders) + { + return CopyCustomHeader(previousHeaders); + } + + public IDictionary? EnrichHeadersOfTaskStatusEvent(OperationTaskStatusEvent operationStatusEvent, + IDictionary? previousHeaders) + { + return CopyCustomHeader(previousHeaders); + } + } } \ No newline at end of file diff --git a/test/Rebus.SimpleInjector.Tests/SimpleInjectorTestBase.cs b/test/Rebus.SimpleInjector.Tests/SimpleInjectorTestBase.cs index 38e6451..31caef1 100644 --- a/test/Rebus.SimpleInjector.Tests/SimpleInjectorTestBase.cs +++ b/test/Rebus.SimpleInjector.Tests/SimpleInjectorTestBase.cs @@ -1,6 +1,8 @@ using Dbosoft.Rebus.Operations; using Dbosoft.Rebus.Operations.Tests; +using Dbosoft.Rebus.Operations.Workflow; using Rebus.Persistence.InMem; +using Rebus.Routing.TypeBased; using Rebus.Transport.InMem; using SimpleInjector; using Xunit.Abstractions; @@ -16,20 +18,31 @@ protected SimpleInjectorTestBase(ITestOutputHelper output) _output = output; } - public void SetupRebus(Container container) + public void SetupRebus(Container container, bool sendMode, string eventDestination) { var rebusNetwork = new InMemNetwork(); container.Register(Lifestyle.Scoped); + container.RegisterInstance(new WorkflowOptions + { + DispatchMode = sendMode ? WorkflowEventDispatchMode.Send : WorkflowEventDispatchMode.Publish, + EventDestination = eventDestination, + OperationsDestination = eventDestination + }); container.AddRebusOperationsHandlers(); container.ConfigureRebus(configurer => { return configurer .Options(o => o.EnableSimpleInjectorUnitOfWork()) .Transport(cfg => cfg.UseInMemoryTransport(rebusNetwork, "main")) - .AddOperations("main") + .Routing(r => + { + if(string.IsNullOrWhiteSpace(eventDestination)) + r.TypeBased().AddOperations("main"); + + }) .Sagas(x => x.StoreInMemory()) - .Subscriptions(x=>x.StoreInMemory()) + .Subscriptions(x=>x.StoreInMemory(new InMemorySubscriberStore())) .Logging(x=>x.Use(new RebusTestLogging(_output))) .Start(); diff --git a/test/Rebus.SimpleInjector.Tests/SimpleInjectorWorkflowTests.cs b/test/Rebus.SimpleInjector.Tests/SimpleInjectorWorkflowTests.cs index ab54493..f0f861c 100644 --- a/test/Rebus.SimpleInjector.Tests/SimpleInjectorWorkflowTests.cs +++ b/test/Rebus.SimpleInjector.Tests/SimpleInjectorWorkflowTests.cs @@ -1,5 +1,6 @@ using Dbosoft.Rebus.Operations; using Dbosoft.Rebus.Operations.Tests; +using Dbosoft.Rebus.Operations.Workflow; using Microsoft.Extensions.DependencyInjection; using Rebus.Bus; using Rebus.Handlers; @@ -19,8 +20,12 @@ public SimpleInjectorWorkflowTests(ITestOutputHelper output) } - [Fact] - public async Task MultiStep_Operation_is_processed() + [Theory] + [InlineData(false, "")] + [InlineData(true, "")] + [InlineData(false, "main")] + [InlineData(true, "main")] + public async Task MultiStep_Operation_is_processed(bool sendMode, string eventDestination) { var sc = new ServiceCollection(); sc.AddLogging(); @@ -29,11 +34,11 @@ public async Task MultiStep_Operation_is_processed() container.Options.DefaultScopedLifestyle = new AsyncScopedLifestyle(); sc.AddSimpleInjector(container, cfg => cfg.AddLogging()); - SetupRebus(container); + SetupRebus(container, sendMode, eventDestination); container.Collection.Append(typeof(IHandleMessages<>), typeof(MultiStepSaga)); container.Collection.Append(typeof(IHandleMessages<>), typeof(StepOneCommandHandler), Lifestyle.Scoped); container.Collection.Append(typeof(IHandleMessages<>), typeof(StepTwoCommandHandler), Lifestyle.Scoped); - + container.Register(Lifestyle.Scoped); container.Register(Lifestyle.Scoped); container.Register(Lifestyle.Scoped); @@ -51,7 +56,8 @@ public async Task MultiStep_Operation_is_processed() await using var scope = AsyncScopedLifestyle.BeginScope(container); var bus = scope.GetInstance(); - await OperationsSetup.SubscribeEvents(bus); + await OperationsSetup.SubscribeEvents(bus, + container.GetInstance()); var dispatcher = scope.GetInstance();