Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weโ€™ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PM-17562] Refactor existing RabbitMq implementation #5357

Merged
merged 4 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Core/AdminConsole/Services/IEventMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
๏ปฟusing Bit.Core.Models.Data;

namespace Bit.Core.Services;

public interface IEventMessageHandler
{
Task HandleEventAsync(EventMessage eventMessage);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
๏ปฟusing Bit.Core.Models.Data;
using Microsoft.Extensions.DependencyInjection;

namespace Bit.Core.Services;

public class EventRepositoryHandler(
[FromKeyedServices("persistent")] IEventWriteService eventWriteService)
: IEventMessageHandler
{
public Task HandleEventAsync(EventMessage eventMessage)
{
return eventWriteService.CreateAsync(eventMessage);
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,25 @@
๏ปฟusing System.Net.Http.Json;
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using Microsoft.Extensions.Logging;

namespace Bit.Core.Services;

public class RabbitMqEventHttpPostListener : RabbitMqEventListenerBase
public class HttpPostEventHandler : IEventMessageHandler
{
private readonly HttpClient _httpClient;
private readonly string _httpPostUrl;
private readonly string _queueName;

protected override string QueueName => _queueName;
public const string HttpClientName = "HttpPostEventHandlerHttpClient";

public const string HttpClientName = "EventHttpPostListenerHttpClient";

public RabbitMqEventHttpPostListener(
public HttpPostEventHandler(
IHttpClientFactory httpClientFactory,
ILogger<RabbitMqEventListenerBase> logger,
GlobalSettings globalSettings)
: base(logger, globalSettings)
{
_httpClient = httpClientFactory.CreateClient(HttpClientName);
_httpPostUrl = globalSettings.EventLogging.RabbitMq.HttpPostUrl;
_queueName = globalSettings.EventLogging.RabbitMq.HttpPostQueueName;
}

protected override async Task HandleMessageAsync(EventMessage eventMessage)
public async Task HandleEventAsync(EventMessage eventMessage)
{
var content = JsonContent.Create(eventMessage);
var response = await _httpClient.PostAsync(_httpPostUrl, content);
Expand Down

This file was deleted.

13 changes: 13 additions & 0 deletions src/Core/Services/EventLoggingListenerService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
๏ปฟusing Microsoft.Extensions.Hosting;

namespace Bit.Core.Services;

public abstract class EventLoggingListenerService : BackgroundService
{
protected readonly IEventMessageHandler _handler;

protected EventLoggingListenerService(IEventMessageHandler handler)
{

Check warning on line 10 in src/Core/Services/EventLoggingListenerService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/Services/EventLoggingListenerService.cs#L9-L10

Added lines #L9 - L10 were not covered by tests
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
}

Check warning on line 12 in src/Core/Services/EventLoggingListenerService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/Services/EventLoggingListenerService.cs#L12

Added line #L12 was not covered by tests
}
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
๏ปฟusing System.Text.Json;
using Bit.Core.Models.Data;
using Bit.Core.Settings;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Bit.Core.Services;

public abstract class RabbitMqEventListenerBase : BackgroundService
public class RabbitMqEventListenerService : EventLoggingListenerService
{
private IChannel _channel;
private IConnection _connection;
private readonly string _exchangeName;
private readonly ConnectionFactory _factory;
private readonly ILogger<RabbitMqEventListenerBase> _logger;
private readonly ILogger<RabbitMqEventListenerService> _logger;
private readonly string _queueName;

protected abstract string QueueName { get; }

protected RabbitMqEventListenerBase(
ILogger<RabbitMqEventListenerBase> logger,
GlobalSettings globalSettings)
public RabbitMqEventListenerService(
IEventMessageHandler handler,
ILogger<RabbitMqEventListenerService> logger,
GlobalSettings globalSettings,
string queueName) : base(handler)

Check warning on line 23 in src/Core/Services/Implementations/RabbitMqEventListenerService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/Services/Implementations/RabbitMqEventListenerService.cs#L23

Added line #L23 was not covered by tests
{
_factory = new ConnectionFactory
{
Expand All @@ -30,6 +30,7 @@
};
_exchangeName = globalSettings.EventLogging.RabbitMq.ExchangeName;
_logger = logger;
_queueName = queueName;

Check warning on line 33 in src/Core/Services/Implementations/RabbitMqEventListenerService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/Services/Implementations/RabbitMqEventListenerService.cs#L33

Added line #L33 was not covered by tests
}

public override async Task StartAsync(CancellationToken cancellationToken)
Expand All @@ -38,13 +39,13 @@
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);

await _channel.ExchangeDeclareAsync(exchange: _exchangeName, type: ExchangeType.Fanout, durable: true);
await _channel.QueueDeclareAsync(queue: QueueName,
await _channel.QueueDeclareAsync(queue: _queueName,

Check warning on line 42 in src/Core/Services/Implementations/RabbitMqEventListenerService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/Services/Implementations/RabbitMqEventListenerService.cs#L42

Added line #L42 was not covered by tests
durable: true,
exclusive: false,
autoDelete: false,
arguments: null,
cancellationToken: cancellationToken);
await _channel.QueueBindAsync(queue: QueueName,
await _channel.QueueBindAsync(queue: _queueName,

Check warning on line 48 in src/Core/Services/Implementations/RabbitMqEventListenerService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/Services/Implementations/RabbitMqEventListenerService.cs#L48

Added line #L48 was not covered by tests
exchange: _exchangeName,
routingKey: string.Empty,
cancellationToken: cancellationToken);
Expand All @@ -59,15 +60,15 @@
try
{
var eventMessage = JsonSerializer.Deserialize<EventMessage>(eventArgs.Body.Span);
await HandleMessageAsync(eventMessage);
await _handler.HandleEventAsync(eventMessage);

Check warning on line 63 in src/Core/Services/Implementations/RabbitMqEventListenerService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/Services/Implementations/RabbitMqEventListenerService.cs#L63

Added line #L63 was not covered by tests
}
catch (Exception ex)
{
_logger.LogError(ex, "An error occurred while processing the message");
}
};

await _channel.BasicConsumeAsync(QueueName, autoAck: true, consumer: consumer, cancellationToken: stoppingToken);
await _channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: stoppingToken);

Check warning on line 71 in src/Core/Services/Implementations/RabbitMqEventListenerService.cs

View check run for this annotation

Codecov / codecov/patch

src/Core/Services/Implementations/RabbitMqEventListenerService.cs#L71

Added line #L71 was not covered by tests

while (!stoppingToken.IsCancellationRequested)
{
Expand All @@ -88,6 +89,4 @@
_connection.Dispose();
base.Dispose();
}

protected abstract Task HandleMessageAsync(EventMessage eventMessage);
}
1 change: 1 addition & 0 deletions src/Core/Settings/IGlobalSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ public interface IGlobalSettings
string DatabaseProvider { get; set; }
GlobalSettings.SqlSettings SqlServer { get; set; }
string DevelopmentDirectory { get; set; }
GlobalSettings.EventLoggingSettings EventLogging { get; set; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

โ„น๏ธ We're not totally sure that we're gonna keep maintaining this interface as we're almost always just asking for the implementation, but this is fine.

}
19 changes: 16 additions & 3 deletions src/Events/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,26 @@
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.Password) &&
CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.ExchangeName))
{
services.AddSingleton<EventRepositoryHandler>();

Check warning on line 92 in src/Events/Startup.cs

View check run for this annotation

Codecov / codecov/patch

src/Events/Startup.cs#L92

Added line #L92 was not covered by tests
services.AddKeyedSingleton<IEventWriteService, RepositoryEventWriteService>("persistent");
services.AddHostedService<RabbitMqEventRepositoryListener>();
services.AddSingleton<IHostedService>(provider =>
new RabbitMqEventListenerService(
provider.GetRequiredService<EventRepositoryHandler>(),
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>(),
provider.GetRequiredService<GlobalSettings>(),
globalSettings.EventLogging.RabbitMq.EventRepositoryQueueName));

Check warning on line 99 in src/Events/Startup.cs

View check run for this annotation

Codecov / codecov/patch

src/Events/Startup.cs#L94-L99

Added lines #L94 - L99 were not covered by tests

if (CoreHelpers.SettingHasValue(globalSettings.EventLogging.RabbitMq.HttpPostUrl))
{
services.AddHttpClient(RabbitMqEventHttpPostListener.HttpClientName);
services.AddHostedService<RabbitMqEventHttpPostListener>();
services.AddSingleton<HttpPostEventHandler>();
services.AddHttpClient(HttpPostEventHandler.HttpClientName);

Check warning on line 104 in src/Events/Startup.cs

View check run for this annotation

Codecov / codecov/patch

src/Events/Startup.cs#L103-L104

Added lines #L103 - L104 were not covered by tests

services.AddSingleton<IHostedService>(provider =>
new RabbitMqEventListenerService(
provider.GetRequiredService<HttpPostEventHandler>(),
provider.GetRequiredService<ILogger<RabbitMqEventListenerService>>(),
provider.GetRequiredService<GlobalSettings>(),
globalSettings.EventLogging.RabbitMq.HttpPostQueueName));

Check warning on line 111 in src/Events/Startup.cs

View check run for this annotation

Codecov / codecov/patch

src/Events/Startup.cs#L106-L111

Added lines #L106 - L111 were not covered by tests
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions test/Common/MockedHttpClient/MockedHttpMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ public class MockedHttpMessageHandler : HttpMessageHandler
{
private readonly List<IHttpRequestMatcher> _matchers = new();

public List<HttpRequestMessage> CapturedRequests { get; } = new List<HttpRequestMessage>();

/// <summary>
/// The fallback handler to use when the request does not match any of the provided matchers.
/// </summary>
Expand All @@ -16,6 +18,7 @@ public class MockedHttpMessageHandler : HttpMessageHandler

protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
CapturedRequests.Add(request);
var matcher = _matchers.FirstOrDefault(x => x.Matches(request));
if (matcher == null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
๏ปฟusing Bit.Core.Models.Data;
using Bit.Core.Services;
using Bit.Test.Common.AutoFixture;
using Bit.Test.Common.AutoFixture.Attributes;
using Bit.Test.Common.Helpers;
using NSubstitute;
using Xunit;

namespace Bit.Core.Test.Services;

[SutProviderCustomize]
public class EventRepositoryHandlerTests
{
[Theory, BitAutoData]
public async Task HandleEventAsync_WritesEventToIEventWriteService(
EventMessage eventMessage,
SutProvider<EventRepositoryHandler> sutProvider)
{
await sutProvider.Sut.HandleEventAsync(eventMessage);
await sutProvider.GetDependency<IEventWriteService>().Received(1).CreateAsync(
Arg.Is(AssertHelper.AssertPropertyEqual<IEvent>(eventMessage))
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
๏ปฟusing System.Net;
using System.Net.Http.Json;
using Bit.Core.Models.Data;
using Bit.Core.Services;
using Bit.Test.Common.AutoFixture;
using Bit.Test.Common.AutoFixture.Attributes;
using Bit.Test.Common.Helpers;
using Bit.Test.Common.MockedHttpClient;
using NSubstitute;
using Xunit;
using GlobalSettings = Bit.Core.Settings.GlobalSettings;

namespace Bit.Core.Test.Services;

[SutProviderCustomize]
public class HttpPostEventHandlerTests
{
private readonly MockedHttpMessageHandler _handler;
private HttpClient _httpClient;

private const string _httpPostUrl = "http://localhost/test/event";

public HttpPostEventHandlerTests()
{
_handler = new MockedHttpMessageHandler();
_handler.Fallback
.WithStatusCode(HttpStatusCode.OK)
.WithContent(new StringContent("<html><head><title>test</title></head><body>test</body></html>"));
_httpClient = _handler.ToHttpClient();
}

public SutProvider<HttpPostEventHandler> GetSutProvider()
{
var clientFactory = Substitute.For<IHttpClientFactory>();
clientFactory.CreateClient(HttpPostEventHandler.HttpClientName).Returns(_httpClient);

var globalSettings = new GlobalSettings();
globalSettings.EventLogging.RabbitMq.HttpPostUrl = _httpPostUrl;

return new SutProvider<HttpPostEventHandler>()
.SetDependency(globalSettings)
.SetDependency(clientFactory)
.Create();
}

[Theory, BitAutoData]
public async Task HandleEventAsync_PostsEventsToUrl(EventMessage eventMessage)
{
var sutProvider = GetSutProvider();
var content = JsonContent.Create(eventMessage);

await sutProvider.Sut.HandleEventAsync(eventMessage);
sutProvider.GetDependency<IHttpClientFactory>().Received(1).CreateClient(
Arg.Is(AssertHelper.AssertPropertyEqual<string>(HttpPostEventHandler.HttpClientName))
);

Assert.Single(_handler.CapturedRequests);
var request = _handler.CapturedRequests[0];
Assert.NotNull(request);
var returned = await request.Content.ReadFromJsonAsync<EventMessage>();

Assert.Equal(HttpMethod.Post, request.Method);
Assert.Equal(_httpPostUrl, request.RequestUri.ToString());
AssertHelper.AssertPropertyEqual(eventMessage, returned, new[] { "IdempotencyId" });
}
}
Loading