From 7c5af294b21fefb95a48f49af949b3d4e3c02b3d Mon Sep 17 00:00:00 2001 From: Andrii Voznesenskyi Date: Fri, 27 Sep 2024 17:22:14 +0200 Subject: [PATCH 1/2] (#75) mongo outbox: add Outbox for Mongo extesnsions --- .../Core/MongoMessageOutbox.cs | 171 ++++++++++++++++++ .../Core/ParalaxOutboxInitializer.cs | 59 ++++++ .../Extensions.cs | 41 +++++ ...Paralax.MessageBrokers.Outbox.Mongo.csproj | 16 ++ .../Paralax.MessageBrokers.Outbox.Mongo.sln | 25 +++ 5 files changed, 312 insertions(+) create mode 100644 src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Core/MongoMessageOutbox.cs create mode 100644 src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Core/ParalaxOutboxInitializer.cs create mode 100644 src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Extensions.cs create mode 100644 src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Paralax.MessageBrokers.Outbox.Mongo.csproj create mode 100644 src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Paralax.MessageBrokers.Outbox.Mongo.sln diff --git a/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Core/MongoMessageOutbox.cs b/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Core/MongoMessageOutbox.cs new file mode 100644 index 0000000..92378da --- /dev/null +++ b/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Core/MongoMessageOutbox.cs @@ -0,0 +1,171 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using MongoDB.Driver; +using Paralax; +using Paralax.MessageBrokers.Outbox; +using Paralax.MessageBrokers.Outbox.Messages; +using Paralax.Persistence.MongoDB; + +namespace Convey.MessageBrokers.Outbox.Mongo.Internals +{ + internal sealed class MongoMessageOutbox : IMessageOutbox, IMessageOutboxAccessor + { + private static readonly JsonSerializerOptions SerializerOptions = new() + { + PropertyNameCaseInsensitive = true, + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + NumberHandling = JsonNumberHandling.AllowReadingFromString, + Converters = { new JsonStringEnumConverter(JsonNamingPolicy.CamelCase) } + }; + + private const string EmptyJsonObject = "{}"; + private readonly IMongoSessionFactory _sessionFactory; + private readonly IMongoRepository _inboxRepository; + private readonly IMongoRepository _outboxRepository; + private readonly ILogger _logger; + private readonly bool _transactionsEnabled; + + public bool Enabled { get; } + + public MongoMessageOutbox(IMongoSessionFactory sessionFactory, + IMongoRepository inboxRepository, + IMongoRepository outboxRepository, + OutboxOptions options, ILogger logger) + { + _sessionFactory = sessionFactory; + _inboxRepository = inboxRepository; + _outboxRepository = outboxRepository; + _logger = logger; + _transactionsEnabled = !options.DisableTransactions; + Enabled = options.Enabled; + } + + public async Task HandleAsync(string messageId, Func handler) + { + if (!Enabled) + { + _logger.LogWarning("Outbox is disabled, incoming messages won't be processed."); + return; + } + + if (string.IsNullOrWhiteSpace(messageId)) + { + throw new ArgumentException("Message id to be processed cannot be empty.", nameof(messageId)); + } + + _logger.LogTrace($"Received a message with id: '{messageId}' to be processed."); + if (await _inboxRepository.ExistsAsync(m => m.Id == messageId)) + { + _logger.LogTrace($"Message with id: '{messageId}' was already processed."); + return; + } + + IClientSessionHandle session = null; + if (_transactionsEnabled) + { + session = await _sessionFactory.CreateAsync(); + session.StartTransaction(); + } + + try + { + _logger.LogTrace($"Processing a message with id: '{messageId}'..."); + await handler(); + await _inboxRepository.AddAsync(new InboxMessage + { + Id = messageId, + ProcessedAt = DateTime.UtcNow + }); + + if (session is not null) + { + await session.CommitTransactionAsync(); + } + + _logger.LogTrace($"Processed a message with id: '{messageId}'."); + } + catch (Exception ex) + { + _logger.LogError(ex, $"There was an error when processing a message with id: '{messageId}'."); + if (session is not null) + { + await session.AbortTransactionAsync(); + } + + throw; + } + finally + { + session?.Dispose(); + } + } + + public async Task SendAsync(T message, string originatedMessageId = null, string messageId = null, + string correlationId = null, string spanContext = null, object messageContext = null, + IDictionary headers = null) where T : class + { + if (!Enabled) + { + _logger.LogWarning("Outbox is disabled, outgoing messages won't be saved into the storage."); + return; + } + + var outboxMessage = new OutboxMessage + { + Id = string.IsNullOrWhiteSpace(messageId) ? Guid.NewGuid().ToString("N") : messageId, + OriginatedMessageId = originatedMessageId, + CorrelationId = correlationId, + SpanContext = spanContext, + SerializedMessageContext = + messageContext is null + ? EmptyJsonObject + : JsonSerializer.Serialize(messageContext, SerializerOptions), + MessageContextType = messageContext?.GetType().AssemblyQualifiedName, + Headers = (Dictionary) headers, + SerializedMessage = message is null + ? EmptyJsonObject + : JsonSerializer.Serialize(message, SerializerOptions), + MessageType = message?.GetType().AssemblyQualifiedName, + SentAt = DateTime.UtcNow + }; + await _outboxRepository.AddAsync(outboxMessage); + } + + async Task> IMessageOutboxAccessor.GetUnsentAsync() + { + var outboxMessages = await _outboxRepository.FindAsync(om => om.ProcessedAt == null); + return outboxMessages.Select(om => + { + if (om.MessageContextType is not null) + { + var messageContextType = Type.GetType(om.MessageContextType); + om.MessageContext = JsonSerializer.Deserialize(om.SerializedMessageContext, messageContextType, + SerializerOptions); + } + + if (om.MessageType is not null) + { + var messageType = Type.GetType(om.MessageType); + om.Message = JsonSerializer.Deserialize(om.SerializedMessage, messageType, SerializerOptions); + } + + return om; + }).ToList(); + } + + Task IMessageOutboxAccessor.ProcessAsync(OutboxMessage message) + => _outboxRepository.Collection.UpdateOneAsync( + Builders.Filter.Eq(m => m.Id, message.Id), + Builders.Update.Set(m => m.ProcessedAt, DateTime.UtcNow)); + + Task IMessageOutboxAccessor.ProcessAsync(IEnumerable outboxMessages) + => _outboxRepository.Collection.UpdateManyAsync( + Builders.Filter.In(m => m.Id, outboxMessages.Select(m => m.Id)), + Builders.Update.Set(m => m.ProcessedAt, DateTime.UtcNow)); + } +} diff --git a/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Core/ParalaxOutboxInitializer.cs b/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Core/ParalaxOutboxInitializer.cs new file mode 100644 index 0000000..a4971c1 --- /dev/null +++ b/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Core/ParalaxOutboxInitializer.cs @@ -0,0 +1,59 @@ +using System; +using System.Threading.Tasks; +using MongoDB.Driver; +using Paralax; +using Paralax.MessageBrokers.Outbox; +using Paralax.MessageBrokers.Outbox.Messages; +using Paralax.Types; + +namespace Convey.MessageBrokers.Outbox.Paralax.Internals +{ + internal sealed class MongoOutboxInitializer : IInitializer + { + private readonly IMongoDatabase _database; + private readonly OutboxOptions _options; + + public MongoOutboxInitializer(IMongoDatabase database, OutboxOptions options) + { + _database = database; + _options = options; + } + + public async Task InitializeAsync() + { + if (!_options.Enabled) + { + return; + } + + if (_options.Expiry <= 0) + { + return; + } + + var inboxCollection = string.IsNullOrWhiteSpace(_options.InboxCollection) + ? "inbox" + : _options.InboxCollection; + var builder = Builders.IndexKeys; + await _database.GetCollection(inboxCollection) + .Indexes.CreateOneAsync( + new CreateIndexModel(builder.Ascending(i => i.ProcessedAt), + new CreateIndexOptions + { + ExpireAfter = TimeSpan.FromSeconds(_options.Expiry) + })); + + var outboxCollection = string.IsNullOrWhiteSpace(_options.OutboxCollection) + ? "outbox" + : _options.OutboxCollection; + var outboxBuilder = Builders.IndexKeys; + await _database.GetCollection(outboxCollection) + .Indexes.CreateOneAsync( + new CreateIndexModel(outboxBuilder.Ascending(i => i.ProcessedAt), + new CreateIndexOptions + { + ExpireAfter = TimeSpan.FromSeconds(_options.Expiry) + })); + } + } +} diff --git a/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Extensions.cs b/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Extensions.cs new file mode 100644 index 0000000..21ea960 --- /dev/null +++ b/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Extensions.cs @@ -0,0 +1,41 @@ +using Convey.MessageBrokers.Outbox.Mongo.Internals; +using Convey.MessageBrokers.Outbox.Paralax.Internals; +using Microsoft.Extensions.DependencyInjection; +using MongoDB.Bson.Serialization; +using Paralax.MessageBrokers.Outbox.Messages; +using Paralax.Persistence.MongoDB; + +namespace Paralax.MessageBrokers.Outbox.Mongo +{ + public static class Extensions +{ + public static IMessageOutboxConfigurator AddMongo(this IMessageOutboxConfigurator configurator) + { + var builder = configurator.Builder; + var options = configurator.Options; + + var inboxCollection = string.IsNullOrWhiteSpace(options.InboxCollection) + ? "inbox" + : options.InboxCollection; + var outboxCollection = string.IsNullOrWhiteSpace(options.OutboxCollection) + ? "outbox" + : options.OutboxCollection; + + builder.AddMongoRepository(inboxCollection); + builder.AddMongoRepository(outboxCollection); + builder.AddInitializer(); + builder.Services.AddTransient(); + builder.Services.AddTransient(); + builder.Services.AddTransient(); + + BsonClassMap.RegisterClassMap(m => + { + m.AutoMap(); + m.UnmapMember(p => p.Message); + m.UnmapMember(p => p.MessageContext); + }); + + return configurator; + } +} +} \ No newline at end of file diff --git a/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Paralax.MessageBrokers.Outbox.Mongo.csproj b/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Paralax.MessageBrokers.Outbox.Mongo.csproj new file mode 100644 index 0000000..4340c46 --- /dev/null +++ b/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Paralax.MessageBrokers.Outbox.Mongo.csproj @@ -0,0 +1,16 @@ + + + + net9.0 + enable + enable + + + + + + + + + + diff --git a/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Paralax.MessageBrokers.Outbox.Mongo.sln b/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Paralax.MessageBrokers.Outbox.Mongo.sln new file mode 100644 index 0000000..616cd6d --- /dev/null +++ b/src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo/Paralax.MessageBrokers.Outbox.Mongo.sln @@ -0,0 +1,25 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.5.002.0 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Paralax.MessageBrokers.Outbox.Mongo", "Paralax.MessageBrokers.Outbox.Mongo.csproj", "{83E0D5A1-4C34-43E3-8F2E-148D9C521552}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {83E0D5A1-4C34-43E3-8F2E-148D9C521552}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {83E0D5A1-4C34-43E3-8F2E-148D9C521552}.Debug|Any CPU.Build.0 = Debug|Any CPU + {83E0D5A1-4C34-43E3-8F2E-148D9C521552}.Release|Any CPU.ActiveCfg = Release|Any CPU + {83E0D5A1-4C34-43E3-8F2E-148D9C521552}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {75385AFA-7B01-4E95-A9B2-601FC0D83511} + EndGlobalSection +EndGlobal From a70e220bed94eccd94f16a8be45239e2fd888ef2 Mon Sep 17 00:00:00 2001 From: Andrii Voznesenskyi Date: Fri, 27 Sep 2024 17:22:40 +0200 Subject: [PATCH 2/2] (#75) mongo outbox: add scripts [pack-all-force] --- .../scripts/build-and-pack.sh | 35 +++++++++++++++++++ .../scripts/test-and-collect-coverage.sh | 18 ++++++++++ 2 files changed, 53 insertions(+) create mode 100755 src/Paralax.MessageBrokers.Outbox.Mongo/scripts/build-and-pack.sh create mode 100644 src/Paralax.MessageBrokers.Outbox.Mongo/scripts/test-and-collect-coverage.sh diff --git a/src/Paralax.MessageBrokers.Outbox.Mongo/scripts/build-and-pack.sh b/src/Paralax.MessageBrokers.Outbox.Mongo/scripts/build-and-pack.sh new file mode 100755 index 0000000..0b95bd3 --- /dev/null +++ b/src/Paralax.MessageBrokers.Outbox.Mongo/scripts/build-and-pack.sh @@ -0,0 +1,35 @@ +#!/bin/bash + +echo "Executing post-success scripts for branch $GITHUB_REF_NAME" +echo "Starting build and NuGet package creation for Paralax framework..." + +cd src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo + +echo "Restoring NuGet packages..." +dotnet restore + +PACKAGE_VERSION="1.0.$GITHUB_RUN_NUMBER" +echo "Building and packing the Paralax.MessageBrokers.Outbox.Mongo library..." +dotnet pack -c release /p:PackageVersion=$PACKAGE_VERSION --no-restore -o ./nupkg + +PACKAGE_PATH="./nupkg/Paralax.MessageBrokers.Outbox.Mongo.$PACKAGE_VERSION.nupkg" + +if [ -f "$PACKAGE_PATH" ]; then + echo "Checking if the package is already signed..." + if dotnet nuget verify "$PACKAGE_PATH" | grep -q 'Package is signed'; then + echo "Package is already signed, skipping signing." + else + echo "Signing the NuGet package..." + dotnet nuget sign "$PACKAGE_PATH" \ + --certificate-path "$CERTIFICATE_PATH" \ + --timestamper http://timestamp.digicert.com + fi + + echo "Uploading Paralax.MessageBrokers.Outbox.Mongo package to NuGet..." + dotnet nuget push "$PACKAGE_PATH" -k "$NUGET_API_KEY" \ + -s https://api.nuget.org/v3/index.json --skip-duplicate + echo "Package uploaded to NuGet." +else + echo "Error: Package $PACKAGE_PATH not found." + exit 1 +fi diff --git a/src/Paralax.MessageBrokers.Outbox.Mongo/scripts/test-and-collect-coverage.sh b/src/Paralax.MessageBrokers.Outbox.Mongo/scripts/test-and-collect-coverage.sh new file mode 100644 index 0000000..7979034 --- /dev/null +++ b/src/Paralax.MessageBrokers.Outbox.Mongo/scripts/test-and-collect-coverage.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +echo "Running tests and collecting coverage for Paralax.MessageBrokers.Outbox.Mongo..." + +cd src/Paralax.MessageBrokers.Outbox.Mongo/tests/Paralax.MessageBrokers.Outbox.Mongo.Tests + +echo "Restoring NuGet packages..." +dotnet restore + +echo "Running tests and generating code coverage report..." +dotnet test --collect:"XPlat Code Coverage" --results-directory ./TestResults + +# Check if tests succeeded +if [ $? -ne 0 ]; then + echo "Tests failed. Exiting..." + exit 1 +fi +