Skip to content

Commit

Permalink
feat: add snapshot reproducer
Browse files Browse the repository at this point in the history
  • Loading branch information
jvandaal committed Oct 21, 2024
1 parent 63a44c1 commit 4a886de
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
namespace ParcelRegistry.Producer.Snapshot.Oslo.Infrastructure.Modules
{
using System;
using Amazon.SimpleNotificationService;
using Autofac;
using Autofac.Extensions.DependencyInjection;
using Be.Vlaanderen.Basisregisters.Api.Exceptions;
using Be.Vlaanderen.Basisregisters.EventHandling;
using Be.Vlaanderen.Basisregisters.EventHandling.Autofac;
using Be.Vlaanderen.Basisregisters.GrAr.Notifications;
using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer;
Expand All @@ -16,6 +18,7 @@ namespace ParcelRegistry.Producer.Snapshot.Oslo.Infrastructure.Modules
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NodaTime;
using ParcelRegistry.Infrastructure;

public class ApiModule : Module
Expand All @@ -36,7 +39,12 @@ public ApiModule(

protected override void Load(ContainerBuilder builder)
{
builder.Register(_ => SystemClock.Instance)
.As<IClock>()
.SingleInstance();

RegisterProjectionSetup(builder);
RegisterReproducers();

builder
.RegisterType<ProblemDetailsHelper>()
Expand All @@ -52,11 +60,8 @@ private void RegisterProjectionSetup(ContainerBuilder builder)
new EventHandlingModule(
typeof(DomainAssemblyMarker).Assembly,
EventsJsonSerializerSettingsProvider.CreateSerializerSettings()))

.RegisterModule<EnvelopeModule>()

.RegisterEventStreamModule(_configuration)

.RegisterModule(new ProjectorModule(_configuration));

RegisterProjections(builder);
Expand All @@ -83,22 +88,7 @@ private void RegisterProjections(ContainerBuilder builder)
_loggerFactory)
.RegisterProjections<ProducerProjections, ProducerContext>(c =>
{
var bootstrapServers = _configuration["Kafka:BootstrapServers"]!;
var topic = $"{_configuration[ProducerProjections.TopicKey]}" ?? throw new ArgumentException($"Configuration has no value for {ProducerProjections.TopicKey}");
var producerOptions = new ProducerOptions(
new BootstrapServers(bootstrapServers),
new Topic(topic),
true,
EventsJsonSerializerSettingsProvider.CreateSerializerSettings())
.ConfigureEnableIdempotence();
if (!string.IsNullOrEmpty(_configuration["Kafka:SaslUserName"])
&& !string.IsNullOrEmpty(_configuration["Kafka:SaslPassword"]))
{
producerOptions.ConfigureSaslAuthentication(new SaslAuthentication(
_configuration["Kafka:SaslUserName"]!,
_configuration["Kafka:SaslPassword"]!));
}

var producerOptions = CreateProducerOptions();
var osloProxy = c.Resolve<IOsloProxy>();

return new ProducerProjections(
Expand All @@ -113,5 +103,51 @@ private void RegisterProjections(ContainerBuilder builder)
},
connectedProjectionSettings);
}

private void RegisterReproducers()
{
_services.AddAWSService<IAmazonSimpleNotificationService>();
_services.AddSingleton<INotificationService>(sp =>
new NotificationService(sp.GetRequiredService<IAmazonSimpleNotificationService>(),
_configuration.GetValue<string>("NotificationTopicArn")!));

var connectionString = _configuration.GetConnectionString("Integration");
var utcHourToRunWithin = _configuration.GetValue<int>("SnapshotReproducerUtcHour");

_services.AddHostedService<SnapshotReproducer>(provider =>
{
var producerOptions = CreateProducerOptions();
return new SnapshotReproducer(
connectionString!,
provider.GetRequiredService<IOsloProxy>(),
new Producer(producerOptions),
provider.GetRequiredService<IClock>(),
provider.GetRequiredService<INotificationService>(),
utcHourToRunWithin,
_loggerFactory);
});
}

private ProducerOptions CreateProducerOptions()
{
var bootstrapServers = _configuration["Kafka:BootstrapServers"]!;
var topic = $"{_configuration[ProducerProjections.TopicKey]}" ??
throw new ArgumentException($"Configuration has no value for {ProducerProjections.TopicKey}");
var producerOptions = new ProducerOptions(
new BootstrapServers(bootstrapServers),
new Topic(topic),
true,
EventsJsonSerializerSettingsProvider.CreateSerializerSettings())
.ConfigureEnableIdempotence();
if (!string.IsNullOrEmpty(_configuration["Kafka:SaslUserName"])
&& !string.IsNullOrEmpty(_configuration["Kafka:SaslPassword"]))
{
producerOptions.ConfigureSaslAuthentication(new SaslAuthentication(
_configuration["Kafka:SaslUserName"]!,
_configuration["Kafka:SaslPassword"]!));
}

return producerOptions;
}
}
}
150 changes: 150 additions & 0 deletions src/ParcelRegistry.Producer.Snapshot.Oslo/SnapshotReproducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
namespace ParcelRegistry.Producer.Snapshot.Oslo
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Be.Vlaanderen.Basisregisters.GrAr.Notifications;
using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer;
using Dapper;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using NodaTime;
using Npgsql;

