Skip to content

Commit

Permalink
Merge pull request #34 from FoundatioFx/feature/rabbitmq7
Browse files Browse the repository at this point in the history
Upgrade to RabbitMQ 7
  • Loading branch information
niemyjski authored Nov 25, 2024
2 parents af9c814 + 67077a2 commit c1c7ae9
Show file tree
Hide file tree
Showing 13 changed files with 372 additions and 299 deletions.
2 changes: 1 addition & 1 deletion .idea/.idea.Foundatio.RabbitMQ/.idea/indexLayout.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Foundatio.RabbitMQ.sln
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
README.md = README.md
docker-compose.yml = docker-compose.yml
tests\Directory.Build.props = tests\Directory.Build.props
build\Dockerfile = build\Dockerfile
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Foundatio.RabbitMQ", "src\Foundatio.RabbitMQ\Foundatio.RabbitMQ.csproj", "{EAE3607D-73A1-4D02-BDAA-24A37DDA15CB}"
Expand Down
4 changes: 2 additions & 2 deletions build/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rabbitmq:3.10-management
FROM rabbitmq:4.0.3-management

COPY rabbitmq_delayed_message_exchange-3.10.0.ez /opt/rabbitmq/plugins
COPY rabbitmq_delayed_message_exchange-4.0.2.ez /opt/rabbitmq/plugins
RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2 changes: 1 addition & 1 deletion build/common.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project ToolsVersion="15.0">

<PropertyGroup>
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<TargetFrameworks>netstandard2.1</TargetFrameworks>
<Product>Foundatio</Product>
<Description>Pluggable foundation blocks for building distributed apps.</Description>
<PackageProjectUrl>https://github.com/FoundatioFx/Foundatio.RabbitMQ</PackageProjectUrl>
Expand Down
Binary file removed build/rabbitmq_delayed_message_exchange-3.10.0.ez
Binary file not shown.
Binary file added build/rabbitmq_delayed_message_exchange-4.0.2.ez
Binary file not shown.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
rabbitmq:
image: rabbitmq:3.10-management
image: rabbitmq:4.0.3-management
ports:
- "5672:5672"
- "8080:15672" # management ui - login with guest:guest
Expand Down
6 changes: 6 additions & 0 deletions src/Foundatio.RabbitMQ/Extensions/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,10 @@ public static ConfiguredTaskAwaitable<TResult> AnyContext<TResult>(this Awaitabl
{
return task.ConfigureAwait(continueOnCapturedContext: false);
}

[DebuggerStepThrough]
public static ConfiguredValueTaskAwaitable AnyContext(this ValueTask task)
{
return task.ConfigureAwait(continueOnCapturedContext: false);
}
}
2 changes: 1 addition & 1 deletion src/Foundatio.RabbitMQ/Foundatio.RabbitMQ.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />

<PackageReference Include="Foundatio" Version="11.0.5" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" />
<ProjectReference Include="..\..\..\Foundatio\src\Foundatio\Foundatio.csproj" Condition="'$(ReferenceFoundatioSource)' == 'true'" />
Expand Down
238 changes: 146 additions & 92 deletions src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using Xunit.Abstractions;

namespace Foundatio.RabbitMQ.Tests.Messaging;

public class RabbitMqMessageBusDelayedExchangeTests : RabbitMqMessageBusTestBase
{
public RabbitMqMessageBusDelayedExchangeTests(ITestOutputHelper output) : base("amqp://localhost:5673", output) { }
}
203 changes: 203 additions & 0 deletions tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTestBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.AsyncEx;
using Foundatio.Messaging;
using Foundatio.Tests.Extensions;
using Foundatio.Tests.Messaging;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;

namespace Foundatio.RabbitMQ.Tests.Messaging;

