Skip to content

Commit

Permalink
Merge pull request #76 from itsharppro/dev [pack-all-force]
Browse files Browse the repository at this point in the history
(#75) mongo outbox: add classlib [pack-all-force]
  • Loading branch information
SaintAngeLs authored Sep 27, 2024
2 parents e16661a + a70e220 commit 95ccc4f
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 0 deletions.
35 changes: 35 additions & 0 deletions src/Paralax.MessageBrokers.Outbox.Mongo/scripts/build-and-pack.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/bin/bash

echo "Executing post-success scripts for branch $GITHUB_REF_NAME"
echo "Starting build and NuGet package creation for Paralax framework..."

cd src/Paralax.MessageBrokers.Outbox.Mongo/src/Paralax.MessageBrokers.Outbox.Mongo

echo "Restoring NuGet packages..."
dotnet restore

PACKAGE_VERSION="1.0.$GITHUB_RUN_NUMBER"
echo "Building and packing the Paralax.MessageBrokers.Outbox.Mongo library..."
dotnet pack -c release /p:PackageVersion=$PACKAGE_VERSION --no-restore -o ./nupkg

PACKAGE_PATH="./nupkg/Paralax.MessageBrokers.Outbox.Mongo.$PACKAGE_VERSION.nupkg"

if [ -f "$PACKAGE_PATH" ]; then
echo "Checking if the package is already signed..."
if dotnet nuget verify "$PACKAGE_PATH" | grep -q 'Package is signed'; then
echo "Package is already signed, skipping signing."
else
echo "Signing the NuGet package..."
dotnet nuget sign "$PACKAGE_PATH" \
--certificate-path "$CERTIFICATE_PATH" \
--timestamper http://timestamp.digicert.com
fi

echo "Uploading Paralax.MessageBrokers.Outbox.Mongo package to NuGet..."
dotnet nuget push "$PACKAGE_PATH" -k "$NUGET_API_KEY" \
-s https://api.nuget.org/v3/index.json --skip-duplicate
echo "Package uploaded to NuGet."
else
echo "Error: Package $PACKAGE_PATH not found."
exit 1
fi
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

echo "Running tests and collecting coverage for Paralax.MessageBrokers.Outbox.Mongo..."

cd src/Paralax.MessageBrokers.Outbox.Mongo/tests/Paralax.MessageBrokers.Outbox.Mongo.Tests

echo "Restoring NuGet packages..."
dotnet restore

echo "Running tests and generating code coverage report..."
dotnet test --collect:"XPlat Code Coverage" --results-directory ./TestResults

# Check if tests succeeded
if [ $? -ne 0 ]; then
echo "Tests failed. Exiting..."
exit 1
fi

Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
using Paralax;
using Paralax.MessageBrokers.Outbox;
using Paralax.MessageBrokers.Outbox.Messages;
using Paralax.Persistence.MongoDB;

namespace Convey.MessageBrokers.Outbox.Mongo.Internals
{
internal sealed class MongoMessageOutbox : IMessageOutbox, IMessageOutboxAccessor
{
private static readonly JsonSerializerOptions SerializerOptions = new()
{
PropertyNameCaseInsensitive = true,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
NumberHandling = JsonNumberHandling.AllowReadingFromString,
Converters = { new JsonStringEnumConverter(JsonNamingPolicy.CamelCase) }
};

private const string EmptyJsonObject = "{}";
private readonly IMongoSessionFactory _sessionFactory;
private readonly IMongoRepository<InboxMessage, string> _inboxRepository;
private readonly IMongoRepository<OutboxMessage, string> _outboxRepository;
private readonly ILogger<MongoMessageOutbox> _logger;
private readonly bool _transactionsEnabled;

public bool Enabled { get; }

public MongoMessageOutbox(IMongoSessionFactory sessionFactory,
IMongoRepository<InboxMessage, string> inboxRepository,
IMongoRepository<OutboxMessage, string> outboxRepository,
OutboxOptions options, ILogger<MongoMessageOutbox> logger)
{
_sessionFactory = sessionFactory;
_inboxRepository = inboxRepository;
_outboxRepository = outboxRepository;
_logger = logger;
_transactionsEnabled = !options.DisableTransactions;
Enabled = options.Enabled;
}

public async Task HandleAsync(string messageId, Func<Task> handler)
{
if (!Enabled)
{
_logger.LogWarning("Outbox is disabled, incoming messages won't be processed.");
return;
}

if (string.IsNullOrWhiteSpace(messageId))
{
throw new ArgumentException("Message id to be processed cannot be empty.", nameof(messageId));
}

_logger.LogTrace($"Received a message with id: '{messageId}' to be processed.");
if (await _inboxRepository.ExistsAsync(m => m.Id == messageId))
{
_logger.LogTrace($"Message with id: '{messageId}' was already processed.");
return;
}

IClientSessionHandle session = null;
if (_transactionsEnabled)
{
session = await _sessionFactory.CreateAsync();
session.StartTransaction();
}

try
{
_logger.LogTrace($"Processing a message with id: '{messageId}'...");
await handler();
await _inboxRepository.AddAsync(new InboxMessage
{
Id = messageId,
ProcessedAt = DateTime.UtcNow
});

if (session is not null)
{
await session.CommitTransactionAsync();
}

_logger.LogTrace($"Processed a message with id: '{messageId}'.");
}
catch (Exception ex)
{
_logger.LogError(ex, $"There was an error when processing a message with id: '{messageId}'.");
if (session is not null)
{
await session.AbortTransactionAsync();
}

throw;
}
finally
{
session?.Dispose();
}
}

public async Task SendAsync<T>(T message, string originatedMessageId = null, string messageId = null,
string correlationId = null, string spanContext = null, object messageContext = null,
IDictionary<string, object> headers = null) where T : class
{
if (!Enabled)
{
_logger.LogWarning("Outbox is disabled, outgoing messages won't be saved into the storage.");
return;
}

var outboxMessage = new OutboxMessage
{
Id = string.IsNullOrWhiteSpace(messageId) ? Guid.NewGuid().ToString("N") : messageId,
OriginatedMessageId = originatedMessageId,
CorrelationId = correlationId,
SpanContext = spanContext,
SerializedMessageContext =
messageContext is null
? EmptyJsonObject
: JsonSerializer.Serialize(messageContext, SerializerOptions),
MessageContextType = messageContext?.GetType().AssemblyQualifiedName,
Headers = (Dictionary<string, object>) headers,
SerializedMessage = message is null
? EmptyJsonObject
: JsonSerializer.Serialize(message, SerializerOptions),
MessageType = message?.GetType().AssemblyQualifiedName,
SentAt = DateTime.UtcNow
};
await _outboxRepository.AddAsync(outboxMessage);
}

async Task<IReadOnlyList<OutboxMessage>> IMessageOutboxAccessor.GetUnsentAsync()
{
var outboxMessages = await _outboxRepository.FindAsync(om => om.ProcessedAt == null);
return outboxMessages.Select(om =>
{
if (om.MessageContextType is not null)
{
var messageContextType = Type.GetType(om.MessageContextType);
om.MessageContext = JsonSerializer.Deserialize(om.SerializedMessageContext, messageContextType,
SerializerOptions);
}

if (om.MessageType is not null)
{
var messageType = Type.GetType(om.MessageType);
om.Message = JsonSerializer.Deserialize(om.SerializedMessage, messageType, SerializerOptions);
}

return om;
}).ToList();
}

Task IMessageOutboxAccessor.ProcessAsync(OutboxMessage message)
=> _outboxRepository.Collection.UpdateOneAsync(
Builders<OutboxMessage>.Filter.Eq(m => m.Id, message.Id),
Builders<OutboxMessage>.Update.Set(m => m.ProcessedAt, DateTime.UtcNow));

Task IMessageOutboxAccessor.ProcessAsync(IEnumerable<OutboxMessage> outboxMessages)
=> _outboxRepository.Collection.UpdateManyAsync(
Builders<OutboxMessage>.Filter.In(m => m.Id, outboxMessages.Select(m => m.Id)),
Builders<OutboxMessage>.Update.Set(m => m.ProcessedAt, DateTime.UtcNow));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using System;
using System.Threading.Tasks;
using MongoDB.Driver;
using Paralax;
using Paralax.MessageBrokers.Outbox;
using Paralax.MessageBrokers.Outbox.Messages;
using Paralax.Types;

namespace Convey.MessageBrokers.Outbox.Paralax.Internals
{
internal sealed class MongoOutboxInitializer : IInitializer
{
private readonly IMongoDatabase _database;
private readonly OutboxOptions _options;

public MongoOutboxInitializer(IMongoDatabase database, OutboxOptions options)
{
_database = database;
_options = options;
}

public async Task InitializeAsync()
{
if (!_options.Enabled)
{
return;
}

if (_options.Expiry <= 0)
{
return;
}

var inboxCollection = string.IsNullOrWhiteSpace(_options.InboxCollection)
? "inbox"
: _options.InboxCollection;
var builder = Builders<InboxMessage>.IndexKeys;
await _database.GetCollection<InboxMessage>(inboxCollection)
.Indexes.CreateOneAsync(
new CreateIndexModel<InboxMessage>(builder.Ascending(i => i.ProcessedAt),
new CreateIndexOptions
{
ExpireAfter = TimeSpan.FromSeconds(_options.Expiry)
}));

var outboxCollection = string.IsNullOrWhiteSpace(_options.OutboxCollection)
? "outbox"
: _options.OutboxCollection;
var outboxBuilder = Builders<OutboxMessage>.IndexKeys;
await _database.GetCollection<OutboxMessage>(outboxCollection)
.Indexes.CreateOneAsync(
new CreateIndexModel<OutboxMessage>(outboxBuilder.Ascending(i => i.ProcessedAt),
new CreateIndexOptions
{
ExpireAfter = TimeSpan.FromSeconds(_options.Expiry)
}));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Convey.MessageBrokers.Outbox.Mongo.Internals;
using Convey.MessageBrokers.Outbox.Paralax.Internals;
using Microsoft.Extensions.DependencyInjection;
using MongoDB.Bson.Serialization;
using Paralax.MessageBrokers.Outbox.Messages;
using Paralax.Persistence.MongoDB;

namespace Paralax.MessageBrokers.Outbox.Mongo
{
public static class Extensions
{
public static IMessageOutboxConfigurator AddMongo(this IMessageOutboxConfigurator configurator)
{
var builder = configurator.Builder;
var options = configurator.Options;

var inboxCollection = string.IsNullOrWhiteSpace(options.InboxCollection)
? "inbox"
: options.InboxCollection;
var outboxCollection = string.IsNullOrWhiteSpace(options.OutboxCollection)
? "outbox"
: options.OutboxCollection;

builder.AddMongoRepository<InboxMessage, string>(inboxCollection);
builder.AddMongoRepository<OutboxMessage, string>(outboxCollection);
builder.AddInitializer<MongoOutboxInitializer>();
builder.Services.AddTransient<IMessageOutbox, MongoMessageOutbox>();
builder.Services.AddTransient<IMessageOutboxAccessor, MongoMessageOutbox>();
builder.Services.AddTransient<MongoOutboxInitializer>();

BsonClassMap.RegisterClassMap<OutboxMessage>(m =>
{
m.AutoMap();
m.UnmapMember(p => p.Message);
m.UnmapMember(p => p.MessageContext);
});

return configurator;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MongoDB.Driver" Version="2.29.0" />
<PackageReference Include="Paralax" Version="1.0.175" />
<PackageReference Include="Paralax.MessageBrokers.Outbox" Version="1.0.169" />
<PackageReference Include="Paralax.Persistence.MongoDB" Version="1.0.175" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.5.002.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Paralax.MessageBrokers.Outbox.Mongo", "Paralax.MessageBrokers.Outbox.Mongo.csproj", "{83E0D5A1-4C34-43E3-8F2E-148D9C521552}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{83E0D5A1-4C34-43E3-8F2E-148D9C521552}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{83E0D5A1-4C34-43E3-8F2E-148D9C521552}.Debug|Any CPU.Build.0 = Debug|Any CPU
{83E0D5A1-4C34-43E3-8F2E-148D9C521552}.Release|Any CPU.ActiveCfg = Release|Any CPU
{83E0D5A1-4C34-43E3-8F2E-148D9C521552}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {75385AFA-7B01-4E95-A9B2-601FC0D83511}
EndGlobalSection
EndGlobal

0 comments on commit 95ccc4f

Please sign in to comment.