diff --git a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs index b476cbd..640242f 100644 --- a/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs +++ b/src/Rebus.Operations/Rebus.Operations.Core/Workflow/ProcessOperationSaga.cs @@ -212,6 +212,24 @@ await _workflow.Messaging.DispatchOperationStatusEventAsync(new OperationStatusE MarkAsComplete(); } + else + { + // capture failed operations and send status events to initiating task + if (message.OperationFailed) + { + var initiatingTask = await _workflow.Tasks + .GetByIdAsync(message.TaskId) + .ConfigureAwait(false); + + if (initiatingTask != null && Data.Tasks.TryGetValue(initiatingTask.Id, out var taskCommandTypeName)) + { + message.InitiatingTaskId = initiatingTask.InitiatingTaskId; + message.TaskId = initiatingTask.Id; + await _workflow.Messaging.DispatchTaskStatusEventAsync(taskCommandTypeName, message); + } + } + } + } } diff --git a/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs b/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs index c6b28ba..96de536 100644 --- a/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs +++ b/test/Rebus.Operations.Tests/StepTwoCommandHandler.cs @@ -7,6 +7,7 @@ namespace Dbosoft.Rebus.Operations.Tests; public class StepTwoCommandHandler : IHandleMessages> { private readonly ITaskMessaging _messaging; + public bool Throws { get; set; } public StepTwoCommandHandler(ITaskMessaging messaging) { @@ -18,6 +19,10 @@ public StepTwoCommandHandler(ITaskMessaging messaging) public Task Handle(OperationTask message) { Called = true; + + if (Throws) + throw new Exception("Failed"); + return _messaging.CompleteTask(message); } } \ No newline at end of file diff --git a/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs b/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs index bd1167c..d51db70 100644 --- a/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs +++ b/test/Rebus.Operations.Tests/TestCommandHandlerWithError.cs @@ -7,17 +7,16 @@ namespace Dbosoft.Rebus.Operations.Tests; public class TestCommandHandlerWithError : IHandleMessages> { private readonly ITaskMessaging _messaging; - private readonly bool _throws; + public bool Throws { get; set; } - public TestCommandHandlerWithError(bool throws, ITaskMessaging messaging) + public TestCommandHandlerWithError(ITaskMessaging messaging) { - _throws = throws; _messaging = messaging; } public async Task Handle(OperationTask message) { - if (_throws) + if (Throws) throw new InvalidOperationException(); await _messaging.FailTask(message, "error"); diff --git a/test/Rebus.Operations.Tests/WorkflowTests.cs b/test/Rebus.Operations.Tests/WorkflowTests.cs index 5df182c..6c50de8 100644 --- a/test/Rebus.Operations.Tests/WorkflowTests.cs +++ b/test/Rebus.Operations.Tests/WorkflowTests.cs @@ -3,6 +3,7 @@ using Dbosoft.Rebus.Operations.Events; using Dbosoft.Rebus.Operations.Workflow; using Microsoft.Extensions.Logging.Abstractions; +using Rebus.Retry.Simple; using Xunit; using Xunit.Abstractions; @@ -92,6 +93,62 @@ public async Task MultiStep_Operation_is_processed(bool sendMode, string eventDe } } + [Theory] + [InlineData(false, "")] + [InlineData(true, "")] + [InlineData(false, "main")] + [InlineData(true, "main")] + public async Task MultiStep_Operation_Exception_is_reported(bool sendMode, string eventDestination) + { + StepOneCommandHandler? stepOneHandler; + StepTwoCommandHandler? stepTwoHandler; + using var setup = await SetupRebus(sendMode, eventDestination, configureActivator: (activator, wf, tasks, bus) => + { + activator.Register(() => new IncomingTaskMessageHandler(bus, + NullLogger>.Instance, new DefaultMessageEnricher())); + activator.Register(() => new IncomingTaskMessageHandler(bus, + NullLogger>.Instance, new DefaultMessageEnricher())); + activator.Register(() => new IncomingTaskMessageHandler(bus, + NullLogger>.Instance, new DefaultMessageEnricher())); + + activator.Register(() => new EmptyOperationStatusEventHandler()); + activator.Register(() => new MultiStepSaga(wf)); + activator.Register(() => new FailedOperationHandler>(wf.WorkflowOptions, + NullLogger< FailedOperationHandler>>.Instance, + wf.Messaging)); + + stepOneHandler = new StepOneCommandHandler(tasks); + stepTwoHandler = new StepTwoCommandHandler(tasks){Throws = true}; + activator.Register(() => stepOneHandler); + activator.Register(() => stepTwoHandler); + }); + + TestOperationManager.Reset(); + TestTaskManager.Reset(); + StepOneCommandHandler.Called = false; + StepTwoCommandHandler.Called = false; + + await setup.OperationDispatcher.StartNew(); + + var timeout = new CancellationTokenSource(60000); + while ( + !timeout.Token.IsCancellationRequested && + (TestOperationManager.Operations.First().Value.Status == OperationStatus.Running || + TestOperationManager.Operations.First().Value.Status == OperationStatus.Queued)) + // ReSharper disable once MethodSupportsCancellation + { + await Task.Delay(1000); + } + + Assert.True(StepOneCommandHandler.Called); + Assert.True(StepTwoCommandHandler.Called); + Assert.Single(TestOperationManager.Operations); + Assert.Equal(3, TestTaskManager.Tasks.Count); + Assert.Equal(OperationStatus.Failed, TestOperationManager.Operations.First().Value.Status); + + + } + [Theory] [InlineData(false, "")] [InlineData(true, "")] @@ -132,7 +189,7 @@ public async Task SingleStep_Operation_failure_is_reported(bool throws) { activator.Register(() => new IncomingTaskMessageHandler(bus, NullLogger>.Instance, new DefaultMessageEnricher())); - activator.Register(() => new TestCommandHandlerWithError(throws, tasks)); + activator.Register(() => new TestCommandHandlerWithError(tasks){Throws = true}); activator.Register(() => new EmptyOperationStatusEventHandler()); activator.Register(() => new EmptyOperationTaskStatusEventHandler()); activator.Register(() => @@ -145,7 +202,15 @@ public async Task SingleStep_Operation_failure_is_reported(bool throws) TestTaskManager.Reset(); await setup.OperationDispatcher.StartNew(); - await Task.Delay(throws ? 2000: 1000); + var timeout = new CancellationTokenSource(10000); + while ( + !timeout.Token.IsCancellationRequested && + (TestOperationManager.Operations.First().Value.Status == OperationStatus.Running || + TestOperationManager.Operations.First().Value.Status == OperationStatus.Queued)) + // ReSharper disable once MethodSupportsCancellation + { + await Task.Delay(1000); + } Assert.Equal(OperationStatus.Failed ,TestOperationManager.Operations.First().Value.Status); }