Skip to content

Commit

Permalink
IOperationDispatcher: added optional data
Browse files Browse the repository at this point in the history
IOperationTaskDispatcher: added headers
  • Loading branch information
fw2568 committed Jul 26, 2023
1 parent 8b5ea04 commit f2917bf
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface IOperationDispatcher
{
ValueTask<IOperation?> StartNew<T>(object? additionalData = default, IDictionary<string,string>? additionalHeaders = null) where T : class, new();
ValueTask<IOperation?> StartNew(Type commandType, object? additionalData = default, IDictionary<string,string>? additionalHeaders = null);
ValueTask<IOperation?> StartNew(object operationCommand, IDictionary<string,string>? additionalHeaders = null);

ValueTask<IOperation?> StartNew(object operationCommand, object? additionalData = default, IDictionary<string,string>? additionalHeaders = null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public DefaultOperationDispatcher(
_operationManager = operationManager;
}

protected override async ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData)
protected override async ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData,IDictionary<string,string>? additionalHeaders)
{
return (await _operationManager.GetOrCreateAsync(Guid.NewGuid(), command), command);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dbosoft.Rebus.Operations.Workflow;
using Microsoft.Extensions.Logging;
Expand All @@ -22,7 +23,8 @@ public DefaultOperationTaskDispatcher(
_operationTaskManager = operationTaskManager;
}

protected override async ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData)
protected override async ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId,
object command, object? additionalData, IDictionary<string,string>? additionalHeaders)
{
var op = await _operationManager.GetByIdAsync(operationId);
if (op == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Text.Json;
using System.Threading.Tasks;
using Dbosoft.Rebus.Operations.Commands;
using Dbosoft.Rebus.Operations.Workflow;
using Microsoft.Extensions.Logging;
using Rebus.Bus;

Expand All @@ -27,9 +26,9 @@ protected OperationDispatcherBase(IBus bus, WorkflowOptions options, ILogger<Ope
_logger = logger;
}

public ValueTask<IOperation?> StartNew(object command, IDictionary<string,string>? additionalHeaders = null)
public ValueTask<IOperation?> StartNew(object command, object? additionalData = default, IDictionary<string,string>? additionalHeaders = null)
{
return StartOperation(command, null, additionalHeaders);
return StartOperation(command, additionalData, additionalHeaders);
}

public ValueTask<IOperation?> StartNew<T>(object? additionalData = default, IDictionary<string,string>? additionalHeaders = null)
Expand All @@ -44,14 +43,14 @@ protected OperationDispatcherBase(IBus bus, WorkflowOptions options, ILogger<Ope
return StartOperation(commandType,additionalData, additionalHeaders);
}

protected abstract ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData);
protected abstract ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData, IDictionary<string,string>? additionalHeaders);

protected async ValueTask<IOperation?> StartOperation(object command, object? additionalData, IDictionary<string,string>? additionalHeaders = null)
{
if (command == null)
throw new ArgumentNullException(nameof(command));

var(operation, taskCommand) = await CreateOperation(command, additionalData);
var(operation, taskCommand) = await CreateOperation(command, additionalData, additionalHeaders);

var commandJson = JsonSerializer.Serialize(taskCommand);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ protected OperationTaskDispatcherBase(IBus bus, WorkflowOptions options, ILogger
return StartTask(operationId, initiatingTaskId, commandType, additionalData, additionalHeaders);
}

protected abstract ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData);
protected abstract ValueTask<(IOperationTask, object)> CreateTask(Guid operationId, Guid initiatingTaskId, object command, object? additionalData, IDictionary<string,string>? additionalHeaders);

protected async ValueTask<IOperationTask?> StartTask(Guid operationId, Guid initiatingTaskId,
object command, object? additionalData, IDictionary<string,string>? additionalHeaders = null)
{
if (command == null)
throw new ArgumentNullException(nameof(command));

var (task, taskCommand) = await CreateTask(operationId, initiatingTaskId, command, additionalData);
var (task, taskCommand) = await CreateTask(operationId, initiatingTaskId, command, additionalData, additionalHeaders);
var commandJson = JsonSerializer.Serialize(taskCommand);

var taskMessage = new CreateNewOperationTaskCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public ExposingHeadersCommandHandler(ITaskMessaging messaging)
}

public static bool Called { get; set; }
public static IDictionary<string, string> Headers;
public static IDictionary<string, string>? Headers;

public Task Handle(OperationTask<TestCommand> message)
{
Expand Down

0 comments on commit f2917bf

Please sign in to comment.