Skip to content

Commit

Permalink
zarusz#238 Resolve HybridMessageSerializer dependencies from IService…
Browse files Browse the repository at this point in the history
…Provider

Signed-off-by: Richard Pringle <[email protected]>
  • Loading branch information
EtherZa committed Apr 6, 2024
1 parent 5479263 commit 1f40bd1
Show file tree
Hide file tree
Showing 17 changed files with 477 additions and 50 deletions.
35 changes: 20 additions & 15 deletions docs/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,21 +175,26 @@ The Hybrid plugin allows to have multiple serialization formats on one message b
To use it install the nuget package `SlimMessageBus.Host.Serialization.Hybrid` and then configure the bus:

```cs
services.AddSlimMessageBus(mbb =>
{
// serializer 1
var avroSerializer = new AvroMessageSerializer();

// serializer 2
var jsonSerializer = new JsonMessageSerializer();

// Note: Certain messages will be serialized by the Avro serializer, other using the Json serializer
mbb.AddHybridSerializer(new Dictionary<IMessageSerializer, Type[]>
{
[jsonSerializer] = new[] { typeof(SubtractCommand) }, // the first one will be the default serializer, no need to declare types here
[avroSerializer] = new[] { typeof(AddCommand), typeof(MultiplyRequest), typeof(MultiplyResponse) },
}, defaultMessageSerializer: jsonSerializer);
});
services
.AddSlimMessageBus(mbb =>
{
mbb
.AddHybridSerializer(
builder => {
builder
.AddJsonSerializer()
.AsDefault();

builder
.AddAvroSerializer()
.For(typeof(Message1), typeof(Message2));

builder
.AddGoogleProtobufSerializer()
.For(typeof(Message3));
})
...
}
```

The routing to the proper serializer happens based on message type. When a type cannot be matched the default serializer will be used.
27 changes: 11 additions & 16 deletions src/Samples/Sample.Serialization.ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace Sample.Serialization.ConsoleApp;

using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

Expand All @@ -11,7 +12,6 @@
using SlimMessageBus.Host;
using SlimMessageBus.Host.Memory;
using SlimMessageBus.Host.Redis;
using SlimMessageBus.Host.Serialization;
using SlimMessageBus.Host.Serialization.Avro;
using SlimMessageBus.Host.Serialization.Hybrid;
using SlimMessageBus.Host.Serialization.Json;
Expand All @@ -27,7 +27,7 @@ enum Provider

/// <summary>
/// This sample shows:
/// 1. How tu use the Avro serializer (for contract Avro IDL first apprach to generate C# code)
/// 1. How tu use the Avro serializer (for contract Avro IDL first approach to generate C# code)
/// 2. How to combine two serializer approaches in one app (using the Hybrid serializer).
/// </summary>
class Program
Expand All @@ -40,17 +40,11 @@ static async Task Main(string[] args) => await Host.CreateDefaultBuilder(args)

services.AddHostedService<MainProgram>();

// alternatively a simpler approach, but using the slower ReflectionMessageCreationStategy and ReflectionSchemaLookupStrategy
var avroSerializer = new AvroMessageSerializer();

// Avro serialized using the AvroConvert library - no schema generation neeeded upfront.
var jsonSerializer = new JsonMessageSerializer();

