Skip to content

Commit

Permalink
Improvements for eryph (#5)
Browse files Browse the repository at this point in the history
* changes for eryph: passing more data
* added serialization config
  • Loading branch information
fw2568 authored Jul 28, 2023
1 parent 03bde2c commit 9a80fe6
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Dbosoft.Rebus.Operations.Workflow;
Expand All @@ -7,8 +8,11 @@ namespace Dbosoft.Rebus.Operations.Workflow;
public interface IOperationManager
{
ValueTask<IOperation?> GetByIdAsync(Guid operationId);
ValueTask<IOperation> GetOrCreateAsync(Guid operationId, object command);
ValueTask<IOperation> GetOrCreateAsync(Guid operationId, object command,
object? additionalData, IDictionary<string,string>? additionalHeaders);

ValueTask<bool> TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, object? additionalData);
ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task, object? data);
ValueTask<bool> TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, object? additionalData,
IDictionary<string,string>? messageHeaders);
ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task,
object? data, IDictionary<string,string>? messageHeaders);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ public DefaultOperationDispatcher(
_operationManager = operationManager;
}

protected override async ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData,IDictionary<string,string>? additionalHeaders)
protected override async ValueTask<(IOperation, object)> CreateOperation(object command, object? additionalData,
IDictionary<string,string>? additionalHeaders)
{
return (await _operationManager.GetOrCreateAsync(Guid.NewGuid(), command), command);
return (await _operationManager.GetOrCreateAsync(Guid.NewGuid(), command,
additionalData,additionalHeaders), command);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected OperationDispatcherBase(IBus bus, WorkflowOptions options, ILogger<Ope

var(operation, taskCommand) = await CreateOperation(command, additionalData, additionalHeaders);

var commandJson = JsonSerializer.Serialize(taskCommand);
var commandJson = JsonSerializer.Serialize(taskCommand, _options.JsonSerializerOptions);

var taskMessage = new CreateNewOperationTaskCommand(
taskCommand.GetType().AssemblyQualifiedName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Text.Json;
using System.Threading.Tasks;
using Dbosoft.Rebus.Operations.Commands;
using Dbosoft.Rebus.Operations.Workflow;
using Microsoft.Extensions.Logging;
using Rebus.Bus;

Expand Down Expand Up @@ -52,7 +51,7 @@ protected OperationTaskDispatcherBase(IBus bus, WorkflowOptions options, ILogger
throw new ArgumentNullException(nameof(command));

var (task, taskCommand) = await CreateTask(operationId, initiatingTaskId, command, additionalData, additionalHeaders);
var commandJson = JsonSerializer.Serialize(taskCommand);
var commandJson = JsonSerializer.Serialize(taskCommand, _options.JsonSerializerOptions);

var taskMessage = new CreateNewOperationTaskCommand(
taskCommand.GetType().AssemblyQualifiedName,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Dbosoft.Rebus.Operations.Workflow;
Expand All @@ -9,15 +10,16 @@ public abstract class OperationManagerBase : IOperationManager

public abstract ValueTask<IOperation?> GetByIdAsync(Guid operationId);

public abstract ValueTask<IOperation> GetOrCreateAsync(Guid operationId, object command);
public abstract ValueTask<IOperation> GetOrCreateAsync(Guid operationId, object command,
object? additionalData,IDictionary<string,string>? additionalHeaders);


public abstract ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation,
IOperationTask task,
object? data);
object? data, IDictionary<string,string>? messageHeaders);

public abstract ValueTask<bool> TryChangeStatusAsync(IOperation operation, OperationStatus newStatus,
object? additionalData);
object? additionalData, IDictionary<string,string>? messageHeaders);



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
using Rebus.Handlers;
using Rebus.Pipeline;

namespace Dbosoft.Rebus.Operations.Workflow
{
Expand Down Expand Up @@ -39,7 +40,7 @@ await _workflow.Operations.AddProgressAsync(
message.Timestamp,
operation,
task,
message.Data);
message.Data, MessageContext.Current.Headers);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
using Rebus.Handlers;
using Rebus.Pipeline;
using Rebus.Sagas;

namespace Dbosoft.Rebus.Operations.Workflow
Expand Down Expand Up @@ -117,7 +118,8 @@ public async Task Handle(OperationTaskAcceptedEvent message)
}

var opOldStatus = op.Status;
if (await _workflow.Operations.TryChangeStatusAsync(op, OperationStatus.Running, null))
if (await _workflow.Operations.TryChangeStatusAsync(op, OperationStatus.Running, null,
MessageContext.Current.Headers))
{
_log.LogDebug("Operation Workflow {operationId}: Status changed: {oldStatus} -> {newStatus}",
message.OperationId, opOldStatus, op.Status);
Expand Down Expand Up @@ -195,7 +197,7 @@ public async Task Handle(OperationTaskStatusEvent message)


if (await _workflow.Operations.TryChangeStatusAsync(op,
newStatus, message.GetMessage()))
newStatus, message.GetMessage(), MessageContext.Current.Headers))
{
await _workflow.Messaging.DispatchOperationStatusEventAsync(new OperationStatusEvent
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
using System.Text.Json;

namespace Dbosoft.Rebus.Operations;

public class WorkflowOptions
{
public WorkflowOptions()
{
JsonSerializerOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web);
}

public WorkflowEventDispatchMode DispatchMode { get; set; } = WorkflowEventDispatchMode.Publish;
public string? EventDestination { get; set; }

public string? OperationsDestination { get; set; }
public JsonSerializerOptions JsonSerializerOptions { get; set; }
}
8 changes: 5 additions & 3 deletions test/Rebus.Operations.Tests/TestOperationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public static void Reset()

}

public override ValueTask<IOperation> GetOrCreateAsync(Guid operationId, object command)
public override ValueTask<IOperation> GetOrCreateAsync(Guid operationId, object command,
object? additionalData, IDictionary<string,string>? additionalHeaders)
{
if (Operations.ContainsKey(operationId))
return GetByIdAsync(operationId)!;
Expand All @@ -35,7 +36,8 @@ public override ValueTask<IOperation> GetOrCreateAsync(Guid operationId, object
return new ValueTask<IOperation>(op);
}

public override ValueTask<bool> TryChangeStatusAsync(IOperation operation, OperationStatus newStatus, object? additionalData)
public override ValueTask<bool> TryChangeStatusAsync(IOperation operation, OperationStatus newStatus,
object? additionalData, IDictionary<string,string>? messageHeaders)
{
if (!Operations.ContainsKey(operation.Id))
return new ValueTask<bool>(false);
Expand All @@ -46,7 +48,7 @@ public override ValueTask<bool> TryChangeStatusAsync(IOperation operation, Opera
}

public override ValueTask AddProgressAsync(Guid progressId, DateTimeOffset timestamp, IOperation operation, IOperationTask task,
object? data)
object? data, IDictionary<string,string>? messageHeaders)
{
if(!Progress.ContainsKey(progressId))
Progress.Add(operation.Id, new List<object>());
Expand Down

0 comments on commit 9a80fe6

Please sign in to comment.