diff --git a/paket.dependencies b/paket.dependencies index 83545f581..7147fe2f5 100644 --- a/paket.dependencies +++ b/paket.dependencies @@ -57,10 +57,10 @@ nuget Be.Vlaanderen.Basisregisters.Aws.Lambda 4.0.0 nuget Be.Vlaanderen.Basisregisters.EventHandling 6.0.0 nuget Be.Vlaanderen.Basisregisters.EventHandling.Autofac 6.0.0 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.1.0 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.1.0 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer 5.1.0 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.1.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.2.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.2.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer 5.2.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.2.0 nuget Be.Vlaanderen.Basisregisters.ProjectionHandling.LastChangedList 14.0.0 nuget Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore 14.0.0 diff --git a/paket.lock b/paket.lock index de784e1b9..dd797c4be 100644 --- a/paket.lock +++ b/paket.lock @@ -312,21 +312,21 @@ NUGET Be.Vlaanderen.Basisregisters.GrAr.Common (21.14.1) Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.14.1) Microsoft.CSharp (>= 4.7) - Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.1) + Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.2) AWSSDK.Core (>= 3.7.302.15) AWSSDK.SQS (>= 3.7.300.54) Microsoft.Extensions.Logging (>= 8.0) - Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer (5.1) + Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer (5.2) Confluent.Kafka (>= 2.3) Microsoft.EntityFrameworkCore (>= 8.0.2) Microsoft.Extensions.Logging (>= 8.0) Newtonsoft.Json (>= 13.0.3) - Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.1) + Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.2) Confluent.Kafka (>= 2.3) Microsoft.EntityFrameworkCore.SqlServer (>= 8.0.2) Microsoft.Extensions.Logging (>= 8.0) Newtonsoft.Json (>= 13.0.3) - Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.1) + Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.2) Confluent.Kafka (>= 2.3) Newtonsoft.Json (>= 13.0.3) Be.Vlaanderen.Basisregisters.Middleware.AddProblemJsonHeader (3.0) diff --git a/src/BuildingRegistry.Consumer.Address/ConsumerAddressContext.cs b/src/BuildingRegistry.Consumer.Address/ConsumerAddressContext.cs index 3a99dc933..947040a62 100644 --- a/src/BuildingRegistry.Consumer.Address/ConsumerAddressContext.cs +++ b/src/BuildingRegistry.Consumer.Address/ConsumerAddressContext.cs @@ -6,6 +6,7 @@ namespace BuildingRegistry.Consumer.Address using System.Linq; using System.Reflection; using System.Threading.Tasks; + using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer; using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer; using Be.Vlaanderen.Basisregisters.ProjectionHandling.Runner.SqlServer.MigrationExtensions; using Building; @@ -15,7 +16,7 @@ namespace BuildingRegistry.Consumer.Address using Microsoft.EntityFrameworkCore.Design; using Microsoft.Extensions.Configuration; - public class ConsumerAddressContext : SqlServerConsumerDbContext, IAddresses + public class ConsumerAddressContext : SqlServerConsumerDbContext, IAddresses, IOffsetOverrideDbSet { public DbSet AddressConsumerItems => Set(); public DbSet OffsetOverrides => Set(); @@ -101,6 +102,7 @@ protected override void OnModelCreating(ModelBuilder modelBuilder) base.OnModelCreating(modelBuilder); modelBuilder.ApplyConfigurationsFromAssembly(typeof(ConsumerAddressContext).GetTypeInfo().Assembly); + modelBuilder.ApplyConfiguration(new OffsetOverrideConfiguration(Schema.ConsumerAddress)); } } diff --git a/src/BuildingRegistry.Consumer.Address/Infrastructure/Program.cs b/src/BuildingRegistry.Consumer.Address/Infrastructure/Program.cs index 38df33162..b1c4b78d7 100644 --- a/src/BuildingRegistry.Consumer.Address/Infrastructure/Program.cs +++ b/src/BuildingRegistry.Consumer.Address/Infrastructure/Program.cs @@ -2,7 +2,6 @@ namespace BuildingRegistry.Consumer.Address.Infrastructure { using System; using System.IO; - using System.Linq; using System.Threading; using System.Threading.Tasks; using Api.BackOffice.Abstractions; @@ -13,6 +12,7 @@ namespace BuildingRegistry.Consumer.Address.Infrastructure using Be.Vlaanderen.Basisregisters.EventHandling; using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka; using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer; + using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.Extensions; using Building; using BuildingRegistry.Infrastructure; using BuildingRegistry.Infrastructure.Modules; @@ -117,14 +117,7 @@ public static async Task Main(string[] args) hostContext.Configuration["Kafka:SaslPassword"]!)); using var ctx = c.Resolve(); - var offsetOverride = ctx.GetOffsetOverride(consumerGroupId); - - if (offsetOverride is not null) - { - consumerOptions.ConfigureOffset(new Offset(offsetOverride.Offset)); - offsetOverride.Configured = true; - ctx.SaveChanges(); - } + ctx.OverrideConfigureOffset(consumerOptions); return consumerOptions; }); diff --git a/src/BuildingRegistry.Consumer.Address/OffsetOverride.cs b/src/BuildingRegistry.Consumer.Address/OffsetOverride.cs deleted file mode 100644 index 8b8786d67..000000000 --- a/src/BuildingRegistry.Consumer.Address/OffsetOverride.cs +++ /dev/null @@ -1,24 +0,0 @@ -namespace BuildingRegistry.Consumer.Address -{ - using BuildingRegistry.Infrastructure; - using Microsoft.EntityFrameworkCore; - using Microsoft.EntityFrameworkCore.Metadata.Builders; - - public class OffsetOverride - { - public string ConsumerGroupId { get; set; } - public long Offset { get; set; } - public bool Configured { get; set; } - } - - public class OffsetOverrideConfiguration : IEntityTypeConfiguration - { - public const string TableName = "OffsetOverrides"; - - public void Configure(EntityTypeBuilder builder) - { - builder.ToTable(TableName, Schema.ConsumerAddress) - .HasKey(x => x.ConsumerGroupId); - } - } -}