services
.AddSlimMessageBus(mbb =>
{
// Note: remember that Memory provider does not support req-resp yet.
var provider = Provider.Redis;
var provider = Provider.Memory;

/*
var sl = new DictionarySchemaLookupStrategy();
Expand All @@ -59,7 +53,7 @@ static async Task Main(string[] args) => await Host.CreateDefaultBuilder(args)
sl.Add(typeof(MultiplyRequest), MultiplyRequest._SCHEMA);
sl.Add(typeof(MultiplyResponse), MultiplyResponse._SCHEMA);
var mf = new DictionaryMessageCreationStategy();
var mf = new DictionaryMessageCreationStrategy();
/// register all your types
mf.Add(typeof(AddCommand), () => new AddCommand());
mf.Add(typeof(MultiplyRequest), () => new MultiplyRequest());
Expand All @@ -71,13 +65,14 @@ static async Task Main(string[] args) => await Host.CreateDefaultBuilder(args)

mbb
.AddServicesFromAssemblyContaining<AddCommandConsumer>()
// Note: Certain messages will be serialized by one Avro serializer, other using the Json serializer
.AddHybridSerializer(new Dictionary<IMessageSerializer, Type[]>
.AddHybridSerializer(builder =>
{
[jsonSerializer] = new[] { typeof(SubtractCommand) }, // the first one will be the default serializer, no need to declare types here
[avroSerializer] = new[] { typeof(AddCommand), typeof(MultiplyRequest), typeof(MultiplyResponse) },
}, defaultMessageSerializer: jsonSerializer)
builder.AddJsonSerializer()
.AsDefault();

builder.AddAvroSerializer()
.For(typeof(AddCommand), typeof(MultiplyRequest), typeof(MultiplyResponse));
})
.Produce<AddCommand>(x => x.DefaultTopic("AddCommand"))
.Consume<AddCommand>(x => x.Topic("AddCommand").WithConsumer<AddCommandConsumer>())

Expand Down Expand Up @@ -221,7 +216,7 @@ public class SubtractCommandConsumer : IConsumer<SubtractCommand>
{
public async Task OnHandle(SubtractCommand message)
{
Console.WriteLine("Consumer: Subracting {0} and {1} gives {2}", message.Left, message.Right, message.Left - message.Right);
Console.WriteLine("Consumer: Subtracting {0} and {1} gives {2}", message.Left, message.Right, message.Left - message.Right);
await Task.Delay(50); // Simulate some work
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace SlimMessageBus.Host.Builders
{
using System;

public interface IHybridSerializationBuilder
{
IHybridSerializerBuilderOptions RegisterSerializer<TMessageSerializer>(Action<IServiceCollection> services = null) where TMessageSerializer : class, IMessageSerializer;
}

public interface IHybridSerializerBuilderOptions
{
IHybridSerializationBuilder For(params Type[] types);
IHybridSerializationBuilder AsDefault();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public object Deserialize(Type t, byte[] payload)
var writerSchema = WriteSchemaLookup(t);
AssertSchemaNotNull(t, writerSchema, true);

_logger.LogDebug("Type {0} writer schema: {1}, reader schema: {2}", t, writerSchema, readerSchema);
_logger.LogDebug("Type {MessageType} writer schema: {WriterSchema}, reader schema: {ReaderSchema}", t, writerSchema, readerSchema);

var reader = new SpecificDefaultReader(writerSchema, readerSchema);
reader.Read(message, dec);
Expand All @@ -108,7 +108,7 @@ public byte[] Serialize(Type t, object message)
var writerSchema = WriteSchemaLookup(t);
AssertSchemaNotNull(t, writerSchema, true);

_logger.LogDebug("Type {0} writer schema: {1}", t, writerSchema);
_logger.LogDebug("Type {MessageType} writer schema: {WriterSchema}", t, writerSchema);

var writer = new SpecificDefaultWriter(writerSchema); // Schema comes from pre-compiled, code-gen phase
writer.Write(message, enc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.Extensions.Logging;

using SlimMessageBus.Host;
using SlimMessageBus.Host.Builders;

public static class MessageBusBuilderExtensions
{
Expand Down Expand Up @@ -40,4 +41,30 @@ public static MessageBusBuilder AddAvroSerializer(this MessageBusBuilder mbb)
});
return mbb;
}

/// <summary>
/// Registers the <see cref="IMessageSerializer"/> with implementation as <see cref="AvroMessageSerializer"/>.
/// </summary>
/// <param name="builder"></param>
/// <returns></returns>
public static IHybridSerializerBuilderOptions AddAvroSerializer(this IHybridSerializationBuilder builder, IMessageCreationStrategy messageCreationStrategy, ISchemaLookupStrategy schemaLookupStrategy)
{
return builder.RegisterSerializer<AvroMessageSerializer>(services =>
{
services.TryAddSingleton(svp => new AvroMessageSerializer(svp.GetRequiredService<ILoggerFactory>(), messageCreationStrategy, schemaLookupStrategy));
});
}

/// <summary>
/// Registers the <see cref="IMessageSerializer"/> with implementation as <see cref="AvroMessageSerializer"/>.
/// </summary>
/// <param name="builder"></param>
/// <returns></returns>
public static IHybridSerializerBuilderOptions AddAvroSerializer(this IHybridSerializationBuilder builder)
{
return builder.RegisterSerializer<AvroMessageSerializer>(services =>
{
services.TryAddSingleton(svp => new AvroMessageSerializer(svp.GetRequiredService<ILoggerFactory>()));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.Extensions.Logging;

using SlimMessageBus.Host;
using SlimMessageBus.Host.Builders;

public static class MessageBusBuilderExtensions
{
Expand All @@ -23,4 +24,18 @@ public static MessageBusBuilder AddGoogleProtobufSerializer(this MessageBusBuild
});
return mbb;
}

/// <summary>
/// Registers the <see cref="IMessageSerializer"/> with implementation as <see cref="GoogleProtobufMessageSerializer"/>.
/// </summary>
/// <param name="mbb"></param>
/// <param name="messageParserFactory"></param>
/// <returns></returns>
public static IHybridSerializerBuilderOptions AddGoogleProtobufSerializer(this IHybridSerializationBuilder builder, IMessageParserFactory messageParserFactory = null)
{
return builder.RegisterSerializer<GoogleProtobufMessageSerializer>(services =>
{
services.TryAddSingleton(svp => new GoogleProtobufMessageSerializer(svp.GetRequiredService<ILoggerFactory>(), messageParserFactory));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
public class HybridMessageSerializer : IMessageSerializer
{
private readonly ILogger _logger;
private readonly IList<IMessageSerializer> _serializers = new List<IMessageSerializer>();
private readonly IDictionary<Type, IMessageSerializer> _serializerByType = new Dictionary<Type, IMessageSerializer>();
private readonly Dictionary<Type, IMessageSerializer> _serializerByType = [];

public IMessageSerializer DefaultSerializer { get; set; }

internal IReadOnlyDictionary<Type, IMessageSerializer> SerializerByType => _serializerByType;

public HybridMessageSerializer(ILogger<HybridMessageSerializer> logger, IDictionary<IMessageSerializer, Type[]> registration, IMessageSerializer defaultMessageSerializer = null)
{
_logger = logger;
Expand All @@ -24,12 +26,14 @@ public HybridMessageSerializer(ILogger<HybridMessageSerializer> logger, IDiction

public void Add(IMessageSerializer serializer, params Type[] supportedTypes)
{
if (_serializers.Count == 0 && DefaultSerializer == null)
{
DefaultSerializer = serializer;
}
#if NETSTANDARD2_0
if (serializer is null) throw new ArgumentNullException(nameof(serializer));
#else
ArgumentNullException.ThrowIfNull(serializer);
#endif

DefaultSerializer ??= serializer;

_serializers.Add(serializer);
foreach (var type in supportedTypes)
{
_serializerByType.Add(type, serializer);
Expand All @@ -38,19 +42,19 @@ public void Add(IMessageSerializer serializer, params Type[] supportedTypes)

protected virtual IMessageSerializer MatchSerializer(Type t)
{
if (_serializers.Count == 0)
{
throw new InvalidOperationException("No serializers registered.");
}

if (!_serializerByType.TryGetValue(t, out var serializer))
{
// use first as default
_logger.LogTrace("Serializer for type {0} not registered, will use default serializer", t);
_logger.LogTrace("Serializer for type {MessageType} not registered, will use default serializer", t);

if (DefaultSerializer == null)
{
throw new InvalidOperationException("No serializers registered.");
}

serializer = DefaultSerializer;
}

_logger.LogDebug("Serializer for type {0} will be {1}", t, serializer);
_logger.LogDebug("Serializer for type {MessageType} will be {Serializer}", t, serializer);
return serializer;
}

Expand Down
Loading

0 comments on commit 1f40bd1

Please sign in to comment.