Skip to content

Commit

Permalink
Added RoundRobin/Broadcast send scopes
Browse files Browse the repository at this point in the history
  • Loading branch information
LeonAquitaine committed Oct 12, 2023
1 parent 2885c17 commit 5aba9c3
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 23 deletions.
6 changes: 3 additions & 3 deletions Samples/Sample05-Docker/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using MongoDB.Bson;
using Zen.MessageQueue.Shared;
using Zen.Web.Host;

namespace Sample05_Docker
Expand All @@ -7,13 +8,12 @@ public class Program
{
public static void Main(string[] args)
{
Zen.MessageQueue.Queue.RegisterType("string");
Zen.MessageQueue.Queue.Send("string");
Zen.MessageQueue.Queue.RegisterType("string", true);
Zen.MessageQueue.Queue.Receive += (model) =>
{
Zen.Base.Log.Add(model.ToJson());
};

Zen.MessageQueue.Queue.Send("string", EDistributionStyle.Broadcast);

Builder.Start<Startup>(args);

Expand Down
32 changes: 23 additions & 9 deletions Zen.MessageQueue/Queue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,47 @@ public static class Queue
{
private static readonly IMessageQueueBundle DefaultBundle = (IMessageQueueBundle)IoC.GetClassesByInterface<IMessageQueueBundle>(false)?.First()?.CreateInstance();

private static readonly Dictionary<Type, object> _cache = new Dictionary<Type, object>();
private static readonly Dictionary<Type, object> _cache = new();

public static event MessageReceivedHandler Receive;

public static void RegisterType<T>(this T targetObject, bool subscribe = false)
{
var adapter = GetAdapter(targetObject);

if (subscribe)
{
adapter.Receive += (item) =>
{
Receive?.Invoke(item);
};

adapter.Subscribe();
}
}

public static void RegisterType<T>(this T targetObject)
public static MessageQueuePrimitive<T> GetAdapter<T>(this T targetObject, bool subscribe = false)
{

if (DefaultBundle == null)
throw new InvalidOperationException("No Message Queue adapter specificed. Try adding a Zen.Module.MQ.* reference.");

var type = typeof(T);

if (_cache.ContainsKey(type)) return;
if (_cache.ContainsKey(type)) return (MessageQueuePrimitive<T>)_cache[typeof(T)];

MessageQueuePrimitive<T> adapter = DefaultBundle.AdapterType.CreateGenericInstance<T, MessageQueuePrimitive<T>>();

Base.Log.KeyValuePair("Zen.MessageQueue", $"{typeof(T).Name} adapter - {adapter.GetType().Namespace}", Base.Module.Log.Message.EContentType.Info);

_cache[typeof(T)] = adapter;

adapter.Receive += (item) =>
{
Receive?.Invoke(item);
};
return adapter;
}

public static void Send<T>(this T targetObject)
public static void Send<T>(this T targetObject, EDistributionStyle distributionStyle = EDistributionStyle.Broadcast)
{
((MessageQueuePrimitive<T>)_cache[typeof(T)]).Send(targetObject);
GetAdapter(targetObject).Send(targetObject, distributionStyle);
}
}
}
9 changes: 8 additions & 1 deletion Zen.MessageQueue/Shared/MessageQueuePrimitive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@
{
public delegate void MessageReceivedHandler<T>(T item);

public enum EDistributionStyle
{
RoundRobin,
Broadcast
}
public abstract class MessageQueuePrimitive<T>
{
public virtual void Send(T item) { }

public virtual void Send(T item, EDistributionStyle distributionStyle = EDistributionStyle.Broadcast) { }
public virtual void Subscribe() { }
public virtual event MessageReceivedHandler<T> Receive;

}
Expand Down
36 changes: 26 additions & 10 deletions Zen.Module.MQ.RabbitMQ/RabbitMQAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ public class RabbitMQAdapter<T> : MessageQueuePrimitive<T>
private readonly string _queueName;
private readonly List<string> _categories;

private static string _broadcastExchange = "broadcast";
private static string _roundRobinExchange = "roundRobin";


public RabbitMQAdapter()
{
_options = new Configuration.Options().GetSettings<Configuration.IOptions, Configuration.Options>("MessageQueue:RabbitMQ");
Expand All @@ -27,9 +31,31 @@ public RabbitMQAdapter()
_categories = typeof(T).GetParentTypes().Select(i => i.Name).ToList();
_queueName = typeof(T).FullName;

_channel.ExchangeDeclare(_roundRobinExchange, ExchangeType.Direct, true, false);
_channel.ExchangeDeclare(_broadcastExchange, ExchangeType.Fanout, true, false);

_channel.QueueDeclare(_queueName, durable: _options.Durable, exclusive: _options.Exclusive, autoDelete: _options.AutoDelete);

_channel.QueueBind(queue: _queueName, exchange: _broadcastExchange, routingKey: _queueName);
_channel.QueueBind(queue: _queueName, exchange: _roundRobinExchange, routingKey: _queueName);

}

public override event MessageReceivedHandler<T> Receive;

public override void Send(T item, EDistributionStyle distributionStyle = EDistributionStyle.Broadcast)
{
var payload = item.ToJson();
var body = Encoding.UTF8.GetBytes(payload);

var dist = distributionStyle == EDistributionStyle.RoundRobin ? _roundRobinExchange : _broadcastExchange;

_channel.BasicPublish(exchange: dist, routingKey: _queueName, body: body);
//_channel.BasicPublish(exchange: string.Empty, routingKey: _queueName, body: body);
}

public override void Subscribe()
{
var consumer = new EventingBasicConsumer(_channel);

consumer.Received += (model, ea) =>
Expand All @@ -42,15 +68,5 @@ public RabbitMQAdapter()

_channel.BasicConsume(queue: _queueName, autoAck: true, consumer: consumer);
}

public override event MessageReceivedHandler<T> Receive;

public override void Send(T item)
{
var payload = item.ToJson();
var body = Encoding.UTF8.GetBytes(payload);

_channel.BasicPublish(exchange: string.Empty, routingKey: _queueName, basicProperties: null, body: body);
}
}
}

0 comments on commit 5aba9c3

Please sign in to comment.