Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework type name resolves to able to change the implementation #421

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using KafkaFlow.Configuration;

namespace KafkaFlow
{
Expand All @@ -21,6 +22,22 @@ public static IDependencyConfigurator AddSingleton<TService, TImplementation>(th
return configurator.Add<TService, TImplementation>(InstanceLifetime.Singleton);
}

/// <summary>
/// Registers a singleton type mapping if the given <typeparamref name="TService" /> is not already registered
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on.</param>
/// <typeparam name="TService"><see cref="Type"/> that will be requested.</typeparam>
/// <typeparam name="TImplementation"><see cref="Type"/> that will actually be returned.</typeparam>
/// <returns></returns>
public static IDependencyConfigurator TryAddSingleton<TService, TImplementation>(this IDependencyConfigurator configurator)
where TImplementation : class, TService
where TService : class
{
return configurator.AlreadyRegistered(typeof(TService))
? configurator
: configurator.Add<TService, TImplementation>(InstanceLifetime.Singleton);
}

/// <summary>
/// Registers a singleton type mapping
/// </summary>
Expand All @@ -33,6 +50,20 @@ public static IDependencyConfigurator AddSingleton<TService>(this IDependencyCon
return configurator.Add<TService>(InstanceLifetime.Singleton);
}

/// <summary>
/// Registers a singleton type mapping if the given <typeparamref name="TService" /> is not already registered
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <typeparam name="TService"><see cref="Type"/> that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator TryAddSingleton<TService>(this IDependencyConfigurator configurator)
where TService : class
{
return configurator.AlreadyRegistered(typeof(TService))
? configurator
: configurator.Add<TService>(InstanceLifetime.Singleton);
}

/// <summary>
/// Registers a singleton type mapping where the returned instance will be the given implementation
/// </summary>
Expand All @@ -48,6 +79,23 @@ public static IDependencyConfigurator AddSingleton<TService>(
return configurator.Add(service);
}

/// <summary>
/// Registers a singleton type mapping where the returned instance will be the given implementation if the given <typeparamref name="TService" /> is not already registered
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <param name="service"><see cref="Type"/> that will be returned</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator TryAddSingleton<TService>(
this IDependencyConfigurator configurator,
TService service)
where TService : class
{
return configurator.AlreadyRegistered(typeof(TService))
? configurator
: configurator.Add(service);
}

/// <summary>
/// Registers a singleton type mapping where the returned instance will be given by the provided factory
/// </summary>
Expand All @@ -65,6 +113,25 @@ public static IDependencyConfigurator AddSingleton<TService>(
InstanceLifetime.Singleton);
}

/// <summary>
/// Registers a singleton type mapping where the returned instance will be given by the provided factory if the given <typeparamref name="TService" /> is not already registered
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <param name="factory">A factory to create new instances of the service implementation</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator TryAddSingleton<TService>(
this IDependencyConfigurator configurator,
Func<IDependencyResolver, TService> factory)
{
return configurator.AlreadyRegistered(typeof(TService))
? configurator
: configurator.Add(
typeof(TService),
factory,
InstanceLifetime.Singleton);
}

/// <summary>
/// Registers a scoped type mapping where the returned instance will be given by the provided factory
/// </summary>
Expand Down Expand Up @@ -112,6 +179,26 @@ public static IDependencyConfigurator AddTransient(
InstanceLifetime.Transient);
}

/// <summary>
/// Registers a transient type mapping if the given <paramref name="serviceType" /> is not already registered
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <param name="serviceType"><see cref="Type"/> that will be requested</param>
/// <param name="implementationType"><see cref="Type"/> that will actually be returned</param>
/// <returns></returns>
public static IDependencyConfigurator TryAddTransient(
this IDependencyConfigurator configurator,
Type serviceType,
Type implementationType)
{
return configurator.AlreadyRegistered(serviceType)
? configurator
: configurator.Add(
serviceType,
implementationType,
InstanceLifetime.Transient);
}

/// <summary>
/// Registers a transient type mapping
/// </summary>
Expand All @@ -124,6 +211,50 @@ public static IDependencyConfigurator AddTransient<TService>(this IDependencyCon
return configurator.Add<TService>(InstanceLifetime.Transient);
}

/// <summary>
/// Registers a transient type mapping if the given <typeparamref name="TService" /> is not already registered
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator TryAddTransient<TService>(this IDependencyConfigurator configurator)
where TService : class
{
return configurator.AlreadyRegistered(typeof(TService))
? configurator
: configurator.Add<TService>(InstanceLifetime.Transient);
}

