diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs index 0231680..6abc5e0 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/IncomingTaskMessageHandler.cs @@ -5,7 +5,9 @@ using Dbosoft.Rebus.Operations.Events; using Microsoft.Extensions.Logging; using Rebus.Bus; +using Rebus.Extensions; using Rebus.Handlers; +using Rebus.Messages; using Rebus.Pipeline; using Rebus.Transport; @@ -24,19 +26,6 @@ public IncomingTaskMessageHandler(IBus bus, ILogger taskMessage, IDictionary? headers) - { - using var scope = new RebusTransactionScope(); - await bus.SendLocal(new OperationTask(taskMessage.Message, - taskMessage.OperationId, taskMessage.InitiatingTaskId, - taskMessage.TaskId, taskMessage.Created) - , headers - ).ConfigureAwait(false); - - await scope.CompleteAsync().ConfigureAwait(false); - } public async Task Handle(OperationTaskSystemMessage taskMessage) { @@ -50,13 +39,27 @@ public async Task Handle(OperationTaskSystemMessage taskMessage) InitiatingTaskId = taskMessage.InitiatingTaskId, TaskId = taskMessage.TaskId, AdditionalData = _messageEnricher.EnrichTaskAcceptedReply(taskMessage), - Created = taskMessage.Created + Created = DateTimeOffset.UtcNow }; + var replyAddress = MessageContext.Current.Headers.GetValueOrNull(Headers.ReturnAddress); - await _bus.Reply(reply).ConfigureAwait(false); - _logger.LogTrace($"Accepted incoming operation message. Operation id: '{taskMessage.OperationId}'"); + if (replyAddress == null) + { + _logger.LogWarning($"Operation Workflow {taskMessage.OperationId}/{taskMessage.TaskId}: missing return address"); + } + else + { + using var replyScope = new RebusTransactionScope(); + await _bus.Advanced.Routing.Send(replyAddress, reply).ConfigureAwait(false); + _logger.LogTrace($"Accepted incoming operation message. Operation id: '{taskMessage.OperationId}'"); + await replyScope.CompleteAsync().ConfigureAwait(false); + } - Resubmit(_bus, taskMessage, headers); + await _bus.SendLocal(new OperationTask(taskMessage.Message, + taskMessage.OperationId, taskMessage.InitiatingTaskId, + taskMessage.TaskId, taskMessage.Created) + , headers + ).ConfigureAwait(false); } } } \ No newline at end of file diff --git a/test/Rebus.OperationsDB.Tests/DatabaseTests.cs b/test/Rebus.OperationsDB.Tests/DatabaseTests.cs index 462f792..45bb4b8 100644 --- a/test/Rebus.OperationsDB.Tests/DatabaseTests.cs +++ b/test/Rebus.OperationsDB.Tests/DatabaseTests.cs @@ -97,7 +97,7 @@ private async Task SetupAndRunWorkflow( .ConfigureLogging(l => { l.AddXUnit(_outputHelper); - l.SetMinimumLevel(LogLevel.Debug); + l.SetMinimumLevel(LogLevel.Trace); }) .ConfigureServices(s=>s.AddSimpleInjector(container, cfg => cfg.AddLogging()))