public abstract class RabbitMqMessageBusTestBase(string connectionString, ITestOutputHelper output) : MessageBusTestBase(output)
{
private readonly string _topic = $"test_topic_{DateTime.UtcNow.Ticks}";

protected override IMessageBus GetMessageBus(Func<SharedMessageBusOptions, SharedMessageBusOptions> config = null)
{
return new RabbitMQMessageBus(o =>
{
o.ConnectionString(connectionString);
o.LoggerFactory(Log);

config?.Invoke(o.Target);

return o;
});
}

[Fact]
public override Task CanUseMessageOptionsAsync()
{
return base.CanUseMessageOptionsAsync();
}

[Fact]
public override Task CanSendMessageAsync()
{
return base.CanSendMessageAsync();
}

[Fact]
public override Task CanHandleNullMessageAsync()
{
return base.CanHandleNullMessageAsync();
}

[Fact]
public override Task CanSendDerivedMessageAsync()
{
return base.CanSendDerivedMessageAsync();
}

[Fact]
public override Task CanSendMappedMessageAsync()
{
return base.CanSendMappedMessageAsync();
}

[Fact]
public override Task CanSendDelayedMessageAsync()
{
return base.CanSendDelayedMessageAsync();
}

[Fact]
public override Task CanSubscribeConcurrentlyAsync()
{
return base.CanSubscribeConcurrentlyAsync();
}

[Fact]
public override Task CanReceiveMessagesConcurrentlyAsync()
{
return base.CanReceiveMessagesConcurrentlyAsync();
}

[Fact]
public override Task CanSendMessageToMultipleSubscribersAsync()
{
return base.CanSendMessageToMultipleSubscribersAsync();
}

[Fact]
public override Task CanTolerateSubscriberFailureAsync()
{
return base.CanTolerateSubscriberFailureAsync();
}

[Fact]
public override Task WillOnlyReceiveSubscribedMessageTypeAsync()
{
return base.WillOnlyReceiveSubscribedMessageTypeAsync();
}

[Fact]
public override Task WillReceiveDerivedMessageTypesAsync()
{
return base.WillReceiveDerivedMessageTypesAsync();
}

[Fact]
public override Task CanSubscribeToAllMessageTypesAsync()
{
return base.CanSubscribeToAllMessageTypesAsync();
}

[Fact]
public override Task CanSubscribeToRawMessagesAsync()
{
return base.CanSubscribeToRawMessagesAsync();
}

[Fact]
public override Task CanCancelSubscriptionAsync()
{
return base.CanCancelSubscriptionAsync();
}

[Fact]
public override Task WontKeepMessagesWithNoSubscribersAsync()
{
return base.WontKeepMessagesWithNoSubscribersAsync();
}

[Fact]
public override Task CanReceiveFromMultipleSubscribersAsync()
{
return base.CanReceiveFromMultipleSubscribersAsync();
}

[Fact]
public override void CanDisposeWithNoSubscribersOrPublishers()
{
base.CanDisposeWithNoSubscribersOrPublishers();
}

[Fact]
public async Task CanPersistAndNotLoseMessages()
{
var messageBus1 = new RabbitMQMessageBus(o => o
.ConnectionString(connectionString)
.LoggerFactory(Log)
.SubscriptionQueueName($"{_topic}-offline")
.IsSubscriptionQueueExclusive(false)
.SubscriptionQueueAutoDelete(false)
.AcknowledgementStrategy(AcknowledgementStrategy.Automatic));

var countdownEvent = new AsyncCountdownEvent(1);
var cts = new CancellationTokenSource();
await messageBus1.SubscribeAsync<SimpleMessageA>(msg =>
{
_logger.LogInformation("[Subscriber1] Got message: {Message}", msg.Data);
countdownEvent.Signal();
}, cts.Token);

await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit message 1" });
await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5));
Assert.Equal(0, countdownEvent.CurrentCount);
await cts.CancelAsync();

await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit message 2" });

cts = new CancellationTokenSource();
countdownEvent.AddCount(1);
await messageBus1.SubscribeAsync<SimpleMessageA>(msg =>
{
_logger.LogInformation("[Subscriber2] Got message: {Message}", msg.Data);
countdownEvent.Signal();
}, cts.Token);
await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5));
Assert.Equal(0, countdownEvent.CurrentCount);
await cts.CancelAsync();

await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 1" });
await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 2" });
await messageBus1.PublishAsync(new SimpleMessageA { Data = "Audit offline message 3" });

await messageBus1.DisposeAsync();

var messageBus2 = new RabbitMQMessageBus(o => o
.ConnectionString(connectionString)
.LoggerFactory(Log)
.SubscriptionQueueName($"{_topic}-offline")
.IsSubscriptionQueueExclusive(false)
.SubscriptionQueueAutoDelete(false)
.AcknowledgementStrategy(AcknowledgementStrategy.Automatic));

cts = new CancellationTokenSource();
countdownEvent.AddCount(4);
await messageBus2.SubscribeAsync<SimpleMessageA>(msg =>
{
_logger.LogInformation("[Subscriber3] Got message: {Message}", msg.Data);
countdownEvent.Signal();
}, cts.Token);
await messageBus2.PublishAsync(new SimpleMessageA { Data = "Another audit message 4" });
await countdownEvent.WaitAsync(TimeSpan.FromSeconds(5));
Assert.Equal(0, countdownEvent.CurrentCount);

await messageBus2.DisposeAsync();
}
}
Loading

0 comments on commit c1c7ae9

Please sign in to comment.