/// <summary>
/// Registers a singleton type mapping
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on.</param>
/// <typeparam name="TService"><see cref="Type"/> that will be requested.</typeparam>
/// <typeparam name="TImplementation"><see cref="Type"/> that will actually be returned.</typeparam>
/// <returns></returns>
public static IDependencyConfigurator AddTransient<TService, TImplementation>(this IDependencyConfigurator configurator)
where TImplementation : class, TService
where TService : class
{
return configurator.Add<TService, TImplementation>(InstanceLifetime.Transient);
}

/// <summary>
/// Registers a singleton type mapping if the given <typeparamref name="TService" /> is not already registered
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on.</param>
/// <typeparam name="TService"><see cref="Type"/> that will be requested.</typeparam>
/// <typeparam name="TImplementation"><see cref="Type"/> that will actually be returned.</typeparam>
/// <returns></returns>
public static IDependencyConfigurator TryAddTransient<TService, TImplementation>(this IDependencyConfigurator configurator)
where TImplementation : class, TService
where TService : class
{
return configurator.AlreadyRegistered(typeof(TService))
? configurator
: configurator.Add<TService, TImplementation>(InstanceLifetime.Transient);
}

/// <summary>
/// Registers a transient type mapping where the returned instance will be given by the provided factory
/// </summary>
Expand All @@ -140,5 +271,38 @@ public static IDependencyConfigurator AddTransient<TService>(
factory,
InstanceLifetime.Transient);
}

/// <summary>
/// Registers a transient type mapping where the returned instance will be given by the provided factory if the given <typeparamref name="TService" /> is not already registered
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <param name="factory">A factory to create new instances of the service implementation</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator TryAddTransient<TService>(
this IDependencyConfigurator configurator,
Func<IDependencyResolver, TService> factory)
{
return configurator.AlreadyRegistered(typeof(TService))
? configurator
: configurator.Add(
typeof(TService),
factory,
InstanceLifetime.Transient);
}

/// <summary>
/// Add a type mapping to the cluster dependency resolver
/// </summary>
/// <param name="cluster">Cluster configuration builder</param>
/// <param name="handler">A handler to set the configuration values</param>
/// <returns></returns>
public static IClusterConfigurationBuilder WithDependencies(
this IClusterConfigurationBuilder cluster,
Action<IDependencyConfigurator> handler)
{
handler(cluster.DependencyConfigurator);
return cluster;
}
}
}
7 changes: 7 additions & 0 deletions src/KafkaFlow.Abstractions/IDependencyConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,12 @@ IDependencyConfigurator Add<TImplementation>(
Type serviceType,
Func<IDependencyResolver, TImplementation> factory,
InstanceLifetime lifetime);

/// <summary>
/// Checks if the given <paramref name="registeredType" /> is already registered
/// </summary>
/// <param name="registeredType">Service type</param>
/// <returns></returns>
bool AlreadyRegistered(Type registeredType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ public IDependencyConfigurator Add<TImplementation>(
return this;
}

public bool AlreadyRegistered(Type registeredType)
{
foreach (var s in _services)
{
if (s.ServiceType == registeredType)
{
return true;
}
}

return false;
}

private static ServiceLifetime ParseLifetime(InstanceLifetime lifetime)
{
switch (lifetime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace KafkaFlow.Serializer.SchemaRegistry
{
internal class ConfluentAvroTypeNameResolver : ISchemaRegistryTypeNameResolver
internal sealed class ConfluentAvroTypeNameResolver : IConfluentAvroTypeNameResolver
{
private readonly ISchemaRegistryClient _client;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Confluent.SchemaRegistry;
using KafkaFlow.Configuration;
using KafkaFlow.Configuration;
using KafkaFlow.Middlewares.Serializer;
using KafkaFlow.Serializer.SchemaRegistry;

Expand All @@ -18,10 +17,12 @@ public static class ConsumerConfigurationBuilderExtensions
public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryAvroDeserializer(
this IConsumerMiddlewareConfigurationBuilder middlewares)
{
middlewares.DependencyConfigurator.TryAddTransient<IConfluentAvroTypeNameResolver, ConfluentAvroTypeNameResolver>();

return middlewares.Add(
resolver => new DeserializerConsumerMiddleware(
new ConfluentAvroDeserializer(resolver),
new SchemaRegistryTypeResolver(new ConfluentAvroTypeNameResolver(resolver.Resolve<ISchemaRegistryClient>()))));
new SchemaRegistryTypeResolver(resolver.Resolve<IConfluentAvroTypeNameResolver>())));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace KafkaFlow.Serializer.SchemaRegistry
{
/// <inheritdoc />
public interface IConfluentAvroTypeNameResolver : ISchemaRegistryTypeNameResolver
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ public static IProducerMiddlewareConfigurationBuilder AddSchemaRegistryAvroSeria
this IProducerMiddlewareConfigurationBuilder middlewares,
AvroSerializerConfig config = null)
{
middlewares.DependencyConfigurator.TryAddTransient<IConfluentAvroTypeNameResolver, ConfluentAvroTypeNameResolver>();

return middlewares.Add(
resolver => new SerializerProducerMiddleware(
new ConfluentAvroSerializer(resolver, config),
new SchemaRegistryTypeResolver(new ConfluentAvroTypeNameResolver(resolver.Resolve<ISchemaRegistryClient>()))));
new SchemaRegistryTypeResolver(resolver.Resolve<IConfluentAvroTypeNameResolver>())));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
using Google.Protobuf;
using Google.Protobuf.Reflection;

namespace KafkaFlow
namespace KafkaFlow.Serializer.SchemaRegistry
{
internal class ConfluentProtobufTypeNameResolver : ISchemaRegistryTypeNameResolver
internal class ConfluentProtobufTypeNameResolver : IConfluentProtobufTypeNameResolver
{
private readonly ISchemaRegistryClient _client;

Expand All @@ -21,7 +21,18 @@ public async Task<string> ResolveAsync(int id)

var protoFields = FileDescriptorProto.Parser.ParseFrom(ByteString.FromBase64(schemaString));

return $"{protoFields.Package}.{protoFields.MessageType.FirstOrDefault()?.Name}";
return BuildTypeName(protoFields);
}

private static string BuildTypeName(FileDescriptorProto protoFields)
{
var package = protoFields.Package;
if (string.IsNullOrEmpty(package))
{
package = protoFields.Options.CsharpNamespace;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Options might be null

}

return $"{package}.{protoFields.MessageType.FirstOrDefault()?.Name}";
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Confluent.SchemaRegistry;
using KafkaFlow.Configuration;
using KafkaFlow.Configuration;
using KafkaFlow.Middlewares.Serializer;
using KafkaFlow.Serializer.SchemaRegistry;

Expand All @@ -18,10 +17,12 @@ public static class ConsumerConfigurationBuilderExtensions
public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryProtobufDeserializer(
this IConsumerMiddlewareConfigurationBuilder middlewares)
{
middlewares.DependencyConfigurator.TryAddTransient<IConfluentProtobufTypeNameResolver, ConfluentProtobufTypeNameResolver>();

return middlewares.Add(
resolver => new DeserializerConsumerMiddleware(
new ConfluentProtobufDeserializer(),
new SchemaRegistryTypeResolver(new ConfluentProtobufTypeNameResolver(resolver.Resolve<ISchemaRegistryClient>()))));
new SchemaRegistryTypeResolver(resolver.Resolve<IConfluentProtobufTypeNameResolver>())));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace KafkaFlow.Serializer.SchemaRegistry
{
/// <inheritdoc />
public interface IConfluentProtobufTypeNameResolver : ISchemaRegistryTypeNameResolver
{
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using Confluent.SchemaRegistry.Serdes;
using KafkaFlow.Configuration;
using KafkaFlow.Middlewares.Serializer;
using KafkaFlow.Serializer.SchemaRegistry;
Expand All @@ -21,10 +20,12 @@ public static IProducerMiddlewareConfigurationBuilder AddSchemaRegistryProtobufS
this IProducerMiddlewareConfigurationBuilder middlewares,
ProtobufSerializerConfig config = null)
{
middlewares.DependencyConfigurator.TryAddTransient<IConfluentProtobufTypeNameResolver, ConfluentProtobufTypeNameResolver>();

return middlewares.Add(
resolver => new SerializerProducerMiddleware(
new ConfluentProtobufSerializer(resolver, config),
new SchemaRegistryTypeResolver(new ConfluentProtobufTypeNameResolver(resolver.Resolve<ISchemaRegistryClient>()))));
new SchemaRegistryTypeResolver(resolver.Resolve<IConfluentProtobufTypeNameResolver>())));
}
}
}
Loading