diff --git a/.idea/.idea.Foundatio.RabbitMQ/.idea/indexLayout.xml b/.idea/.idea.Foundatio.RabbitMQ/.idea/indexLayout.xml index 27ba142..7b08163 100644 --- a/.idea/.idea.Foundatio.RabbitMQ/.idea/indexLayout.xml +++ b/.idea/.idea.Foundatio.RabbitMQ/.idea/indexLayout.xml @@ -1,6 +1,6 @@ - + diff --git a/Foundatio.RabbitMQ.sln b/Foundatio.RabbitMQ.sln index 7b1d511..6d38288 100644 --- a/Foundatio.RabbitMQ.sln +++ b/Foundatio.RabbitMQ.sln @@ -12,6 +12,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution README.md = README.md docker-compose.yml = docker-compose.yml tests\Directory.Build.props = tests\Directory.Build.props + build\Dockerfile = build\Dockerfile EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Foundatio.RabbitMQ", "src\Foundatio.RabbitMQ\Foundatio.RabbitMQ.csproj", "{EAE3607D-73A1-4D02-BDAA-24A37DDA15CB}" diff --git a/build/Dockerfile b/build/Dockerfile index f1d3113..e7557aa 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -1,4 +1,4 @@ -FROM rabbitmq:3.10-management +FROM rabbitmq:4.0.3-management -COPY rabbitmq_delayed_message_exchange-3.10.0.ez /opt/rabbitmq/plugins +COPY rabbitmq_delayed_message_exchange-4.0.2.ez /opt/rabbitmq/plugins RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange diff --git a/build/common.props b/build/common.props index dbc0634..59ea214 100644 --- a/build/common.props +++ b/build/common.props @@ -1,7 +1,7 @@ - netstandard2.0 + netstandard2.1 Foundatio Pluggable foundation blocks for building distributed apps. https://github.com/FoundatioFx/Foundatio.RabbitMQ diff --git a/build/rabbitmq_delayed_message_exchange-3.10.0.ez b/build/rabbitmq_delayed_message_exchange-3.10.0.ez deleted file mode 100644 index 4f7d6a7..0000000 Binary files a/build/rabbitmq_delayed_message_exchange-3.10.0.ez and /dev/null differ diff --git a/build/rabbitmq_delayed_message_exchange-4.0.2.ez b/build/rabbitmq_delayed_message_exchange-4.0.2.ez new file mode 100644 index 0000000..c6dbbdf Binary files /dev/null and b/build/rabbitmq_delayed_message_exchange-4.0.2.ez differ diff --git a/docker-compose.yml b/docker-compose.yml index ae8179c..a259cf2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: rabbitmq: - image: rabbitmq:3.10-management + image: rabbitmq:4.0.3-management ports: - "5672:5672" - "8080:15672" # management ui - login with guest:guest diff --git a/src/Foundatio.RabbitMQ/Extensions/TaskExtensions.cs b/src/Foundatio.RabbitMQ/Extensions/TaskExtensions.cs index f0fd963..deba7e2 100644 --- a/src/Foundatio.RabbitMQ/Extensions/TaskExtensions.cs +++ b/src/Foundatio.RabbitMQ/Extensions/TaskExtensions.cs @@ -25,4 +25,10 @@ public static ConfiguredTaskAwaitable AnyContext(this Awaitabl { return task.ConfigureAwait(continueOnCapturedContext: false); } + + [DebuggerStepThrough] + public static ConfiguredValueTaskAwaitable AnyContext(this ValueTask task) + { + return task.ConfigureAwait(continueOnCapturedContext: false); + } } diff --git a/src/Foundatio.RabbitMQ/Foundatio.RabbitMQ.csproj b/src/Foundatio.RabbitMQ/Foundatio.RabbitMQ.csproj index 0d4c794..06e5cab 100644 --- a/src/Foundatio.RabbitMQ/Foundatio.RabbitMQ.csproj +++ b/src/Foundatio.RabbitMQ/Foundatio.RabbitMQ.csproj @@ -1,6 +1,6 @@ - + diff --git a/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs b/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs index e17a0df..a2b1349 100644 --- a/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs +++ b/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs @@ -13,15 +13,16 @@ namespace Foundatio.Messaging; -public class RabbitMQMessageBus : MessageBusBase +public class RabbitMQMessageBus : MessageBusBase, IAsyncDisposable { private readonly AsyncLock _lock = new(); private readonly ConnectionFactory _factory; private IConnection _publisherConnection; private IConnection _subscriberConnection; - private IModel _publisherModel; - private IModel _subscriberModel; + private IChannel _publisherChannel; + private IChannel _subscriberChannel; private bool? _delayedExchangePluginEnabled; + private bool _isDisposed; public RabbitMQMessageBus(RabbitMQMessageBusOptions options) : base(options) { @@ -32,12 +33,11 @@ public RabbitMQMessageBus(RabbitMQMessageBusOptions options) : base(options) // in case the server is restarted or there has been any network failures // Topology ( queues, exchanges, bindings and consumers) recovery "TopologyRecoveryEnabled" is already enabled // by default so no need to initialize it. NetworkRecoveryInterval is also by default set to 5 seconds. - // it can always be fine tuned if needed. + // it can always be fine-tuned if needed. _factory = new ConnectionFactory { Uri = new Uri(options.ConnectionString), - AutomaticRecoveryEnabled = true, - DispatchConsumersAsync = true + AutomaticRecoveryEnabled = true }; } @@ -48,58 +48,54 @@ protected override Task RemoveTopicSubscriptionAsync() { if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("RemoveTopicSubscriptionAsync"); - CloseSubscriberConnection(); - return Task.CompletedTask; + + return CloseSubscriberConnectionAsync(); } protected override async Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken) { - if (_subscriberModel != null) + if (_subscriberChannel != null) return; await EnsureTopicCreatedAsync(cancellationToken).AnyContext(); using (await _lock.LockAsync().AnyContext()) { - if (_subscriberModel != null) + if (_subscriberChannel != null) return; - _subscriberConnection = CreateConnection(); - _subscriberModel = _subscriberConnection.CreateModel(); + _subscriberConnection = await CreateConnectionAsync().AnyContext(); + _subscriberChannel = await _subscriberConnection.CreateChannelAsync(cancellationToken: cancellationToken).AnyContext(); - // If InitPublisher is called first, then we will never come in this if clause. - if (!CreateDelayedExchange(_subscriberModel)) + // If InitPublisher is called first, then we will never come in this if-clause. + if (!await CreateDelayedExchangeAsync(_subscriberChannel).AnyContext()) { - _subscriberModel.Close(); - _subscriberModel.Abort(); - _subscriberModel.Dispose(); - - _subscriberConnection.Close(); - _subscriberConnection.Dispose(); + await _subscriberChannel.DisposeAsync().AnyContext(); + await _subscriberConnection.DisposeAsync().AnyContext(); - _subscriberConnection = CreateConnection(); - _subscriberModel = _subscriberConnection.CreateModel(); - CreateRegularExchange(_subscriberModel); + _subscriberConnection = await CreateConnectionAsync().AnyContext(); + _subscriberChannel = await _subscriberConnection.CreateChannelAsync(cancellationToken: cancellationToken).AnyContext(); + await CreateRegularExchangeAsync(_subscriberChannel).AnyContext(); } - string queueName = CreateQueue(_subscriberModel); - var consumer = new AsyncEventingBasicConsumer(_subscriberModel); - consumer.Received += OnMessage; - consumer.Shutdown += OnConsumerShutdown; + string queueName = await CreateQueueAsync(_subscriberChannel).AnyContext(); + var consumer = new AsyncEventingBasicConsumer(_subscriberChannel); + consumer.ReceivedAsync += OnMessageAsync; + consumer.ShutdownAsync += OnConsumerShutdownAsync; - _subscriberModel.BasicConsume(queueName, _options.AcknowledgementStrategy == AcknowledgementStrategy.FireAndForget, consumer); + await _subscriberChannel.BasicConsumeAsync(queueName, _options.AcknowledgementStrategy == AcknowledgementStrategy.FireAndForget, consumer, cancellationToken: cancellationToken).AnyContext(); if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("The unique channel number for the subscriber is : {ChannelNumber}", _subscriberModel.ChannelNumber); + _logger.LogTrace("The unique channel number for the subscriber is : {ChannelNumber}", _subscriberChannel.ChannelNumber); } } - private Task OnConsumerShutdown(object sender, ShutdownEventArgs e) + private Task OnConsumerShutdownAsync(object sender, ShutdownEventArgs e) { _logger.LogInformation("Consumer shutdown. Reply Code: {ReplyCode} Reason: {ReplyText}", e.ReplyCode, e.ReplyText); return Task.CompletedTask; } - private async Task OnMessage(object sender, BasicDeliverEventArgs envelope) + private async Task OnMessageAsync(object sender, BasicDeliverEventArgs envelope) { if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("OnMessageAsync({MessageId})", envelope.BasicProperties?.MessageId); @@ -110,7 +106,7 @@ private async Task OnMessage(object sender, BasicDeliverEventArgs envelope) _logger.LogTrace("No subscribers ({MessageId})", envelope.BasicProperties?.MessageId); if (_options.AcknowledgementStrategy == AcknowledgementStrategy.Automatic) - _subscriberModel.BasicReject(envelope.DeliveryTag, true); + await _subscriberChannel.BasicRejectAsync(envelope.DeliveryTag, true).AnyContext(); return; } @@ -121,14 +117,14 @@ private async Task OnMessage(object sender, BasicDeliverEventArgs envelope) await SendMessageToSubscribersAsync(message).AnyContext(); if (_options.AcknowledgementStrategy == AcknowledgementStrategy.Automatic) - _subscriberModel.BasicAck(envelope.DeliveryTag, false); + await _subscriberChannel.BasicAckAsync(envelope.DeliveryTag, false).AnyContext(); } catch (Exception ex) { _logger.LogError(ex, "Error handling message ({MessageId}): {Message}", envelope.BasicProperties?.MessageId, ex.Message); if (_options.AcknowledgementStrategy == AcknowledgementStrategy.Automatic) - _subscriberModel.BasicReject(envelope.DeliveryTag, true); + await _subscriberChannel.BasicRejectAsync(envelope.DeliveryTag, true).AnyContext(); } } @@ -161,43 +157,39 @@ protected virtual IMessage ConvertToMessage(BasicDeliverEventArgs envelope) protected override async Task EnsureTopicCreatedAsync(CancellationToken cancellationToken) { - if (_publisherModel != null) + if (_publisherChannel != null) return; using (await _lock.LockAsync().AnyContext()) { - if (_publisherModel != null) + if (_publisherChannel != null) return; // Create the client connection, channel, declares the exchange, queue and binds // the exchange with the publisher queue. It requires the name of our exchange, exchange type, durability and auto-delete. - // For now we are using same autoDelete for both exchange and queue ( it will survive a server restart ) - _publisherConnection = CreateConnection(); - _publisherModel = _publisherConnection.CreateModel(); + // For now, we are using same autoDelete for both exchange and queue ( it will survive a server restart ) + _publisherConnection = await CreateConnectionAsync().AnyContext(); + _publisherChannel = await _publisherConnection.CreateChannelAsync(cancellationToken: cancellationToken).AnyContext(); // We first attempt to create "x-delayed-type". For this plugin should be installed. - // However, we plugin is not installed this will throw an exception. In that case + // However, we plug in is not installed this will throw an exception. In that case // we attempt to create regular exchange. If regular exchange also throws and exception // then trouble shoot the problem. - if (!CreateDelayedExchange(_publisherModel)) + if (!await CreateDelayedExchangeAsync(_publisherChannel).AnyContext()) { // if the initial exchange creation was not successful then we must close the previous connection // and establish the new client connection and model otherwise you will keep receiving failure in creation // of the regular exchange too. - _publisherModel.Close(); - _publisherModel.Abort(); - _publisherModel.Dispose(); + await _publisherChannel.DisposeAsync().AnyContext(); + await _publisherConnection.DisposeAsync().AnyContext(); - _publisherConnection.Close(); - _publisherConnection.Dispose(); - - _publisherConnection = CreateConnection(); - _publisherModel = _publisherConnection.CreateModel(); - CreateRegularExchange(_publisherModel); + _publisherConnection = await CreateConnectionAsync().AnyContext(); + _publisherChannel = await _publisherConnection.CreateChannelAsync(cancellationToken: cancellationToken).AnyContext(); + await CreateRegularExchangeAsync(_publisherChannel).AnyContext(); } if (_logger.IsEnabled(LogLevel.Trace)) - _logger.LogTrace("The unique channel number for the publisher is : {ChannelNumber}", _publisherModel.ChannelNumber); + _logger.LogTrace("The unique channel number for the publisher is : {ChannelNumber}", _publisherChannel.ChannelNumber); } } @@ -212,7 +204,7 @@ protected override async Task EnsureTopicCreatedAsync(CancellationToken cancella /// The rule of thumb is: avoid sharing channels across threads. /// Publishers in your application that publish from separate threads should use their own channels. /// The same is a good idea for consumers. - protected override Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken) + protected override async Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken) { byte[] data = SerializeMessageBody(messageType, message); @@ -223,13 +215,16 @@ protected override Task PublishImplAsync(string messageType, object message, Mes if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Schedule delayed message: {MessageType} ({Delay}ms)", messageType, options.DeliveryDelay.Value.TotalMilliseconds); - return AddDelayedMessageAsync(mappedType, message, options.DeliveryDelay.Value); + await AddDelayedMessageAsync(mappedType, message, options.DeliveryDelay.Value).AnyContext(); + return; } - var basicProperties = _publisherModel.CreateBasicProperties(); - basicProperties.MessageId = options.UniqueId ?? Guid.NewGuid().ToString("N"); - basicProperties.CorrelationId = options.CorrelationId; - basicProperties.Type = messageType; + var basicProperties = new BasicProperties + { + MessageId = options.UniqueId ?? Guid.NewGuid().ToString("N"), + CorrelationId = options.CorrelationId, + Type = messageType, + }; if (_options.IsDurable) basicProperties.Persistent = true; @@ -246,7 +241,7 @@ protected override Task PublishImplAsync(string messageType, object message, Mes // RabbitMQ only supports delayed messages with a third party plugin called "rabbitmq_delayed_message_exchange" if (_delayedExchangePluginEnabled.Value && options.DeliveryDelay.HasValue && options.DeliveryDelay.Value > TimeSpan.Zero) { - // Its necessary to typecast long to int because RabbitMQ on the consumer side is reading the + // It's necessary to typecast long to int because RabbitMQ on the consumer side is reading the // data back as signed (using BinaryReader#ReadInt64). You will see the value to be negative // and the data will be delivered immediately. basicProperties.Headers = new Dictionary { { "x-delay", Convert.ToInt32(options.DeliveryDelay.Value.TotalMilliseconds) } }; @@ -258,30 +253,27 @@ protected override Task PublishImplAsync(string messageType, object message, Mes if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Message publish type {MessageType} {MessageId}", messageType, basicProperties.MessageId); } - // The publication occurs with mandatory=false - lock (_publisherModel) - _publisherModel.BasicPublish(_options.Topic, String.Empty, basicProperties, data); + using (await _lock.LockAsync().AnyContext()) + await _publisherChannel.BasicPublishAsync(_options.Topic, String.Empty, mandatory: false, basicProperties, data, cancellationToken: cancellationToken).AnyContext(); if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("Done publishing type {MessageType} {MessageId}", messageType, basicProperties.MessageId); - - return Task.CompletedTask; } /// /// Connect to a broker - RabbitMQ /// /// - private IConnection CreateConnection() + private Task CreateConnectionAsync() { - return _factory.CreateConnection(); + return _factory.CreateConnectionAsync(); } /// /// Attempts to create the delayed exchange. /// - /// + /// /// true if the delayed exchange was successfully declared. Which means plugin was installed. - private bool CreateDelayedExchange(IModel model) + private async Task CreateDelayedExchangeAsync(IChannel channel) { bool success = true; if (_delayedExchangePluginEnabled.HasValue) @@ -293,24 +285,21 @@ private bool CreateDelayedExchange(IModel model) // Disclaimer : https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/ // Please read the *Performance Impact* of the delayed exchange type. var args = new Dictionary { { "x-delayed-type", ExchangeType.Fanout } }; - model.ExchangeDeclare(_options.Topic, "x-delayed-message", _options.IsDurable, false, args); + await channel.ExchangeDeclareAsync(_options.Topic, "x-delayed-message", _options.IsDurable, false, args).AnyContext(); } catch (OperationInterruptedException ex) { - if (ex.ShutdownReason.ReplyCode == 503) - { - if (_logger.IsEnabled(LogLevel.Information)) _logger.LogInformation(ex, "Not able to create x-delayed-type exchange"); - success = false; - } + _logger.LogInformation(ex, "Unable to create x-delayed-type exchange: {Message}", ex.Message); + success = false; } _delayedExchangePluginEnabled = success; return _delayedExchangePluginEnabled.Value; } - private void CreateRegularExchange(IModel model) + private Task CreateRegularExchangeAsync(IChannel channel) { - model.ExchangeDeclare(_options.Topic, ExchangeType.Fanout, _options.IsDurable, false, null); + return channel.ExchangeDeclareAsync(_options.Topic, ExchangeType.Fanout, _options.IsDurable, false); } /// @@ -319,17 +308,17 @@ private void CreateRegularExchange(IModel model) /// receiver attached which will process the message. We’ll initiate a dedicated message /// exchange and not use the default one. Note that a queue can be dedicated to one or more routing keys. /// - /// channel - private string CreateQueue(IModel model) + /// channel + private async Task CreateQueueAsync(IChannel channel) { - // Setup the queue where the messages will reside - it requires the queue name and durability. + // Set up the queue where the messages will reside - it requires the queue name and durability. // Durable (the queue will survive a broker restart) // Arguments (some brokers use it to implement additional features like message TTL) - var result = model.QueueDeclare(_options.SubscriptionQueueName, _options.IsDurable, _options.IsSubscriptionQueueExclusive, _options.SubscriptionQueueAutoDelete, _options.Arguments); + var result = await channel.QueueDeclareAsync(_options.SubscriptionQueueName, _options.IsDurable, _options.IsSubscriptionQueueExclusive, _options.SubscriptionQueueAutoDelete, _options.Arguments).AnyContext(); string queueName = result.QueueName; // bind the queue with the exchange. - model.QueueBind(queueName, _options.Topic, ""); + await channel.QueueBindAsync(queueName, _options.Topic, "").AnyContext(); return queueName; } @@ -338,11 +327,34 @@ public override void Dispose() { base.Dispose(); + if (_isDisposed) + return; + + _isDisposed = true; + if (_factory != null) _factory.AutomaticRecoveryEnabled = false; ClosePublisherConnection(); CloseSubscriberConnection(); + GC.SuppressFinalize(this); + } + + public async ValueTask DisposeAsync() + { + base.Dispose(); + + if (_isDisposed) + return; + + _isDisposed = true; + + if (_factory != null) + _factory.AutomaticRecoveryEnabled = false; + + await ClosePublisherConnectionAsync().AnyContext(); + await CloseSubscriberConnectionAsync().AnyContext(); + GC.SuppressFinalize(this); } private void ClosePublisherConnection() @@ -355,23 +367,44 @@ private void ClosePublisherConnection() if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("ClosePublisherConnection"); - if (_publisherModel != null) + if (_publisherChannel != null) { - _publisherModel.Close(); - _publisherModel.Abort(); - _publisherModel.Dispose(); - _publisherModel = null; + _publisherChannel.Dispose(); + _publisherChannel = null; } if (_publisherConnection != null) { - _publisherConnection.Close(); _publisherConnection.Dispose(); _publisherConnection = null; } } } + private async Task ClosePublisherConnectionAsync() + { + if (_publisherConnection == null) + return; + + using (await _lock.LockAsync().AnyContext()) + { + if (_logger.IsEnabled(LogLevel.Trace)) + _logger.LogTrace("ClosePublisherConnectionAsync"); + + if (_publisherChannel != null) + { + _publisherChannel.DisposeAsync().AnyContext(); + _publisherChannel = null; + } + + if (_publisherConnection != null) + { + await _publisherConnection.DisposeAsync().AnyContext(); + _publisherConnection = null; + } + } + } + private void CloseSubscriberConnection() { if (_subscriberConnection == null) @@ -382,20 +415,41 @@ private void CloseSubscriberConnection() if (_logger.IsEnabled(LogLevel.Trace)) _logger.LogTrace("CloseSubscriberConnection"); - if (_subscriberModel != null) + if (_subscriberChannel != null) { - _subscriberModel.Close(); - _subscriberModel.Abort(); - _subscriberModel.Dispose(); - _subscriberModel = null; + _subscriberChannel.Dispose(); + _subscriberChannel = null; } if (_subscriberConnection != null) { - _subscriberConnection.Close(); _subscriberConnection.Dispose(); _subscriberConnection = null; } } } + + private async Task CloseSubscriberConnectionAsync() + { + if (_subscriberConnection == null) + return; + + using (await _lock.LockAsync().AnyContext()) + { + if (_logger.IsEnabled(LogLevel.Trace)) + _logger.LogTrace("CloseSubscriberConnectionAsync"); + + if (_subscriberChannel != null) + { + await _subscriberChannel.DisposeAsync().AnyContext(); + _subscriberChannel = null; + } + + if (_subscriberConnection != null) + { + await _subscriberConnection.DisposeAsync().AnyContext(); + _subscriberConnection = null; + } + } + } } diff --git a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusDelayedExchangeTests.cs b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusDelayedExchangeTests.cs new file mode 100644 index 0000000..2b5a674 --- /dev/null +++ b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusDelayedExchangeTests.cs @@ -0,0 +1,8 @@ +using Xunit.Abstractions; + +namespace Foundatio.RabbitMQ.Tests.Messaging; + +public class RabbitMqMessageBusDelayedExchangeTests : RabbitMqMessageBusTestBase +{ + public RabbitMqMessageBusDelayedExchangeTests(ITestOutputHelper output) : base("amqp://localhost:5673", output) { } +} diff --git a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTestBase.cs b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTestBase.cs new file mode 100644 index 0000000..1aa51ed --- /dev/null +++ b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTestBase.cs @@ -0,0 +1,203 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Foundatio.AsyncEx; +using Foundatio.Messaging; +using Foundatio.Tests.Extensions; +using Foundatio.Tests.Messaging; +using Microsoft.Extensions.Logging; +using Xunit; +using Xunit.Abstractions; + +namespace Foundatio.RabbitMQ.Tests.Messaging; + +public abstract class RabbitMqMessageBusTestBase(string connectionString, ITestOutputHelper output) : MessageBusTestBase(output) +{ + private readonly string _topic = $"test_topic_{DateTime.UtcNow.Ticks}"; + + protected override IMessageBus GetMessageBus(Func config = null) + { + return new RabbitMQMessageBus(o => + { + o.ConnectionString(connectionString); + o.LoggerFactory(Log); + + config?.Invoke(o.Target); + + return o; + }); + } + + [Fact] + public override Task CanUseMessageOptionsAsync() + { + return base.CanUseMessageOptionsAsync(); + } + + [Fact] + public override Task CanSendMessageAsync() + { + return base.CanSendMessageAsync(); + } + + [Fact] + public override Task CanHandleNullMessageAsync() + { + return base.CanHandleNullMessageAsync(); + } + + [Fact] + public override Task CanSendDerivedMessageAsync() + { + return base.CanSendDerivedMessageAsync(); + } + + [Fact] + public override Task CanSendMappedMessageAsync() + { + return base.CanSendMappedMessageAsync(); + } + + [Fact] + public override Task CanSendDelayedMessageAsync() + { + return base.CanSendDelayedMessageAsync(); + } + + [Fact] + public override Task CanSubscribeConcurrentlyAsync() + { + return base.CanSubscribeConcurrentlyAsync(); + } + + [Fact] + public override Task CanReceiveMessagesConcurrentlyAsync() + { + return base.CanReceiveMessagesConcurrentlyAsync(); + } + + [Fact] + public override Task CanSendMessageToMultipleSubscribersAsync() + { + return base.CanSendMessageToMultipleSubscribersAsync(); + } + + [Fact] + public override Task CanTolerateSubscriberFailureAsync() + { + return base.CanTolerateSubscriberFailureAsync(); + } + + [Fact] + public override Task WillOnlyReceiveSubscribedMessageTypeAsync() + { + return base.WillOnlyReceiveSubscribedMessageTypeAsync(); + } + + [Fact] + public override Task WillReceiveDerivedMessageTypesAsync() + { + return base.WillReceiveDerivedMessageTypesAsync(); + } + + [Fact] + public override Task CanSubscribeToAllMessageTypesAsync() + { + return base.CanSubscribeToAllMessageTypesAsync(); + } + + [Fact] + public override Task CanSubscribeToRawMessagesAsync() + { + return base.CanSubscribeToRawMessagesAsync(); + } + + [Fact] + public override Task CanCancelSubscriptionAsync() + { + return base.CanCancelSubscriptionAsync(); + } + + [Fact] + public override Task WontKeepMessagesWithNoSubscribersAsync() + { + return base.WontKeepMessagesWithNoSubscribersAsync(); + } + + [Fact] + public override Task CanReceiveFromMultipleSubscribersAsync() + { + return base.CanReceiveFromMultipleSubscribersAsync(); + } + + [Fact] + public override void CanDisposeWithNoSubscribersOrPublishers() + { + base.CanDisposeWithNoSubscribersOrPublishers(); + } + + [Fact] + public async Task CanPersistAndNotLoseMessages() + { + var messageBus1 = new RabbitMQMessageBus(o => o + .ConnectionString(connectionString) + .LoggerFactory(Log) + .SubscriptionQueueName($"{_topic}-offline") + .IsSubscriptionQueueExclusive(false) + .SubscriptionQueueAutoDelete(false) + .AcknowledgementStrategy(AcknowledgementStrategy.Automatic)); + + var countdownEvent = new AsyncCountdownEvent(1); + var cts = new CancellationTokenSource(); + await messageBus1.SubscribeAsync(msg => + { + _logger.LogInformation("[Subscriber1] Got message: {Message}", msg.Data); + countdownEvent.Signal(); + }, cts.Token); + + await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit message 1" }); + await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal(0, countdownEvent.CurrentCount); + await cts.CancelAsync(); + + await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit message 2" }); + + cts = new CancellationTokenSource(); + countdownEvent.AddCount(1); + await messageBus1.SubscribeAsync(msg => + { + _logger.LogInformation("[Subscriber2] Got message: {Message}", msg.Data); + countdownEvent.Signal(); + }, cts.Token); + await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal(0, countdownEvent.CurrentCount); + await cts.CancelAsync(); + + await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 1" }); + await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 2" }); + await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 3" }); + + await messageBus1.DisposeAsync(); + + var messageBus2 = new RabbitMQMessageBus(o => o + .ConnectionString(connectionString) + .LoggerFactory(Log) + .SubscriptionQueueName($"{_topic}-offline") + .IsSubscriptionQueueExclusive(false) + .SubscriptionQueueAutoDelete(false) + .AcknowledgementStrategy(AcknowledgementStrategy.Automatic)); + + cts = new CancellationTokenSource(); + countdownEvent.AddCount(4); + await messageBus2.SubscribeAsync(msg => + { + _logger.LogInformation("[Subscriber3] Got message: {Message}", msg.Data); + countdownEvent.Signal(); + }, cts.Token); + await messageBus2.PublishAsync(new SimpleMessageA { Data = "Another audit message 4" }); + await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal(0, countdownEvent.CurrentCount); + + await messageBus2.DisposeAsync(); + } +} diff --git a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTests.cs b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTests.cs index 3314bb1..3c1808e 100644 --- a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTests.cs +++ b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTests.cs @@ -1,207 +1,8 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Foundatio.AsyncEx; -using Foundatio.Messaging; -using Foundatio.Tests.Extensions; -using Foundatio.Tests.Messaging; -using Microsoft.Extensions.Logging; -using Xunit; using Xunit.Abstractions; namespace Foundatio.RabbitMQ.Tests.Messaging; -public class RabbitMqMessageBusTests : MessageBusTestBase +public class RabbitMqMessageBusTests : RabbitMqMessageBusTestBase { - private readonly string _topic = $"test_topic_{DateTime.UtcNow.Ticks}"; - - public RabbitMqMessageBusTests(ITestOutputHelper output) : base(output) { } - - protected override IMessageBus GetMessageBus(Func config = null) - { - return new RabbitMQMessageBus(o => - { - o.ConnectionString("amqp://localhost:5672"); - o.LoggerFactory(Log); - - config?.Invoke(o.Target); - - return o; - }); - } - - [Fact] - public override Task CanUseMessageOptionsAsync() - { - return base.CanUseMessageOptionsAsync(); - } - - [Fact] - public override Task CanSendMessageAsync() - { - return base.CanSendMessageAsync(); - } - - [Fact] - public override Task CanHandleNullMessageAsync() - { - return base.CanHandleNullMessageAsync(); - } - - [Fact] - public override Task CanSendDerivedMessageAsync() - { - return base.CanSendDerivedMessageAsync(); - } - - [Fact] - public override Task CanSendMappedMessageAsync() - { - return base.CanSendMappedMessageAsync(); - } - - [Fact] - public override Task CanSendDelayedMessageAsync() - { - return base.CanSendDelayedMessageAsync(); - } - - [Fact] - public override Task CanSubscribeConcurrentlyAsync() - { - return base.CanSubscribeConcurrentlyAsync(); - } - - [Fact] - public override Task CanReceiveMessagesConcurrentlyAsync() - { - return base.CanReceiveMessagesConcurrentlyAsync(); - } - - [Fact] - public override Task CanSendMessageToMultipleSubscribersAsync() - { - return base.CanSendMessageToMultipleSubscribersAsync(); - } - - [Fact] - public override Task CanTolerateSubscriberFailureAsync() - { - return base.CanTolerateSubscriberFailureAsync(); - } - - [Fact] - public override Task WillOnlyReceiveSubscribedMessageTypeAsync() - { - return base.WillOnlyReceiveSubscribedMessageTypeAsync(); - } - - [Fact] - public override Task WillReceiveDerivedMessageTypesAsync() - { - return base.WillReceiveDerivedMessageTypesAsync(); - } - - [Fact] - public override Task CanSubscribeToAllMessageTypesAsync() - { - return base.CanSubscribeToAllMessageTypesAsync(); - } - - [Fact] - public override Task CanSubscribeToRawMessagesAsync() - { - return base.CanSubscribeToRawMessagesAsync(); - } - - [Fact] - public override Task CanCancelSubscriptionAsync() - { - return base.CanCancelSubscriptionAsync(); - } - - [Fact] - public override Task WontKeepMessagesWithNoSubscribersAsync() - { - return base.WontKeepMessagesWithNoSubscribersAsync(); - } - - [Fact] - public override Task CanReceiveFromMultipleSubscribersAsync() - { - return base.CanReceiveFromMultipleSubscribersAsync(); - } - - [Fact] - public override void CanDisposeWithNoSubscribersOrPublishers() - { - base.CanDisposeWithNoSubscribersOrPublishers(); - } - - [Fact] - public async Task CanPersistAndNotLoseMessages() - { - Log.DefaultMinimumLevel = LogLevel.Trace; - - var messageBus1 = new RabbitMQMessageBus(o => o - .ConnectionString("amqp://localhost:5673") - .LoggerFactory(Log) - .SubscriptionQueueName($"{_topic}-offline") - .IsSubscriptionQueueExclusive(false) - .SubscriptionQueueAutoDelete(false) - .AcknowledgementStrategy(AcknowledgementStrategy.Automatic)); - - var countdownEvent = new AsyncCountdownEvent(1); - var cts = new CancellationTokenSource(); - await messageBus1.SubscribeAsync(msg => - { - _logger.LogInformation("[Subscriber1] Got message: {Message}", msg.Data); - countdownEvent.Signal(); - }, cts.Token); - - await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit message 1" }); - await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.Equal(0, countdownEvent.CurrentCount); - await cts.CancelAsync(); - - await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit message 2" }); - - cts = new CancellationTokenSource(); - countdownEvent.AddCount(1); - await messageBus1.SubscribeAsync(msg => - { - _logger.LogInformation("[Subscriber2] Got message: {Message}", msg.Data); - countdownEvent.Signal(); - }, cts.Token); - await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.Equal(0, countdownEvent.CurrentCount); - await cts.CancelAsync(); - - await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 1" }); - await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 2" }); - await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 3" }); - - messageBus1.Dispose(); - - var messageBus2 = new RabbitMQMessageBus(o => o - .ConnectionString("amqp://localhost:5673") - .LoggerFactory(Log) - .SubscriptionQueueName($"{_topic}-offline") - .IsSubscriptionQueueExclusive(false) - .SubscriptionQueueAutoDelete(false) - .AcknowledgementStrategy(AcknowledgementStrategy.Automatic)); - - cts = new CancellationTokenSource(); - countdownEvent.AddCount(4); - await messageBus2.SubscribeAsync(msg => - { - _logger.LogInformation("[Subscriber3] Got message: {Message}", msg.Data); - countdownEvent.Signal(); - }, cts.Token); - await messageBus2.PublishAsync(new SimpleMessageA { Data = "Another audit message 4" }); - await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.Equal(0, countdownEvent.CurrentCount); - - messageBus2.Dispose(); - } + public RabbitMqMessageBusTests(ITestOutputHelper output) : base("amqp://localhost:5672", output) { } }