public class SnapshotReproducer : BackgroundService
{
private readonly string _integrationConnectionString;
private readonly IOsloProxy _osloProxy;
private readonly IProducer _producer;
private readonly IClock _clock;
private readonly INotificationService _notificationService;
private readonly int _utcHourToRunWithin;
private readonly ILogger<SnapshotReproducer> _logger;

public SnapshotReproducer(
string integrationConnectionString,
IOsloProxy osloProxy,
IProducer producer,
IClock clock,
INotificationService notificationService,
int utcHourToRunWithin,
ILoggerFactory loggerFactory)
{
_integrationConnectionString = integrationConnectionString;
_osloProxy = osloProxy;
_producer = producer;
_notificationService = notificationService;
_utcHourToRunWithin = utcHourToRunWithin;
_clock = clock;

_logger = loggerFactory.CreateLogger<SnapshotReproducer>();
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var now = _clock.GetCurrentInstant().ToDateTimeUtc();
if (now.Hour == _utcHourToRunWithin)
{
_logger.LogInformation($"Starting {GetType().Name}");

try
{
//execute query
var idsToProcess = GetIdsToProcess(now);

//reproduce
foreach (var id in idsToProcess)
{
await FindAndProduce(async () =>
await _osloProxy.GetSnapshot(id.CaPaKey, stoppingToken),
id.Position,
stoppingToken);
}

await Task.Delay(TimeSpan.FromHours(1), stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, ex.Message);

await _notificationService.PublishToTopicAsync(new NotificationMessage(
GetType().Name,
$"Reproducing snapshot failed: {ex}",
GetType().Name,
NotificationSeverity.Danger));
}
}

await Task.Delay(TimeSpan.FromMinutes(15), stoppingToken);
}
}

private async Task FindAndProduce(Func<Task<OsloResult?>> findMatchingSnapshot, long storePosition, CancellationToken ct)
{
var result = await findMatchingSnapshot.Invoke();

if (result != null)
{
await Produce(result.Identificator.Id, result.Identificator.ObjectId, result.JsonContent, storePosition, ct);
}
}

private async Task Produce(string puri, string objectId, string jsonContent, long storePosition, CancellationToken cancellationToken = default)
{
var result = await _producer.Produce(
new MessageKey(puri),
jsonContent,
new List<MessageHeader> { new(MessageHeader.IdempotenceKey, $"{objectId}-{storePosition.ToString()}") },
cancellationToken);

if (!result.IsSuccess)
{
throw new InvalidOperationException(result.Error + Environment.NewLine + result.ErrorReason); //TODO: create custom exception
}
}

private List<(string CaPaKey, long Position)> GetIdsToProcess(DateTime utcNow)
{
using var connection = new NpgsqlConnection(_integrationConnectionString);

var todayMidnight = utcNow.Date;
var yesterdayMidnight = todayMidnight.AddDays(-1);

var records = connection.Query<ParcelPosition>(
$"""
SELECT capakey, position, version_timestamp
FROM integration_parcel.parcel_versions
where version_timestamp >= '{yesterdayMidnight:yyyy-MM-dd}' and version_timestamp < '{todayMidnight:yyyy-MM-dd}'
""");

var duplicateEvents = records
.GroupBy(x => new
{
x.capakey,
TimeStamp = x.version_timestamp.ToString("yyyyMMddHHmmss") // Format the timestamp to seconds as OSLO API doesn't return the milliseconds of the timestamp
})
.Where(x => x.Count() > 1)
.Select(x =>
{
var latest = x.MaxBy(y => y.position)!;
return (latest.capakey, latest.position);
})
.ToList();

return duplicateEvents;
}

private sealed class ParcelPosition
{
public string capakey { get; init; }
public long position { get; init; }
public DateTimeOffset version_timestamp { get; init; }
}
}
}
4 changes: 4 additions & 0 deletions src/ParcelRegistry.Producer.Snapshot.Oslo/appsettings.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"ConnectionStrings": {
"Events": "Server=(localdb)\\mssqllocaldb;Database=EFProviders.InMemory.ParcelRegistry;Trusted_Connection=True;TrustServerCertificate=True;",
"Integration": "Server=(localdb)\\mssqllocaldb;Database=EFProviders.InMemory.AddressRegistry;Trusted_Connection=True;TrustServerCertificate=True;",
"ProducerSnapshotProjections": "Server=(localdb)\\mssqllocaldb;Database=EFProviders.InMemory.ParcelRegistry;Trusted_Connection=True;TrustServerCertificate=True;",
"ProducerSnapshotProjectionsAdmin": "Server=(localdb)\\mssqllocaldb;Database=EFProviders.InMemory.ParcelRegistry;Trusted_Connection=True;TrustServerCertificate=True;"
},
Expand All @@ -16,6 +17,9 @@
"RetryBackoffFactor": 5
},

"NotificationTopicArn": "",
"SnapshotReproducerUtcHour": 1,

"Kafka": {
"BootstrapServers": "localhost:29092/"
},
Expand Down
7 changes: 7 additions & 0 deletions src/ParcelRegistry.Producer.Snapshot.Oslo/paket.references
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Be.Vlaanderen.Basisregisters.Api
Be.Vlaanderen.Basisregisters.EventHandling.Autofac
Be.Vlaanderen.Basisregisters.GrAr.Legacy
Be.Vlaanderen.Basisregisters.GrAr.Notifications
Be.Vlaanderen.Basisregisters.GrAr.Oslo
Be.Vlaanderen.BasisRegisters.MessageHandling.Kafka.Producer
Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Autofac
Expand All @@ -9,7 +10,13 @@ Be.Vlaanderen.Basisregisters.Projector

Microsoft.Extensions.DependencyInjection

AWSSDK.Extensions.NETCore.Setup

AspNetCore.HealthChecks.SqlServer
AspNetCore.HealthChecks.NpgSql

Npgsql.EntityFrameworkCore.PostgreSQL
Dapper

Datadog.Trace.Bundle

Expand Down

0 comments on commit 4a886de

Please sign in to comment.