Skip to content

Commit

Permalink
Operations: added send and topic mode (#4)
Browse files Browse the repository at this point in the history
* Operations: added send and topic mode
It is now possible to configure workflow engine to use send mode instead of publish for events.
Topic based sending support was also added.

* added message header enrichment
  • Loading branch information
fw2568 authored Jul 26, 2023
1 parent 4170c66 commit 8b5ea04
Show file tree
Hide file tree
Showing 30 changed files with 538 additions and 179 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
#nullable enable

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Dbosoft.Rebus.Operations
{
public interface IOperationDispatcher
{
ValueTask<IOperation?> StartNew<T>(object? additionalData = default) where T : class, new();
ValueTask<IOperation?> StartNew(Type commandType, object? additionalData = default);
ValueTask<IOperation?> StartNew(object operationCommand);
ValueTask<IOperation?> StartNew<T>(object? additionalData = default, IDictionary<string,string>? additionalHeaders = null) where T : class, new();
ValueTask<IOperation?> StartNew(Type commandType, object? additionalData = default, IDictionary<string,string>? additionalHeaders = null);
ValueTask<IOperation?> StartNew(object operationCommand, IDictionary<string,string>? additionalHeaders = null);

}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
#nullable enable

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Dbosoft.Rebus.Operations
{
public interface IOperationTaskDispatcher
{
ValueTask<IOperationTask?> StartNew<T>(Guid operationId, Guid initiatingTaskId,
object? additionalData = default) where T : class, new();
object? additionalData = default, IDictionary<string,string>? additionalHeaders = null) where T : class, new();

ValueTask<IOperationTask?> StartNew(Guid operationId, Guid initiatingTaskId, Type operationCommandType,
object? additionalData = default);
object? additionalData = default, IDictionary<string,string>? additionalHeaders = null);

ValueTask<IOperationTask?> StartNew(Guid operationId, Guid initiatingTaskId, object command
, object? additionalData = default);
, object? additionalData = default, IDictionary<string,string>? additionalHeaders = null);

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Dbosoft.Rebus.Operations;

public interface ITaskMessaging
{
Task FailTask(IOperationTaskMessage message, string errorMessage,
IDictionary<string, string>? additionalHeaders = null);

Task FailTask(IOperationTaskMessage message, ErrorData error,
IDictionary<string, string>? additionalHeaders = null);

Task CompleteTask(IOperationTaskMessage message, IDictionary<string, string>? additionalHeaders = null);

Task CompleteTask(IOperationTaskMessage message, object responseMessage,
IDictionary<string, string>? additionalHeaders = null);

Task ProgressMessage(IOperationTaskMessage message, object data,
IDictionary<string, string>? additionalHeaders = null);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
using Dbosoft.Rebus.Operations.Commands;
using System.Collections.Generic;
using Dbosoft.Rebus.Operations.Commands;
using Dbosoft.Rebus.Operations.Events;

namespace Dbosoft.Rebus.Operations.Workflow;

public interface IMessageEnricher
{
object? EnrichTaskAcceptedReply<T>(OperationTaskSystemMessage<T> taskMessage) where T : class, new();
}

IDictionary<string, string>? EnrichHeadersFromIncomingSystemMessage<T>(OperationTaskSystemMessage<T> taskMessage,
IDictionary<string, string> systemMessageHeaders);

IDictionary<string, string>? EnrichHeadersOfOutgoingSystemMessage(object taskMessage,
IDictionary<string, string>? previousHeaders);

IDictionary<string, string>? EnrichHeadersOfStatusEvent(OperationStatusEvent operationStatusEvent,
IDictionary<string, string>? previousHeaders);

IDictionary<string, string>? EnrichHeadersOfTaskStatusEvent(OperationTaskStatusEvent operationStatusEvent,
IDictionary<string, string>? previousHeaders);
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dbosoft.Rebus.Operations.Events;

namespace Dbosoft.Rebus.Operations.Workflow;

public interface IOperationMessaging
{
Task DispatchTaskMessage(object command, IOperationTask task);
Task DispatchTaskStatusEventAsync(string commandType, OperationTaskStatusEvent message);
Task DispatchTaskStatusEventAsync(OperationTaskStatusEvent message);
Task DispatchOperationStatusEventAsync(OperationStatusEvent operationStatusEvent);
Task DispatchTaskMessage(object command, IOperationTask task, IDictionary<string,string>? additionalHeaders = null);
Task DispatchTaskStatusEventAsync(string commandType, OperationTaskStatusEvent message, IDictionary<string,string>? additionalHeaders = null);
Task DispatchTaskStatusEventAsync(OperationTaskStatusEvent message, IDictionary<string,string>? additionalHeaders = null);
Task DispatchOperationStatusEventAsync(OperationStatusEvent operationStatusEvent, IDictionary<string,string>? additionalHeaders = null);

IOperationDispatcher OperationDispatcher { get; }
IOperationTaskDispatcher TaskDispatcher { get; }


}
Original file line number Diff line number Diff line change
@@ -1,57 +1,32 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dbosoft.Rebus.Operations.Events;
using Dbosoft.Rebus.Operations.Workflow;
using Rebus.Bus;
using Rebus.Transport;

namespace Dbosoft.Rebus.Operations;

public static class BusOperationTaskExtensions
{
public static Task FailTask(this IBus bus, IOperationTaskMessage message, string errorMessage)
public static Task SendWorkflowEvent(this IBus bus, WorkflowOptions options, object eventMessage,
IDictionary<string, string>? additionalHeaders = null)
{
return FailTask(bus, message, new ErrorData { ErrorMessage = errorMessage });
}

public static Task FailTask(this IBus bus, IOperationTaskMessage message, ErrorData error)
{
return bus.Publish(
OperationTaskStatusEvent.Failed(
message.OperationId, message.InitiatingTaskId,
message.TaskId, error));
}


public static Task CompleteTask(this IBus bus, IOperationTaskMessage message)
{
return bus.Publish(
OperationTaskStatusEvent.Completed(
message.OperationId, message.InitiatingTaskId, message.TaskId));
}

public static Task CompleteTask(this IBus bus, IOperationTaskMessage message, object responseMessage)
{
return bus.Publish(
OperationTaskStatusEvent.Completed(
message.OperationId, message.InitiatingTaskId, message.TaskId, responseMessage));
}


public static async Task ProgressMessage(this IBus bus, IOperationTaskMessage message, object data)
{
using var scope = new RebusTransactionScope();


await bus.Publish(new OperationTaskProgressEvent
if (string.IsNullOrWhiteSpace(options.EventDestination))
{
Id = Guid.NewGuid(),
OperationId = message.OperationId,
TaskId = message.TaskId,
Data = data,
Timestamp = DateTimeOffset.UtcNow
}).ConfigureAwait(false);

// commit it like this
await scope.CompleteAsync().ConfigureAwait(false);
return options.DispatchMode switch
{
WorkflowEventDispatchMode.Publish => bus.Publish(eventMessage, additionalHeaders),
WorkflowEventDispatchMode.Send => bus.Send(eventMessage, additionalHeaders),
_ => throw new ArgumentOutOfRangeException(nameof(options))
};
}

return options.DispatchMode switch
{
WorkflowEventDispatchMode.Publish => bus.Advanced.Topics.Publish(options.EventDestination, eventMessage, additionalHeaders),
WorkflowEventDispatchMode.Send => bus.Advanced.Routing.Send(options.EventDestination, eventMessage, additionalHeaders),
_ => throw new ArgumentOutOfRangeException(nameof(options))
};
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dbosoft.Rebus.Operations.Workflow;
using Microsoft.Extensions.Logging;
Expand All @@ -14,8 +15,9 @@ public class DefaultOperationDispatcher : OperationDispatcherBase

public DefaultOperationDispatcher(
IBus bus,
WorkflowOptions workflowOptions,
ILogger<DefaultOperationDispatcher> logger,
IOperationManager operationManager) : base(bus, logger)
IOperationManager operationManager) : base(bus, workflowOptions, logger)
{
_operationManager = operationManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ public class DefaultOperationTaskDispatcher : OperationTaskDispatcherBase

public DefaultOperationTaskDispatcher(
IBus bus,
WorkflowOptions workflowOptions,
ILogger<DefaultOperationTaskDispatcher> logger,
IOperationManager operationManager, IOperationTaskManager operationTaskManager) : base(bus, logger)
IOperationManager operationManager, IOperationTaskManager operationTaskManager) : base(bus, workflowOptions, logger)
{
_operationManager = operationManager;
_operationTaskManager = operationTaskManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
#nullable enable

using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Dbosoft.Rebus.Operations.Commands;
using Dbosoft.Rebus.Operations.Workflow;
using Microsoft.Extensions.Logging;
using Rebus.Bus;

Expand All @@ -15,34 +17,36 @@ namespace Dbosoft.Rebus.Operations
public abstract class OperationDispatcherBase : IOperationDispatcher
{
private readonly IBus _bus;
private readonly WorkflowOptions _options;
private readonly ILogger<OperationDispatcherBase> _logger;

protected OperationDispatcherBase(IBus bus, ILogger<OperationDispatcherBase> logger)
protected OperationDispatcherBase(IBus bus, WorkflowOptions options, ILogger<OperationDispatcherBase> logger)
{
_bus = bus;
_options = options;
_logger = logger;
}

public ValueTask<IOperation?> StartNew(object command)
public ValueTask<IOperation?> StartNew(object command, IDictionary<string,string>? additionalHeaders = null)
{
return StartOperation(command, null);
return StartOperation(command, null, additionalHeaders);
}

public ValueTask<IOperation?> StartNew<T>(object? additionalData = default)
public ValueTask<IOperation?> StartNew<T>(object? additionalData = default, IDictionary<string,string>? additionalHeaders = null)
where T : class, new()
{
return StartOperation( Activator.CreateInstance<T>(),additionalData);
return StartOperation( Activator.CreateInstance<T>(),additionalData, additionalHeaders);
}


public ValueTask<IOperation?> StartNew(Type commandType, object? additionalData = default)
public ValueTask<IOperation?> StartNew(Type commandType, object? additionalData = default, IDictionary<string,string>? additionalHeaders = null)
{
return StartOperation(commandType,additionalData);
return StartOperation(commandType,additionalData, additionalHeaders);
}

protected abstract ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData);

protected async ValueTask<IOperation?> StartOperation(object command, object? additionalData)
protected async ValueTask<IOperation?> StartOperation(object command, object? additionalData, IDictionary<string,string>? additionalHeaders = null)
{
if (command == null)
throw new ArgumentNullException(nameof(command));
Expand All @@ -59,7 +63,9 @@ protected OperationDispatcherBase(IBus bus, ILogger<OperationDispatcherBase> log
Guid.NewGuid());

var message = new CreateOperationCommand { TaskMessage = taskMessage };
await _bus.Send(message);
await (string.IsNullOrWhiteSpace(_options.OperationsDestination)
? _bus.Send(message, additionalHeaders)
: _bus.Advanced.Routing.Send(_options.OperationsDestination, message, additionalHeaders));

_logger.LogDebug("Send new command of type {commandType}. Id: {operationId}",
taskCommand.GetType().Name, operation.Id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,50 @@
#nullable enable

using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Dbosoft.Rebus.Operations.Commands;
using Dbosoft.Rebus.Operations.Workflow;
using Microsoft.Extensions.Logging;
using Rebus.Bus;

namespace Dbosoft.Rebus.Operations;
public abstract class OperationTaskDispatcherBase : IOperationTaskDispatcher
{
private readonly IBus _bus;
private readonly WorkflowOptions _options;
private readonly ILogger<OperationTaskDispatcherBase> _logger;

protected OperationTaskDispatcherBase(IBus bus, ILogger<OperationTaskDispatcherBase> logger)
protected OperationTaskDispatcherBase(IBus bus, WorkflowOptions options, ILogger<OperationTaskDispatcherBase> logger)
{
_bus = bus;
_options = options;
_logger = logger;
}

public ValueTask<IOperationTask?> StartNew(Guid operationId, Guid initiatingTaskId, object command, object? additionalData = null)
public ValueTask<IOperationTask?> StartNew(Guid operationId, Guid initiatingTaskId, object command,
object? additionalData = null, IDictionary<string,string>? additionalHeaders = null)
{
return StartTask(operationId, initiatingTaskId, command, additionalData);
return StartTask(operationId, initiatingTaskId, command, additionalData, additionalHeaders);
}

public ValueTask<IOperationTask?> StartNew<T>(Guid operationId, Guid initiatingTaskId, object? additionalData = default)
public ValueTask<IOperationTask?> StartNew<T>(Guid operationId, Guid initiatingTaskId, object? additionalData = default, IDictionary<string,string>? additionalHeaders = null)
where T : class, new()
{
return StartTask(operationId, initiatingTaskId, Activator.CreateInstance<T>(),
additionalData);
additionalData, additionalHeaders);
}

public ValueTask<IOperationTask?> StartNew(Guid operationId, Guid initiatingTaskId, Type commandType, object? additionalData = default)
public ValueTask<IOperationTask?> StartNew(Guid operationId, Guid initiatingTaskId, Type commandType, object? additionalData = default, IDictionary<string,string>? additionalHeaders = null)
{
return StartTask(operationId, initiatingTaskId, commandType, additionalData);
return StartTask(operationId, initiatingTaskId, commandType, additionalData, additionalHeaders);
}

protected abstract ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData);

protected async ValueTask<IOperationTask?> StartTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData)
protected async ValueTask<IOperationTask?> StartTask(Guid operationId, Guid initiatingTaskId,
object command, object? additionalData, IDictionary<string,string>? additionalHeaders = null)
{
if (command == null)
throw new ArgumentNullException(nameof(command));
Expand All @@ -55,7 +61,9 @@ protected OperationTaskDispatcherBase(IBus bus, ILogger<OperationTaskDispatcherB
initiatingTaskId,
task.Id);

await _bus.Send(taskMessage);
await (string.IsNullOrWhiteSpace(_options.OperationsDestination)
? _bus.Send(taskMessage, additionalHeaders)
: _bus.Advanced.Routing.Send(_options.OperationsDestination, taskMessage, additionalHeaders)) ;

_logger.LogDebug("Send new command of type {commandType}. Id: {operationId}, ParentTaskId: {parentTaskId}",
taskCommand.GetType().Name, operationId, initiatingTaskId);
Expand Down
12 changes: 11 additions & 1 deletion src/Rebus.Operations/Rebus.Operations.Core/OperationsSetup.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
using System.Threading.Tasks;
using Dbosoft.Rebus.Operations.Events;
using Dbosoft.Rebus.Operations.Workflow;
using Rebus.Bus;

namespace Dbosoft.Rebus.Operations;

public static class OperationsSetup
{
public static async Task<IBus> SubscribeEvents(IBus bus)
public static async Task<IBus> SubscribeEvents(IBus bus, WorkflowOptions options)
{
if (options.DispatchMode == WorkflowEventDispatchMode.Send)
return bus;

if (!string.IsNullOrWhiteSpace(options.EventDestination))
{
await bus.Advanced.Topics.Subscribe(options.EventDestination);
return bus;
}

await bus.Subscribe<OperationStatusEvent>();
await bus.Subscribe<OperationTaskAcceptedEvent>();
await bus.Subscribe<OperationTaskProgressEvent>();
Expand Down
Loading

0 comments on commit 8b5ea04

Please sign in to comment.