From 5aba9c3504f4b21f4c976235e71df08da30dd8a6 Mon Sep 17 00:00:00 2001 From: Leon Aquitaine Date: Thu, 12 Oct 2023 15:21:12 -0400 Subject: [PATCH] Added RoundRobin/Broadcast send scopes --- Samples/Sample05-Docker/Program.cs | 6 ++-- Zen.MessageQueue/Queue.cs | 32 ++++++++++++----- .../Shared/MessageQueuePrimitive.cs | 9 ++++- Zen.Module.MQ.RabbitMQ/RabbitMQAdapter.cs | 36 +++++++++++++------ 4 files changed, 60 insertions(+), 23 deletions(-) diff --git a/Samples/Sample05-Docker/Program.cs b/Samples/Sample05-Docker/Program.cs index 6a90fb7..f4fc4da 100644 --- a/Samples/Sample05-Docker/Program.cs +++ b/Samples/Sample05-Docker/Program.cs @@ -1,4 +1,5 @@ using MongoDB.Bson; +using Zen.MessageQueue.Shared; using Zen.Web.Host; namespace Sample05_Docker @@ -7,13 +8,12 @@ public class Program { public static void Main(string[] args) { - Zen.MessageQueue.Queue.RegisterType("string"); - Zen.MessageQueue.Queue.Send("string"); + Zen.MessageQueue.Queue.RegisterType("string", true); Zen.MessageQueue.Queue.Receive += (model) => { Zen.Base.Log.Add(model.ToJson()); }; - + Zen.MessageQueue.Queue.Send("string", EDistributionStyle.Broadcast); Builder.Start(args); diff --git a/Zen.MessageQueue/Queue.cs b/Zen.MessageQueue/Queue.cs index 7b42d14..d340a5a 100644 --- a/Zen.MessageQueue/Queue.cs +++ b/Zen.MessageQueue/Queue.cs @@ -14,12 +14,26 @@ public static class Queue { private static readonly IMessageQueueBundle DefaultBundle = (IMessageQueueBundle)IoC.GetClassesByInterface(false)?.First()?.CreateInstance(); - private static readonly Dictionary _cache = new Dictionary(); + private static readonly Dictionary _cache = new(); public static event MessageReceivedHandler Receive; + public static void RegisterType(this T targetObject, bool subscribe = false) + { + var adapter = GetAdapter(targetObject); + + if (subscribe) + { + adapter.Receive += (item) => + { + Receive?.Invoke(item); + }; + + adapter.Subscribe(); + } + } - public static void RegisterType(this T targetObject) + public static MessageQueuePrimitive GetAdapter(this T targetObject, bool subscribe = false) { if (DefaultBundle == null) @@ -27,20 +41,20 @@ public static void RegisterType(this T targetObject) var type = typeof(T); - if (_cache.ContainsKey(type)) return; + if (_cache.ContainsKey(type)) return (MessageQueuePrimitive)_cache[typeof(T)]; + MessageQueuePrimitive adapter = DefaultBundle.AdapterType.CreateGenericInstance>(); + Base.Log.KeyValuePair("Zen.MessageQueue", $"{typeof(T).Name} adapter - {adapter.GetType().Namespace}", Base.Module.Log.Message.EContentType.Info); + _cache[typeof(T)] = adapter; - adapter.Receive += (item) => - { - Receive?.Invoke(item); - }; + return adapter; } - public static void Send(this T targetObject) + public static void Send(this T targetObject, EDistributionStyle distributionStyle = EDistributionStyle.Broadcast) { - ((MessageQueuePrimitive)_cache[typeof(T)]).Send(targetObject); + GetAdapter(targetObject).Send(targetObject, distributionStyle); } } } diff --git a/Zen.MessageQueue/Shared/MessageQueuePrimitive.cs b/Zen.MessageQueue/Shared/MessageQueuePrimitive.cs index 77f0129..c88c752 100644 --- a/Zen.MessageQueue/Shared/MessageQueuePrimitive.cs +++ b/Zen.MessageQueue/Shared/MessageQueuePrimitive.cs @@ -2,9 +2,16 @@ { public delegate void MessageReceivedHandler(T item); + public enum EDistributionStyle + { + RoundRobin, + Broadcast + } public abstract class MessageQueuePrimitive { - public virtual void Send(T item) { } + + public virtual void Send(T item, EDistributionStyle distributionStyle = EDistributionStyle.Broadcast) { } + public virtual void Subscribe() { } public virtual event MessageReceivedHandler Receive; } diff --git a/Zen.Module.MQ.RabbitMQ/RabbitMQAdapter.cs b/Zen.Module.MQ.RabbitMQ/RabbitMQAdapter.cs index 372959e..d01307e 100644 --- a/Zen.Module.MQ.RabbitMQ/RabbitMQAdapter.cs +++ b/Zen.Module.MQ.RabbitMQ/RabbitMQAdapter.cs @@ -16,6 +16,10 @@ public class RabbitMQAdapter : MessageQueuePrimitive private readonly string _queueName; private readonly List _categories; + private static string _broadcastExchange = "broadcast"; + private static string _roundRobinExchange = "roundRobin"; + + public RabbitMQAdapter() { _options = new Configuration.Options().GetSettings("MessageQueue:RabbitMQ"); @@ -27,9 +31,31 @@ public RabbitMQAdapter() _categories = typeof(T).GetParentTypes().Select(i => i.Name).ToList(); _queueName = typeof(T).FullName; + _channel.ExchangeDeclare(_roundRobinExchange, ExchangeType.Direct, true, false); + _channel.ExchangeDeclare(_broadcastExchange, ExchangeType.Fanout, true, false); _channel.QueueDeclare(_queueName, durable: _options.Durable, exclusive: _options.Exclusive, autoDelete: _options.AutoDelete); + _channel.QueueBind(queue: _queueName, exchange: _broadcastExchange, routingKey: _queueName); + _channel.QueueBind(queue: _queueName, exchange: _roundRobinExchange, routingKey: _queueName); + + } + + public override event MessageReceivedHandler Receive; + + public override void Send(T item, EDistributionStyle distributionStyle = EDistributionStyle.Broadcast) + { + var payload = item.ToJson(); + var body = Encoding.UTF8.GetBytes(payload); + + var dist = distributionStyle == EDistributionStyle.RoundRobin ? _roundRobinExchange : _broadcastExchange; + + _channel.BasicPublish(exchange: dist, routingKey: _queueName, body: body); + //_channel.BasicPublish(exchange: string.Empty, routingKey: _queueName, body: body); + } + + public override void Subscribe() + { var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => @@ -42,15 +68,5 @@ public RabbitMQAdapter() _channel.BasicConsume(queue: _queueName, autoAck: true, consumer: consumer); } - - public override event MessageReceivedHandler Receive; - - public override void Send(T item) - { - var payload = item.ToJson(); - var body = Encoding.UTF8.GetBytes(payload); - - _channel.BasicPublish(exchange: string.Empty, routingKey: _queueName, basicProperties: null, body: body); - } } } \ No newline at end of file