diff --git a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusDelayedExchangeTests.cs b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusDelayedExchangeTests.cs new file mode 100644 index 0000000..2b5a674 --- /dev/null +++ b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusDelayedExchangeTests.cs @@ -0,0 +1,8 @@ +using Xunit.Abstractions; + +namespace Foundatio.RabbitMQ.Tests.Messaging; + +public class RabbitMqMessageBusDelayedExchangeTests : RabbitMqMessageBusTestBase +{ + public RabbitMqMessageBusDelayedExchangeTests(ITestOutputHelper output) : base("amqp://localhost:5673", output) { } +} diff --git a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTestBase.cs b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTestBase.cs new file mode 100644 index 0000000..1aa51ed --- /dev/null +++ b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTestBase.cs @@ -0,0 +1,203 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Foundatio.AsyncEx; +using Foundatio.Messaging; +using Foundatio.Tests.Extensions; +using Foundatio.Tests.Messaging; +using Microsoft.Extensions.Logging; +using Xunit; +using Xunit.Abstractions; + +namespace Foundatio.RabbitMQ.Tests.Messaging; + +public abstract class RabbitMqMessageBusTestBase(string connectionString, ITestOutputHelper output) : MessageBusTestBase(output) +{ + private readonly string _topic = $"test_topic_{DateTime.UtcNow.Ticks}"; + + protected override IMessageBus GetMessageBus(Func config = null) + { + return new RabbitMQMessageBus(o => + { + o.ConnectionString(connectionString); + o.LoggerFactory(Log); + + config?.Invoke(o.Target); + + return o; + }); + } + + [Fact] + public override Task CanUseMessageOptionsAsync() + { + return base.CanUseMessageOptionsAsync(); + } + + [Fact] + public override Task CanSendMessageAsync() + { + return base.CanSendMessageAsync(); + } + + [Fact] + public override Task CanHandleNullMessageAsync() + { + return base.CanHandleNullMessageAsync(); + } + + [Fact] + public override Task CanSendDerivedMessageAsync() + { + return base.CanSendDerivedMessageAsync(); + } + + [Fact] + public override Task CanSendMappedMessageAsync() + { + return base.CanSendMappedMessageAsync(); + } + + [Fact] + public override Task CanSendDelayedMessageAsync() + { + return base.CanSendDelayedMessageAsync(); + } + + [Fact] + public override Task CanSubscribeConcurrentlyAsync() + { + return base.CanSubscribeConcurrentlyAsync(); + } + + [Fact] + public override Task CanReceiveMessagesConcurrentlyAsync() + { + return base.CanReceiveMessagesConcurrentlyAsync(); + } + + [Fact] + public override Task CanSendMessageToMultipleSubscribersAsync() + { + return base.CanSendMessageToMultipleSubscribersAsync(); + } + + [Fact] + public override Task CanTolerateSubscriberFailureAsync() + { + return base.CanTolerateSubscriberFailureAsync(); + } + + [Fact] + public override Task WillOnlyReceiveSubscribedMessageTypeAsync() + { + return base.WillOnlyReceiveSubscribedMessageTypeAsync(); + } + + [Fact] + public override Task WillReceiveDerivedMessageTypesAsync() + { + return base.WillReceiveDerivedMessageTypesAsync(); + } + + [Fact] + public override Task CanSubscribeToAllMessageTypesAsync() + { + return base.CanSubscribeToAllMessageTypesAsync(); + } + + [Fact] + public override Task CanSubscribeToRawMessagesAsync() + { + return base.CanSubscribeToRawMessagesAsync(); + } + + [Fact] + public override Task CanCancelSubscriptionAsync() + { + return base.CanCancelSubscriptionAsync(); + } + + [Fact] + public override Task WontKeepMessagesWithNoSubscribersAsync() + { + return base.WontKeepMessagesWithNoSubscribersAsync(); + } + + [Fact] + public override Task CanReceiveFromMultipleSubscribersAsync() + { + return base.CanReceiveFromMultipleSubscribersAsync(); + } + + [Fact] + public override void CanDisposeWithNoSubscribersOrPublishers() + { + base.CanDisposeWithNoSubscribersOrPublishers(); + } + + [Fact] + public async Task CanPersistAndNotLoseMessages() + { + var messageBus1 = new RabbitMQMessageBus(o => o + .ConnectionString(connectionString) + .LoggerFactory(Log) + .SubscriptionQueueName($"{_topic}-offline") + .IsSubscriptionQueueExclusive(false) + .SubscriptionQueueAutoDelete(false) + .AcknowledgementStrategy(AcknowledgementStrategy.Automatic)); + + var countdownEvent = new AsyncCountdownEvent(1); + var cts = new CancellationTokenSource(); + await messageBus1.SubscribeAsync(msg => + { + _logger.LogInformation("[Subscriber1] Got message: {Message}", msg.Data); + countdownEvent.Signal(); + }, cts.Token); + + await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit message 1" }); + await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal(0, countdownEvent.CurrentCount); + await cts.CancelAsync(); + + await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit message 2" }); + + cts = new CancellationTokenSource(); + countdownEvent.AddCount(1); + await messageBus1.SubscribeAsync(msg => + { + _logger.LogInformation("[Subscriber2] Got message: {Message}", msg.Data); + countdownEvent.Signal(); + }, cts.Token); + await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal(0, countdownEvent.CurrentCount); + await cts.CancelAsync(); + + await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 1" }); + await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 2" }); + await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 3" }); + + await messageBus1.DisposeAsync(); + + var messageBus2 = new RabbitMQMessageBus(o => o + .ConnectionString(connectionString) + .LoggerFactory(Log) + .SubscriptionQueueName($"{_topic}-offline") + .IsSubscriptionQueueExclusive(false) + .SubscriptionQueueAutoDelete(false) + .AcknowledgementStrategy(AcknowledgementStrategy.Automatic)); + + cts = new CancellationTokenSource(); + countdownEvent.AddCount(4); + await messageBus2.SubscribeAsync(msg => + { + _logger.LogInformation("[Subscriber3] Got message: {Message}", msg.Data); + countdownEvent.Signal(); + }, cts.Token); + await messageBus2.PublishAsync(new SimpleMessageA { Data = "Another audit message 4" }); + await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.Equal(0, countdownEvent.CurrentCount); + + await messageBus2.DisposeAsync(); + } +} diff --git a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTests.cs b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTests.cs index 3314bb1..3c1808e 100644 --- a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTests.cs +++ b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTests.cs @@ -1,207 +1,8 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Foundatio.AsyncEx; -using Foundatio.Messaging; -using Foundatio.Tests.Extensions; -using Foundatio.Tests.Messaging; -using Microsoft.Extensions.Logging; -using Xunit; using Xunit.Abstractions; namespace Foundatio.RabbitMQ.Tests.Messaging; -public class RabbitMqMessageBusTests : MessageBusTestBase +public class RabbitMqMessageBusTests : RabbitMqMessageBusTestBase { - private readonly string _topic = $"test_topic_{DateTime.UtcNow.Ticks}"; - - public RabbitMqMessageBusTests(ITestOutputHelper output) : base(output) { } - - protected override IMessageBus GetMessageBus(Func config = null) - { - return new RabbitMQMessageBus(o => - { - o.ConnectionString("amqp://localhost:5672"); - o.LoggerFactory(Log); - - config?.Invoke(o.Target); - - return o; - }); - } - - [Fact] - public override Task CanUseMessageOptionsAsync() - { - return base.CanUseMessageOptionsAsync(); - } - - [Fact] - public override Task CanSendMessageAsync() - { - return base.CanSendMessageAsync(); - } - - [Fact] - public override Task CanHandleNullMessageAsync() - { - return base.CanHandleNullMessageAsync(); - } - - [Fact] - public override Task CanSendDerivedMessageAsync() - { - return base.CanSendDerivedMessageAsync(); - } - - [Fact] - public override Task CanSendMappedMessageAsync() - { - return base.CanSendMappedMessageAsync(); - } - - [Fact] - public override Task CanSendDelayedMessageAsync() - { - return base.CanSendDelayedMessageAsync(); - } - - [Fact] - public override Task CanSubscribeConcurrentlyAsync() - { - return base.CanSubscribeConcurrentlyAsync(); - } - - [Fact] - public override Task CanReceiveMessagesConcurrentlyAsync() - { - return base.CanReceiveMessagesConcurrentlyAsync(); - } - - [Fact] - public override Task CanSendMessageToMultipleSubscribersAsync() - { - return base.CanSendMessageToMultipleSubscribersAsync(); - } - - [Fact] - public override Task CanTolerateSubscriberFailureAsync() - { - return base.CanTolerateSubscriberFailureAsync(); - } - - [Fact] - public override Task WillOnlyReceiveSubscribedMessageTypeAsync() - { - return base.WillOnlyReceiveSubscribedMessageTypeAsync(); - } - - [Fact] - public override Task WillReceiveDerivedMessageTypesAsync() - { - return base.WillReceiveDerivedMessageTypesAsync(); - } - - [Fact] - public override Task CanSubscribeToAllMessageTypesAsync() - { - return base.CanSubscribeToAllMessageTypesAsync(); - } - - [Fact] - public override Task CanSubscribeToRawMessagesAsync() - { - return base.CanSubscribeToRawMessagesAsync(); - } - - [Fact] - public override Task CanCancelSubscriptionAsync() - { - return base.CanCancelSubscriptionAsync(); - } - - [Fact] - public override Task WontKeepMessagesWithNoSubscribersAsync() - { - return base.WontKeepMessagesWithNoSubscribersAsync(); - } - - [Fact] - public override Task CanReceiveFromMultipleSubscribersAsync() - { - return base.CanReceiveFromMultipleSubscribersAsync(); - } - - [Fact] - public override void CanDisposeWithNoSubscribersOrPublishers() - { - base.CanDisposeWithNoSubscribersOrPublishers(); - } - - [Fact] - public async Task CanPersistAndNotLoseMessages() - { - Log.DefaultMinimumLevel = LogLevel.Trace; - - var messageBus1 = new RabbitMQMessageBus(o => o - .ConnectionString("amqp://localhost:5673") - .LoggerFactory(Log) - .SubscriptionQueueName($"{_topic}-offline") - .IsSubscriptionQueueExclusive(false) - .SubscriptionQueueAutoDelete(false) - .AcknowledgementStrategy(AcknowledgementStrategy.Automatic)); - - var countdownEvent = new AsyncCountdownEvent(1); - var cts = new CancellationTokenSource(); - await messageBus1.SubscribeAsync(msg => - { - _logger.LogInformation("[Subscriber1] Got message: {Message}", msg.Data); - countdownEvent.Signal(); - }, cts.Token); - - await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit message 1" }); - await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.Equal(0, countdownEvent.CurrentCount); - await cts.CancelAsync(); - - await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit message 2" }); - - cts = new CancellationTokenSource(); - countdownEvent.AddCount(1); - await messageBus1.SubscribeAsync(msg => - { - _logger.LogInformation("[Subscriber2] Got message: {Message}", msg.Data); - countdownEvent.Signal(); - }, cts.Token); - await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.Equal(0, countdownEvent.CurrentCount); - await cts.CancelAsync(); - - await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 1" }); - await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 2" }); - await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 3" }); - - messageBus1.Dispose(); - - var messageBus2 = new RabbitMQMessageBus(o => o - .ConnectionString("amqp://localhost:5673") - .LoggerFactory(Log) - .SubscriptionQueueName($"{_topic}-offline") - .IsSubscriptionQueueExclusive(false) - .SubscriptionQueueAutoDelete(false) - .AcknowledgementStrategy(AcknowledgementStrategy.Automatic)); - - cts = new CancellationTokenSource(); - countdownEvent.AddCount(4); - await messageBus2.SubscribeAsync(msg => - { - _logger.LogInformation("[Subscriber3] Got message: {Message}", msg.Data); - countdownEvent.Signal(); - }, cts.Token); - await messageBus2.PublishAsync(new SimpleMessageA { Data = "Another audit message 4" }); - await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5)); - Assert.Equal(0, countdownEvent.CurrentCount); - - messageBus2.Dispose(); - } + public RabbitMqMessageBusTests(ITestOutputHelper output) : base("amqp://localhost:5672", output) { } }