From 899dff331c7e5a202db47f610e6eb99d51b5ea21 Mon Sep 17 00:00:00 2001 From: Nazarii Piontko <1506550+nazarii-piontko@users.noreply.github.com> Date: Sat, 22 Jul 2023 20:34:41 +0200 Subject: [PATCH 1/3] Make ConfluentProtobufTypeNameResolver more flexible on type name construction --- .../DependencyConfiguratorExtensions.cs | 164 ++++++++++++++++++ .../IDependencyConfigurator.cs | 7 + .../MicrosoftDependencyConfigurator.cs | 13 ++ .../ConfluentAvroTypeNameResolver.cs | 2 +- .../ConsumerConfigurationBuilderExtensions.cs | 4 +- .../IConfluentAvroTypeNameResolver.cs | 7 + .../ProducerConfigurationBuilderExtensions.cs | 4 +- .../AssemblyInfo.cs | 4 + .../ConfluentProtobufTypeNameResolver.cs | 15 +- .../ConsumerConfigurationBuilderExtensions.cs | 4 +- .../IConfluentProtobufTypeNameResolver.cs | 7 + .../ProducerConfigurationBuilderExtensions.cs | 4 +- .../DependencyConfiguratorExtensionsTests.cs | 143 +++++++++++++++ .../KafkaFlow.UnitTests.csproj | 3 + .../ConfluentProtobufTypeNameResolverTests.cs | 74 ++++++++ .../UnityDependencyConfigurator.cs | 3 +- 16 files changed, 450 insertions(+), 8 deletions(-) create mode 100644 src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/IConfluentAvroTypeNameResolver.cs create mode 100644 src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs create mode 100644 src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/IConfluentProtobufTypeNameResolver.cs create mode 100644 src/KafkaFlow.UnitTests/ConfigurationBuilders/DependencyConfiguratorExtensionsTests.cs create mode 100644 src/KafkaFlow.UnitTests/Serializers/SchemaRegistry/ConfluentProtobufTypeNameResolverTests.cs diff --git a/src/KafkaFlow.Abstractions/Extensions/DependencyConfiguratorExtensions.cs b/src/KafkaFlow.Abstractions/Extensions/DependencyConfiguratorExtensions.cs index 8059be30c..6de2ff682 100644 --- a/src/KafkaFlow.Abstractions/Extensions/DependencyConfiguratorExtensions.cs +++ b/src/KafkaFlow.Abstractions/Extensions/DependencyConfiguratorExtensions.cs @@ -1,6 +1,7 @@ namespace KafkaFlow { using System; + using KafkaFlow.Configuration; /// /// Provides extension methods over @@ -21,6 +22,22 @@ public static IDependencyConfigurator AddSingleton(th return configurator.Add(InstanceLifetime.Singleton); } + /// + /// Registers a singleton type mapping if the given is not already registered + /// + /// The object that this method was called on. + /// that will be requested. + /// that will actually be returned. + /// + public static IDependencyConfigurator TryAddSingleton(this IDependencyConfigurator configurator) + where TImplementation : class, TService + where TService : class + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add(InstanceLifetime.Singleton); + } + /// /// Registers a singleton type mapping /// @@ -33,6 +50,20 @@ public static IDependencyConfigurator AddSingleton(this IDependencyCon return configurator.Add(InstanceLifetime.Singleton); } + /// + /// Registers a singleton type mapping if the given is not already registered + /// + /// The object that this method was called on + /// that will be created + /// + public static IDependencyConfigurator TryAddSingleton(this IDependencyConfigurator configurator) + where TService : class + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add(InstanceLifetime.Singleton); + } + /// /// Registers a singleton type mapping where the returned instance will be the given implementation /// @@ -48,6 +79,23 @@ public static IDependencyConfigurator AddSingleton( return configurator.Add(service); } + /// + /// Registers a singleton type mapping where the returned instance will be the given implementation if the given is not already registered + /// + /// The object that this method was called on + /// that will be returned + /// Type that will be created + /// + public static IDependencyConfigurator TryAddSingleton( + this IDependencyConfigurator configurator, + TService service) + where TService : class + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add(service); + } + /// /// Registers a singleton type mapping where the returned instance will be given by the provided factory /// @@ -65,6 +113,25 @@ public static IDependencyConfigurator AddSingleton( InstanceLifetime.Singleton); } + /// + /// Registers a singleton type mapping where the returned instance will be given by the provided factory if the given is not already registered + /// + /// The object that this method was called on + /// A factory to create new instances of the service implementation + /// Type that will be created + /// + public static IDependencyConfigurator TryAddSingleton( + this IDependencyConfigurator configurator, + Func factory) + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add( + typeof(TService), + factory, + InstanceLifetime.Singleton); + } + /// /// Registers a transient type mapping /// @@ -83,6 +150,26 @@ public static IDependencyConfigurator AddTransient( InstanceLifetime.Transient); } + /// + /// Registers a transient type mapping if the given is not already registered + /// + /// The object that this method was called on + /// that will be requested + /// that will actually be returned + /// + public static IDependencyConfigurator TryAddTransient( + this IDependencyConfigurator configurator, + Type serviceType, + Type implementationType) + { + return configurator.AlreadyRegistered(serviceType) + ? configurator + : configurator.Add( + serviceType, + implementationType, + InstanceLifetime.Transient); + } + /// /// Registers a transient type mapping /// @@ -95,6 +182,50 @@ public static IDependencyConfigurator AddTransient(this IDependencyCon return configurator.Add(InstanceLifetime.Transient); } + /// + /// Registers a transient type mapping if the given is not already registered + /// + /// The object that this method was called on + /// Type that will be created + /// + public static IDependencyConfigurator TryAddTransient(this IDependencyConfigurator configurator) + where TService : class + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add(InstanceLifetime.Transient); + } + + /// + /// Registers a singleton type mapping + /// + /// The object that this method was called on. + /// that will be requested. + /// that will actually be returned. + /// + public static IDependencyConfigurator AddTransient(this IDependencyConfigurator configurator) + where TImplementation : class, TService + where TService : class + { + return configurator.Add(InstanceLifetime.Transient); + } + + /// + /// Registers a singleton type mapping if the given is not already registered + /// + /// The object that this method was called on. + /// that will be requested. + /// that will actually be returned. + /// + public static IDependencyConfigurator TryAddTransient(this IDependencyConfigurator configurator) + where TImplementation : class, TService + where TService : class + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add(InstanceLifetime.Transient); + } + /// /// Registers a transient type mapping where the returned instance will be given by the provided factory /// @@ -111,5 +242,38 @@ public static IDependencyConfigurator AddTransient( factory, InstanceLifetime.Transient); } + + /// + /// Registers a transient type mapping where the returned instance will be given by the provided factory if the given is not already registered + /// + /// The object that this method was called on + /// A factory to create new instances of the service implementation + /// Type that will be created + /// + public static IDependencyConfigurator TryAddTransient( + this IDependencyConfigurator configurator, + Func factory) + { + return configurator.AlreadyRegistered(typeof(TService)) + ? configurator + : configurator.Add( + typeof(TService), + factory, + InstanceLifetime.Transient); + } + + /// + /// Add a type mapping to the cluster dependency resolver + /// + /// Cluster configuration builder + /// A handler to set the configuration values + /// + public static IClusterConfigurationBuilder WithDependencies( + this IClusterConfigurationBuilder cluster, + Action handler) + { + handler(cluster.DependencyConfigurator); + return cluster; + } } } diff --git a/src/KafkaFlow.Abstractions/IDependencyConfigurator.cs b/src/KafkaFlow.Abstractions/IDependencyConfigurator.cs index 5f8212664..150a2cc3e 100644 --- a/src/KafkaFlow.Abstractions/IDependencyConfigurator.cs +++ b/src/KafkaFlow.Abstractions/IDependencyConfigurator.cs @@ -54,5 +54,12 @@ IDependencyConfigurator Add( Type serviceType, Func factory, InstanceLifetime lifetime); + + /// + /// Checks if the given is already registered + /// + /// Service type + /// + bool AlreadyRegistered(Type registeredType); } } diff --git a/src/KafkaFlow.Microsoft.DependencyInjection/MicrosoftDependencyConfigurator.cs b/src/KafkaFlow.Microsoft.DependencyInjection/MicrosoftDependencyConfigurator.cs index 7d0b993d3..abb49e4ce 100644 --- a/src/KafkaFlow.Microsoft.DependencyInjection/MicrosoftDependencyConfigurator.cs +++ b/src/KafkaFlow.Microsoft.DependencyInjection/MicrosoftDependencyConfigurator.cs @@ -73,6 +73,19 @@ public IDependencyConfigurator Add( return this; } + public bool AlreadyRegistered(Type registeredType) + { + foreach (var s in this.services) + { + if (s.ServiceType == registeredType) + { + return true; + } + } + + return false; + } + private static ServiceLifetime ParseLifetime(InstanceLifetime lifetime) { switch (lifetime) diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs index 8a8af39e1..526f5919b 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs @@ -4,7 +4,7 @@ namespace KafkaFlow using Confluent.SchemaRegistry; using Newtonsoft.Json; - internal class ConfluentAvroTypeNameResolver : IAsyncSchemaRegistryTypeNameResolver + internal class ConfluentAvroTypeNameResolver : IConfluentAvroTypeNameResolver { private readonly ISchemaRegistryClient client; diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs index 81ac47435..49fc8d95a 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs @@ -17,10 +17,12 @@ public static class ConsumerConfigurationBuilderExtensions public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryAvroSerializer( this IConsumerMiddlewareConfigurationBuilder middlewares) { + middlewares.DependencyConfigurator.TryAddTransient(); + return middlewares.Add( resolver => new SerializerConsumerMiddleware( new ConfluentAvroSerializer(resolver), - new SchemaRegistryTypeResolver(new ConfluentAvroTypeNameResolver(resolver.Resolve())))); + new SchemaRegistryTypeResolver(resolver.Resolve()))); } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/IConfluentAvroTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/IConfluentAvroTypeNameResolver.cs new file mode 100644 index 000000000..90d55fea1 --- /dev/null +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/IConfluentAvroTypeNameResolver.cs @@ -0,0 +1,7 @@ +namespace KafkaFlow +{ + /// + public interface IConfluentAvroTypeNameResolver : IAsyncSchemaRegistryTypeNameResolver + { + } +} diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ProducerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ProducerConfigurationBuilderExtensions.cs index d60a7a39c..9e529caac 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ProducerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ProducerConfigurationBuilderExtensions.cs @@ -20,10 +20,12 @@ public static IProducerMiddlewareConfigurationBuilder AddSchemaRegistryAvroSeria this IProducerMiddlewareConfigurationBuilder middlewares, AvroSerializerConfig config = null) { + middlewares.DependencyConfigurator.TryAddTransient(); + return middlewares.Add( resolver => new SerializerProducerMiddleware( new ConfluentAvroSerializer(resolver, config), - new SchemaRegistryTypeResolver(new ConfluentAvroTypeNameResolver(resolver.Resolve())))); + new SchemaRegistryTypeResolver(resolver.Resolve()))); } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs new file mode 100644 index 000000000..f8402e685 --- /dev/null +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/AssemblyInfo.cs @@ -0,0 +1,4 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs index f57f8fca7..cae42be8e 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs @@ -6,7 +6,7 @@ namespace KafkaFlow using Google.Protobuf; using Google.Protobuf.Reflection; - internal class ConfluentProtobufTypeNameResolver : IAsyncSchemaRegistryTypeNameResolver + internal sealed class ConfluentProtobufTypeNameResolver : IConfluentProtobufTypeNameResolver { private readonly ISchemaRegistryClient client; @@ -21,7 +21,18 @@ public async Task ResolveAsync(int id) var protoFields = FileDescriptorProto.Parser.ParseFrom(ByteString.FromBase64(schemaString)); - return $"{protoFields.Package}.{protoFields.MessageType.FirstOrDefault()?.Name}"; + return BuildTypeName(protoFields); + } + + private static string BuildTypeName(FileDescriptorProto protoFields) + { + var package = protoFields.Package; + if (string.IsNullOrEmpty(package)) + { + package = protoFields.Options.CsharpNamespace; + } + + return $"{package}.{protoFields.MessageType.FirstOrDefault()?.Name}"; } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs index ce3b0e4c3..e1cefce90 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs @@ -17,10 +17,12 @@ public static class ConsumerConfigurationBuilderExtensions public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryProtobufSerializer( this IConsumerMiddlewareConfigurationBuilder middlewares) { + middlewares.DependencyConfigurator.TryAddTransient(); + return middlewares.Add( resolver => new SerializerConsumerMiddleware( new ConfluentProtobufSerializer(resolver), - new SchemaRegistryTypeResolver(new ConfluentProtobufTypeNameResolver(resolver.Resolve())))); + new SchemaRegistryTypeResolver(resolver.Resolve()))); } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/IConfluentProtobufTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/IConfluentProtobufTypeNameResolver.cs new file mode 100644 index 000000000..e15b9d8a2 --- /dev/null +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/IConfluentProtobufTypeNameResolver.cs @@ -0,0 +1,7 @@ +namespace KafkaFlow +{ + /// + public interface IConfluentProtobufTypeNameResolver : IAsyncSchemaRegistryTypeNameResolver + { + } +} diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ProducerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ProducerConfigurationBuilderExtensions.cs index fc59f5e3d..7af504534 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ProducerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ProducerConfigurationBuilderExtensions.cs @@ -20,10 +20,12 @@ public static IProducerMiddlewareConfigurationBuilder AddSchemaRegistryProtobufS this IProducerMiddlewareConfigurationBuilder middlewares, ProtobufSerializerConfig config = null) { + middlewares.DependencyConfigurator.TryAddTransient(); + return middlewares.Add( resolver => new SerializerProducerMiddleware( new ConfluentProtobufSerializer(resolver, config), - new SchemaRegistryTypeResolver(new ConfluentProtobufTypeNameResolver(resolver.Resolve())))); + new SchemaRegistryTypeResolver(resolver.Resolve()))); } } } diff --git a/src/KafkaFlow.UnitTests/ConfigurationBuilders/DependencyConfiguratorExtensionsTests.cs b/src/KafkaFlow.UnitTests/ConfigurationBuilders/DependencyConfiguratorExtensionsTests.cs new file mode 100644 index 000000000..e083248d1 --- /dev/null +++ b/src/KafkaFlow.UnitTests/ConfigurationBuilders/DependencyConfiguratorExtensionsTests.cs @@ -0,0 +1,143 @@ +namespace KafkaFlow.UnitTests.ConfigurationBuilders +{ + using System; + using System.Collections; + using System.Collections.Generic; + using System.Linq.Expressions; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + public class DependencyConfiguratorExtensionsTests + { + private static IEnumerable MethodsForAdd + { + get + { + yield return new object[] + { + (Action)(configurator => configurator.AddSingleton()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Singleton)), + }; + yield return new object[] + { + (Action)(configurator => configurator.AddSingleton()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Singleton)), + }; + yield return new object[] + { + (Action)(configurator => configurator.AddSingleton(new ArrayList())), + (Expression>)(configurator => configurator.Add(It.IsAny())), + }; + yield return new object[] + { + (Action)(configurator => configurator.AddSingleton(_ => new ArrayList())), + (Expression>)(configurator => configurator.Add(typeof(ArrayList), It.IsAny>(), InstanceLifetime.Singleton)), + }; + yield return new object[] + { + (Action)(configurator => configurator.AddTransient()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Transient)), + }; + yield return new object[] + { + (Action)(configurator => configurator.AddTransient()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Transient)), + }; + yield return new object[] + { + (Action)(configurator => configurator.AddTransient(_ => new ArrayList())), + (Expression>)(configurator => configurator.Add(typeof(ArrayList), It.IsAny>(), InstanceLifetime.Transient)), + }; + } + } + + private static IEnumerable MethodsForTryAdd + { + get + { + yield return new object[] + { + (Action)(configurator => configurator.TryAddSingleton()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Singleton)), + }; + yield return new object[] + { + (Action)(configurator => configurator.TryAddSingleton()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Singleton)), + }; + yield return new object[] + { + (Action)(configurator => configurator.TryAddSingleton(new ArrayList())), + (Expression>)(configurator => configurator.Add(It.IsAny())), + }; + yield return new object[] + { + (Action)(configurator => configurator.TryAddSingleton(_ => new ArrayList())), + (Expression>)(configurator => configurator.Add(typeof(ArrayList), It.IsAny>(), InstanceLifetime.Singleton)), + }; + yield return new object[] + { + (Action)(configurator => configurator.TryAddTransient()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Transient)), + }; + yield return new object[] + { + (Action)(configurator => configurator.TryAddTransient()), + (Expression>)(configurator => configurator.Add(InstanceLifetime.Transient)), + }; + yield return new object[] + { + (Action)(configurator => configurator.TryAddTransient(_ => new ArrayList())), + (Expression>)(configurator => configurator.Add(typeof(ArrayList), It.IsAny>(), InstanceLifetime.Transient)), + }; + } + } + + [TestMethod] + [DynamicData(nameof(MethodsForAdd))] + public void Add(Action addMethod, Expression> underlyingAddMethod) + { + // Arrange + var configurator = new Mock(); + + // Act + addMethod(configurator.Object); + + // Assert + configurator.Verify(underlyingAddMethod, Times.Once); + } + + [TestMethod] + [DynamicData(nameof(MethodsForTryAdd))] + public void TryAdd_AddWhenNotExists(Action tryAddMethod, Expression> underlyingAddMethod) + { + // Arrange + var configurator = new Mock(); + configurator.Setup(c => c.AlreadyRegistered(typeof(IList))).Returns(false); + configurator.Setup(c => c.AlreadyRegistered(typeof(ArrayList))).Returns(false); + + // Act + tryAddMethod(configurator.Object); + + // Assert + configurator.Verify(underlyingAddMethod, Times.Once); + } + + [TestMethod] + [DynamicData(nameof(MethodsForTryAdd))] + public void TryAdd_IgnoreWhenExists(Action tryAddMethod, Expression> underlyingAddMethod) + { + // Arrange + var configurator = new Mock(); + configurator.Setup(c => c.AlreadyRegistered(typeof(IList))).Returns(true); + configurator.Setup(c => c.AlreadyRegistered(typeof(ArrayList))).Returns(true); + + // Act + tryAddMethod(configurator.Object); + + // Assert + configurator.Verify(underlyingAddMethod, Times.Never); + } + } +} diff --git a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj index 92605b7dd..0bb24a9e7 100644 --- a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj +++ b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj @@ -25,6 +25,8 @@ + + @@ -32,6 +34,7 @@ + diff --git a/src/KafkaFlow.UnitTests/Serializers/SchemaRegistry/ConfluentProtobufTypeNameResolverTests.cs b/src/KafkaFlow.UnitTests/Serializers/SchemaRegistry/ConfluentProtobufTypeNameResolverTests.cs new file mode 100644 index 000000000..abf69070e --- /dev/null +++ b/src/KafkaFlow.UnitTests/Serializers/SchemaRegistry/ConfluentProtobufTypeNameResolverTests.cs @@ -0,0 +1,74 @@ +namespace KafkaFlow.UnitTests.Serializers.SchemaRegistry +{ + using System; + using System.Threading.Tasks; + using Confluent.SchemaRegistry; + using FluentAssertions; + using Google.Protobuf; + using Google.Protobuf.Reflection; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + public class ConfluentProtobufTypeNameResolverTests + { + private const string MessageTypeName = "TestMessage"; + + [TestMethod] + public async Task ResolveAsync_WithPackage_ReturnTypeName() + { + // Arrange + var schemaRegistryClientMock = CreateSchemaRegistryClientMock(p => p.Package = "TestPackage"); + var resolver = new ConfluentProtobufTypeNameResolver(schemaRegistryClientMock.Object); + + // Act + var typeName = await resolver.ResolveAsync(1); + + // Assert + typeName.Should().Be($"TestPackage.{MessageTypeName}"); + } + + [TestMethod] + public async Task ResolveAsync_NoPackageWithCsharpNamespace_ReturnTypeName() + { + // Arrange + var schemaRegistryClientMock = CreateSchemaRegistryClientMock(p => + { + p.Package = string.Empty; + p.Options.CsharpNamespace = "TestCsharpNamespace"; + }); + var resolver = new ConfluentProtobufTypeNameResolver(schemaRegistryClientMock.Object); + + // Act + var typeName = await resolver.ResolveAsync(1); + + // Assert + typeName.Should().Be($"TestCsharpNamespace.{MessageTypeName}"); + } + + private static Mock CreateSchemaRegistryClientMock(Action configure) + { + var protoFields = new FileDescriptorProto + { + MessageType = + { + new DescriptorProto + { + Name = MessageTypeName, + }, + }, + Options = new FileOptions(), + }; + configure(protoFields); + + var schema = new Schema(protoFields.ToByteString().ToBase64(), SchemaType.Protobuf); + + var schemaRegistryClientMock = new Mock(); + schemaRegistryClientMock + .Setup(o => o.GetSchemaAsync(1, "serialized")) + .ReturnsAsync(schema); + + return schemaRegistryClientMock; + } + } +} diff --git a/src/KafkaFlow.Unity/UnityDependencyConfigurator.cs b/src/KafkaFlow.Unity/UnityDependencyConfigurator.cs index 8273095bf..45a03ee14 100644 --- a/src/KafkaFlow.Unity/UnityDependencyConfigurator.cs +++ b/src/KafkaFlow.Unity/UnityDependencyConfigurator.cs @@ -82,7 +82,8 @@ public IDependencyConfigurator Add( return this; } - private bool AlreadyRegistered(Type registeredType) + /// + public bool AlreadyRegistered(Type registeredType) { return this.container.Registrations.Any(x => x.RegisteredType == registeredType); } From 28bd63f2359d42d75891383be7a91122fae497ef Mon Sep 17 00:00:00 2001 From: Nazar Piontko Date: Sun, 19 Nov 2023 13:14:58 +0100 Subject: [PATCH 2/3] fix: upgrade nuget packages --- src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj index 123f684e9..b5629eeb1 100644 --- a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj +++ b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj @@ -25,8 +25,8 @@ - - + + From 235bc6fd08a5646141d85802f8bbf3c2113e7f8a Mon Sep 17 00:00:00 2001 From: Nazarii Piontko <1506550+nazarii-piontko@users.noreply.github.com> Date: Tue, 28 Nov 2023 18:57:09 +0100 Subject: [PATCH 3/3] fix: clean changes --- .../Program.cs | 4 +- .../ConfluentAvroTypeNameResolver.cs | 2 +- .../ConsumerConfigurationBuilderExtensions.cs | 2 +- .../IConfluentAvroTypeNameResolver.cs | 2 +- .../ConfluentProtobufTypeNameResolver.cs | 2 +- .../ConsumerConfigurationBuilderExtensions.cs | 2 +- .../IConfluentProtobufTypeNameResolver.cs | 2 +- .../DependencyConfiguratorExtensionsTests.cs | 143 ------------------ .../ConfluentProtobufTypeNameResolverTests.cs | 74 --------- .../UnityDependencyConfigurator.cs | 22 +-- .../KafkaFlow.UnitTests.csproj | 9 -- .../ConfluentProtobufTypeNameResolverTests.cs | 3 +- 12 files changed, 20 insertions(+), 247 deletions(-) delete mode 100644 src/KafkaFlow.UnitTests/ConfigurationBuilders/DependencyConfiguratorExtensionsTests.cs delete mode 100644 src/KafkaFlow.UnitTests/Serializers/SchemaRegistry/ConfluentProtobufTypeNameResolverTests.cs diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs b/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs index 3106ab60c..ba5b18d80 100644 --- a/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs +++ b/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs @@ -73,7 +73,7 @@ .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSchemaRegistryAvroSerializer() + .AddSchemaRegistryAvroDeserializer() .AddTypedHandlers( handlers => handlers .AddHandler() @@ -102,7 +102,7 @@ .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSchemaRegistryProtobufSerializer() + .AddSchemaRegistryProtobufDeserializer() .AddTypedHandlers(handlers => handlers.AddHandler()) ) ) diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs index 5a3a05b09..a8dd3c3b1 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroTypeNameResolver.cs @@ -2,7 +2,7 @@ using Confluent.SchemaRegistry; using Newtonsoft.Json; -namespace KafkaFlow +namespace KafkaFlow.Serializer.SchemaRegistry { internal sealed class ConfluentAvroTypeNameResolver : IConfluentAvroTypeNameResolver { diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs index 794ff4916..4ca34e35e 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs @@ -14,7 +14,7 @@ public static class ConsumerConfigurationBuilderExtensions /// /// The middleware configuration builder /// - public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryAvroSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryAvroDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) { middlewares.DependencyConfigurator.TryAddTransient(); diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/IConfluentAvroTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/IConfluentAvroTypeNameResolver.cs index c0398af60..19391b03e 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/IConfluentAvroTypeNameResolver.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/IConfluentAvroTypeNameResolver.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow +namespace KafkaFlow.Serializer.SchemaRegistry { /// public interface IConfluentAvroTypeNameResolver : ISchemaRegistryTypeNameResolver diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs index e04a2dda4..665bb4af1 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs @@ -4,7 +4,7 @@ using Google.Protobuf; using Google.Protobuf.Reflection; -namespace KafkaFlow +namespace KafkaFlow.Serializer.SchemaRegistry { internal class ConfluentProtobufTypeNameResolver : IConfluentProtobufTypeNameResolver { diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs index b52fe79fc..b60d20812 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs @@ -14,7 +14,7 @@ public static class ConsumerConfigurationBuilderExtensions /// /// The middleware configuration builder /// - public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryProtobufSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryProtobufDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) { middlewares.DependencyConfigurator.TryAddTransient(); diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/IConfluentProtobufTypeNameResolver.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/IConfluentProtobufTypeNameResolver.cs index 32da1c8df..1f17fa35f 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/IConfluentProtobufTypeNameResolver.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/IConfluentProtobufTypeNameResolver.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow +namespace KafkaFlow.Serializer.SchemaRegistry { /// public interface IConfluentProtobufTypeNameResolver : ISchemaRegistryTypeNameResolver diff --git a/src/KafkaFlow.UnitTests/ConfigurationBuilders/DependencyConfiguratorExtensionsTests.cs b/src/KafkaFlow.UnitTests/ConfigurationBuilders/DependencyConfiguratorExtensionsTests.cs deleted file mode 100644 index e083248d1..000000000 --- a/src/KafkaFlow.UnitTests/ConfigurationBuilders/DependencyConfiguratorExtensionsTests.cs +++ /dev/null @@ -1,143 +0,0 @@ -namespace KafkaFlow.UnitTests.ConfigurationBuilders -{ - using System; - using System.Collections; - using System.Collections.Generic; - using System.Linq.Expressions; - using Microsoft.VisualStudio.TestTools.UnitTesting; - using Moq; - - [TestClass] - public class DependencyConfiguratorExtensionsTests - { - private static IEnumerable MethodsForAdd - { - get - { - yield return new object[] - { - (Action)(configurator => configurator.AddSingleton()), - (Expression>)(configurator => configurator.Add(InstanceLifetime.Singleton)), - }; - yield return new object[] - { - (Action)(configurator => configurator.AddSingleton()), - (Expression>)(configurator => configurator.Add(InstanceLifetime.Singleton)), - }; - yield return new object[] - { - (Action)(configurator => configurator.AddSingleton(new ArrayList())), - (Expression>)(configurator => configurator.Add(It.IsAny())), - }; - yield return new object[] - { - (Action)(configurator => configurator.AddSingleton(_ => new ArrayList())), - (Expression>)(configurator => configurator.Add(typeof(ArrayList), It.IsAny>(), InstanceLifetime.Singleton)), - }; - yield return new object[] - { - (Action)(configurator => configurator.AddTransient()), - (Expression>)(configurator => configurator.Add(InstanceLifetime.Transient)), - }; - yield return new object[] - { - (Action)(configurator => configurator.AddTransient()), - (Expression>)(configurator => configurator.Add(InstanceLifetime.Transient)), - }; - yield return new object[] - { - (Action)(configurator => configurator.AddTransient(_ => new ArrayList())), - (Expression>)(configurator => configurator.Add(typeof(ArrayList), It.IsAny>(), InstanceLifetime.Transient)), - }; - } - } - - private static IEnumerable MethodsForTryAdd - { - get - { - yield return new object[] - { - (Action)(configurator => configurator.TryAddSingleton()), - (Expression>)(configurator => configurator.Add(InstanceLifetime.Singleton)), - }; - yield return new object[] - { - (Action)(configurator => configurator.TryAddSingleton()), - (Expression>)(configurator => configurator.Add(InstanceLifetime.Singleton)), - }; - yield return new object[] - { - (Action)(configurator => configurator.TryAddSingleton(new ArrayList())), - (Expression>)(configurator => configurator.Add(It.IsAny())), - }; - yield return new object[] - { - (Action)(configurator => configurator.TryAddSingleton(_ => new ArrayList())), - (Expression>)(configurator => configurator.Add(typeof(ArrayList), It.IsAny>(), InstanceLifetime.Singleton)), - }; - yield return new object[] - { - (Action)(configurator => configurator.TryAddTransient()), - (Expression>)(configurator => configurator.Add(InstanceLifetime.Transient)), - }; - yield return new object[] - { - (Action)(configurator => configurator.TryAddTransient()), - (Expression>)(configurator => configurator.Add(InstanceLifetime.Transient)), - }; - yield return new object[] - { - (Action)(configurator => configurator.TryAddTransient(_ => new ArrayList())), - (Expression>)(configurator => configurator.Add(typeof(ArrayList), It.IsAny>(), InstanceLifetime.Transient)), - }; - } - } - - [TestMethod] - [DynamicData(nameof(MethodsForAdd))] - public void Add(Action addMethod, Expression> underlyingAddMethod) - { - // Arrange - var configurator = new Mock(); - - // Act - addMethod(configurator.Object); - - // Assert - configurator.Verify(underlyingAddMethod, Times.Once); - } - - [TestMethod] - [DynamicData(nameof(MethodsForTryAdd))] - public void TryAdd_AddWhenNotExists(Action tryAddMethod, Expression> underlyingAddMethod) - { - // Arrange - var configurator = new Mock(); - configurator.Setup(c => c.AlreadyRegistered(typeof(IList))).Returns(false); - configurator.Setup(c => c.AlreadyRegistered(typeof(ArrayList))).Returns(false); - - // Act - tryAddMethod(configurator.Object); - - // Assert - configurator.Verify(underlyingAddMethod, Times.Once); - } - - [TestMethod] - [DynamicData(nameof(MethodsForTryAdd))] - public void TryAdd_IgnoreWhenExists(Action tryAddMethod, Expression> underlyingAddMethod) - { - // Arrange - var configurator = new Mock(); - configurator.Setup(c => c.AlreadyRegistered(typeof(IList))).Returns(true); - configurator.Setup(c => c.AlreadyRegistered(typeof(ArrayList))).Returns(true); - - // Act - tryAddMethod(configurator.Object); - - // Assert - configurator.Verify(underlyingAddMethod, Times.Never); - } - } -} diff --git a/src/KafkaFlow.UnitTests/Serializers/SchemaRegistry/ConfluentProtobufTypeNameResolverTests.cs b/src/KafkaFlow.UnitTests/Serializers/SchemaRegistry/ConfluentProtobufTypeNameResolverTests.cs deleted file mode 100644 index abf69070e..000000000 --- a/src/KafkaFlow.UnitTests/Serializers/SchemaRegistry/ConfluentProtobufTypeNameResolverTests.cs +++ /dev/null @@ -1,74 +0,0 @@ -namespace KafkaFlow.UnitTests.Serializers.SchemaRegistry -{ - using System; - using System.Threading.Tasks; - using Confluent.SchemaRegistry; - using FluentAssertions; - using Google.Protobuf; - using Google.Protobuf.Reflection; - using Microsoft.VisualStudio.TestTools.UnitTesting; - using Moq; - - [TestClass] - public class ConfluentProtobufTypeNameResolverTests - { - private const string MessageTypeName = "TestMessage"; - - [TestMethod] - public async Task ResolveAsync_WithPackage_ReturnTypeName() - { - // Arrange - var schemaRegistryClientMock = CreateSchemaRegistryClientMock(p => p.Package = "TestPackage"); - var resolver = new ConfluentProtobufTypeNameResolver(schemaRegistryClientMock.Object); - - // Act - var typeName = await resolver.ResolveAsync(1); - - // Assert - typeName.Should().Be($"TestPackage.{MessageTypeName}"); - } - - [TestMethod] - public async Task ResolveAsync_NoPackageWithCsharpNamespace_ReturnTypeName() - { - // Arrange - var schemaRegistryClientMock = CreateSchemaRegistryClientMock(p => - { - p.Package = string.Empty; - p.Options.CsharpNamespace = "TestCsharpNamespace"; - }); - var resolver = new ConfluentProtobufTypeNameResolver(schemaRegistryClientMock.Object); - - // Act - var typeName = await resolver.ResolveAsync(1); - - // Assert - typeName.Should().Be($"TestCsharpNamespace.{MessageTypeName}"); - } - - private static Mock CreateSchemaRegistryClientMock(Action configure) - { - var protoFields = new FileDescriptorProto - { - MessageType = - { - new DescriptorProto - { - Name = MessageTypeName, - }, - }, - Options = new FileOptions(), - }; - configure(protoFields); - - var schema = new Schema(protoFields.ToByteString().ToBase64(), SchemaType.Protobuf); - - var schemaRegistryClientMock = new Mock(); - schemaRegistryClientMock - .Setup(o => o.GetSchemaAsync(1, "serialized")) - .ReturnsAsync(schema); - - return schemaRegistryClientMock; - } - } -} diff --git a/src/KafkaFlow.Unity/UnityDependencyConfigurator.cs b/src/KafkaFlow.Unity/UnityDependencyConfigurator.cs index c0872fa19..7e24b35ac 100644 --- a/src/KafkaFlow.Unity/UnityDependencyConfigurator.cs +++ b/src/KafkaFlow.Unity/UnityDependencyConfigurator.cs @@ -1,7 +1,7 @@ using System; using System.Linq; -using global::Unity; -using global::Unity.Lifetime; +using Unity; +using Unity.Lifetime; namespace KafkaFlow.Unity { @@ -10,7 +10,7 @@ namespace KafkaFlow.Unity /// public class UnityDependencyConfigurator : IDependencyConfigurator { - private readonly IUnityContainer container; + private readonly IUnityContainer _container; /// /// Initializes a new instance of the class. @@ -18,7 +18,7 @@ public class UnityDependencyConfigurator : IDependencyConfigurator /// The Unity container instance public UnityDependencyConfigurator(IUnityContainer container) { - this.container = container; + _container = container; } /// @@ -27,7 +27,7 @@ public IDependencyConfigurator Add( Type implementationType, InstanceLifetime lifetime) { - this.container.RegisterType( + _container.RegisterType( serviceType, implementationType, (ITypeLifetimeManager)ParseLifetime(lifetime)); @@ -39,7 +39,7 @@ public IDependencyConfigurator Add(InstanceLifetime l where TService : class where TImplementation : class, TService { - this.container.RegisterType((ITypeLifetimeManager)ParseLifetime(lifetime)); + _container.RegisterType((ITypeLifetimeManager)ParseLifetime(lifetime)); return this; } @@ -47,7 +47,7 @@ public IDependencyConfigurator Add(InstanceLifetime l public IDependencyConfigurator Add(InstanceLifetime lifetime) where TService : class { - this.container.RegisterType((ITypeLifetimeManager)ParseLifetime(lifetime)); + _container.RegisterType((ITypeLifetimeManager)ParseLifetime(lifetime)); return this; } @@ -55,7 +55,7 @@ public IDependencyConfigurator Add(InstanceLifetime lifetime) public IDependencyConfigurator Add(TImplementation service) where TImplementation : class { - this.container.RegisterInstance(service); + _container.RegisterInstance(service); return this; } @@ -67,12 +67,12 @@ public IDependencyConfigurator Add( { string name = null; - if (this.AlreadyRegistered(serviceType)) + if (AlreadyRegistered(serviceType)) { name = Guid.NewGuid().ToString(); } - this.container.RegisterFactory( + _container.RegisterFactory( serviceType, name, c => factory(new UnityDependencyResolver(c)), @@ -84,7 +84,7 @@ public IDependencyConfigurator Add( /// public bool AlreadyRegistered(Type registeredType) { - return this.container.Registrations.Any(x => x.RegisteredType == registeredType); + return _container.Registrations.Any(x => x.RegisteredType == registeredType); } private static object ParseLifetime(InstanceLifetime lifetime) => diff --git a/tests/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj b/tests/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj index 16435660f..62f4b7482 100644 --- a/tests/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj +++ b/tests/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj @@ -40,15 +40,6 @@ - - - - - - - - - diff --git a/tests/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs b/tests/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs index c7d6ef294..5252be581 100644 --- a/tests/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs +++ b/tests/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs @@ -1,11 +1,10 @@ using System; using System.Threading.Tasks; using Confluent.SchemaRegistry; - using FluentAssertions; - using Google.Protobuf; using Google.Protobuf.Reflection; +using KafkaFlow.Serializer.SchemaRegistry; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq;