Skip to content
This repository was archived by the owner on May 7, 2020. It is now read-only.

Commit

Permalink
rabbitmq refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
osoykan committed Oct 16, 2017
1 parent 1b7ca4b commit efff0df
Showing 1 changed file with 65 additions and 44 deletions.
109 changes: 65 additions & 44 deletions src/Stove.RabbitMQ/RabbitMQ/StoveRabbitMQRegistrationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Stove.RabbitMQ
public static class StoveRabbitMQRegistrationExtensions
{
/// <summary>
/// Uses the stove rabbit mq.
/// Uses the Stove RabbitMq integration with producer and consumer.Loads consumers from Ioc.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="rabbitMQConfigurer">The rabbit mq configurer.</param>
Expand All @@ -28,7 +28,7 @@ public static class StoveRabbitMQRegistrationExtensions
public static IIocBuilder UseStoveRabbitMQ(
[NotNull] this IIocBuilder builder,
[NotNull] Func<IStoveRabbitMQConfiguration, IStoveRabbitMQConfiguration> rabbitMQConfigurer,
Action<IRabbitMqBusFactoryConfigurator> busConfigurer = null)
Action<IRabbitMqBusFactoryConfigurator> busConfigurer)
{
Check.NotNull(rabbitMQConfigurer, nameof(rabbitMQConfigurer));

Expand All @@ -49,26 +49,7 @@ public static IIocBuilder UseStoveRabbitMQ(

IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost host = cfg.Host(new Uri(configuration.HostAddress), h =>
{
h.Username(configuration.Username);
h.Password(configuration.Password);
});

if (configuration.UseRetryMechanism)
{
cfg.UseRetry(rtryConf => { rtryConf.Immediate(configuration.MaxRetryCount); });
}

if (configuration.PrefetchCount.HasValue)
{
cfg.PrefetchCount = (ushort)configuration.PrefetchCount;
}

if (configuration.ConcurrencyLimit.HasValue)
{
cfg.UseConcurrencyLimit(configuration.ConcurrencyLimit.Value);
}
IRabbitMqHost host = EnsureStoveDefaults(cfg, configuration);

cfg.ReceiveEndpoint(host, configuration.QueueName, ec => { ec.LoadFrom(ctx); });

Expand All @@ -85,12 +66,54 @@ public static IIocBuilder UseStoveRabbitMQ(
}

/// <summary>
/// Uses the stove rabbit mq. Consumer loading doesn't come from IoC, you should register explicitly with <see cref="consumerConfigurer"/>
/// Uses the Stove RabbitMQ integration as just publisher.
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="rabbitMQConfigurer">The rabbit mq configurer.</param>
/// <returns></returns>
[NotNull]
public static IIocBuilder UseStoveRabbitMQ(
[NotNull] this IIocBuilder builder,
[NotNull] Func<IStoveRabbitMQConfiguration, IStoveRabbitMQConfiguration> rabbitMQConfigurer)
{
Check.NotNull(rabbitMQConfigurer, nameof(rabbitMQConfigurer));

builder
.RegisterServices(r =>
{
r.RegisterAssemblyByConvention(typeof(StoveRabbitMQRegistrationExtensions).GetAssembly());
r.Register<IStoveRabbitMQConfiguration, StoveRabbitMQConfiguration>(Lifetime.Singleton);
r.Register<IMessageBus, StoveRabbitMQMessageBus>();
r.Register(ctx => rabbitMQConfigurer);
});

builder.RegisterServices(r => r.UseBuilder(cb =>
{
cb.Register(ctx =>
{
var configuration = ctx.Resolve<IStoveRabbitMQConfiguration>();

IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
EnsureStoveDefaults(cfg, configuration);
});

return busControl;
}).SingleInstance()
.As<IBusControl>()
.As<IBus>();
}));

return builder;
}

/// <summary>
/// Uses the Stove RabbitMQ integration. Consumer loadings doesn't come from IoC, you should register explicitly with
/// <see cref="consumerConfigurer" />
/// </summary>
/// <param name="builder">The builder.</param>
/// <param name="rabbitMQConfigurer">The rabbit mq configurer.</param>
/// <param name="consumerConfigurer"></param>
/// <param name="busConfigurer"></param>
/// <returns></returns>
[NotNull]
public static IIocBuilder UseStoveRabbitMQ(
Expand Down Expand Up @@ -119,26 +142,7 @@ Action<IRabbitMqHost, IRabbitMqBusFactoryConfigurator, IComponentContext> consum

IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost host = cfg.Host(new Uri(configuration.HostAddress), h =>
{
h.Username(configuration.Username);
h.Password(configuration.Password);
});

if (configuration.UseRetryMechanism)
{
cfg.UseRetry(rtryConf => { rtryConf.Immediate(configuration.MaxRetryCount); });
}

if (configuration.PrefetchCount.HasValue)
{
cfg.PrefetchCount = (ushort)configuration.PrefetchCount;
}

if (configuration.ConcurrencyLimit.HasValue)
{
cfg.UseConcurrencyLimit(configuration.ConcurrencyLimit.Value);
}
IRabbitMqHost host = EnsureStoveDefaults(cfg, configuration);

consumerConfigurer(host, cfg, ctx);
});
Expand All @@ -151,5 +155,22 @@ Action<IRabbitMqHost, IRabbitMqBusFactoryConfigurator, IComponentContext> consum

return builder;
}

private static IRabbitMqHost EnsureStoveDefaults(IRabbitMqBusFactoryConfigurator cfg, IStoveRabbitMQConfiguration configuration)
{
IRabbitMqHost host = cfg.Host(new Uri(configuration.HostAddress), h =>
{
h.Username(configuration.Username);
h.Password(configuration.Password);
});

if (configuration.UseRetryMechanism) cfg.UseRetry(rtryConf => { rtryConf.Immediate(configuration.MaxRetryCount); });

if (configuration.PrefetchCount.HasValue) cfg.PrefetchCount = (ushort)configuration.PrefetchCount;

if (configuration.ConcurrencyLimit.HasValue) cfg.UseConcurrencyLimit(configuration.ConcurrencyLimit.Value);

return host;
}
}
}

0 comments on commit efff0df

Please sign in to comment.