diff --git a/src/BuildingRegistry.Consumer.Address/ConsumerAddressContext.cs b/src/BuildingRegistry.Consumer.Address/ConsumerAddressContext.cs index 7f83dd0b4..3a99dc933 100644 --- a/src/BuildingRegistry.Consumer.Address/ConsumerAddressContext.cs +++ b/src/BuildingRegistry.Consumer.Address/ConsumerAddressContext.cs @@ -17,7 +17,8 @@ namespace BuildingRegistry.Consumer.Address public class ConsumerAddressContext : SqlServerConsumerDbContext, IAddresses { - public DbSet AddressConsumerItems { get; set; } + public DbSet AddressConsumerItems => Set(); + public DbSet OffsetOverrides => Set(); // This needs to be here to please EF public ConsumerAddressContext() @@ -30,6 +31,12 @@ public ConsumerAddressContext(DbContextOptions options) public override string ProcessedMessagesSchema => Schema.ConsumerAddress; + public OffsetOverride? GetOffsetOverride(string consumerGroupId) + { + return OffsetOverrides + .SingleOrDefault(x => x.ConsumerGroupId == consumerGroupId && x.Configured == false); + } + public AddressData? GetOptional(AddressPersistentLocalId addressPersistentLocalId) { var item = AddressConsumerItems diff --git a/src/BuildingRegistry.Consumer.Address/Infrastructure/Program.cs b/src/BuildingRegistry.Consumer.Address/Infrastructure/Program.cs index e7fa1aafe..38df33162 100644 --- a/src/BuildingRegistry.Consumer.Address/Infrastructure/Program.cs +++ b/src/BuildingRegistry.Consumer.Address/Infrastructure/Program.cs @@ -101,7 +101,7 @@ public static async Task Main(string[] args) builder.Register(c => { - var bootstrapServers = hostContext.Configuration["Kafka:BootstrapServers"]; + var bootstrapServers = hostContext.Configuration["Kafka:BootstrapServers"]!; var topic = $"{hostContext.Configuration["AddressTopic"]}" ?? throw new ArgumentException("Configuration has no AddressTopic."); var suffix = hostContext.Configuration["GroupSuffix"]; var consumerGroupId = $"BuildingRegistry.ConsumerAddress.{topic}{suffix}"; @@ -113,26 +113,17 @@ public static async Task Main(string[] args) EventsJsonSerializerSettingsProvider.CreateSerializerSettings()); consumerOptions.ConfigureSaslAuthentication(new SaslAuthentication( - hostContext.Configuration["Kafka:SaslUserName"], - hostContext.Configuration["Kafka:SaslPassword"])); + hostContext.Configuration["Kafka:SaslUserName"]!, + hostContext.Configuration["Kafka:SaslPassword"]!)); - var offsetStr = hostContext.Configuration["AddressTopicOffset"]; - if (!string.IsNullOrEmpty(offsetStr) && long.TryParse(offsetStr, out var offset)) - { - var ignoreDataCheck = hostContext.Configuration.GetValue("IgnoreAddressTopicOffsetDataCheck", false); - - if (!ignoreDataCheck) - { - using var ctx = c.Resolve(); + using var ctx = c.Resolve(); + var offsetOverride = ctx.GetOffsetOverride(consumerGroupId); - if (ctx.AddressConsumerItems.Any()) - { - throw new InvalidOperationException( - $"Cannot set Kafka offset to {offset} because {nameof(ctx.AddressConsumerItems)} has data."); - } - } - - consumerOptions.ConfigureOffset(new Offset(offset)); + if (offsetOverride is not null) + { + consumerOptions.ConfigureOffset(new Offset(offsetOverride.Offset)); + offsetOverride.Configured = true; + ctx.SaveChanges(); } return consumerOptions; diff --git a/src/BuildingRegistry.Consumer.Address/Migrations/20241204111246_AddOffsetOverride.Designer.cs b/src/BuildingRegistry.Consumer.Address/Migrations/20241204111246_AddOffsetOverride.Designer.cs new file mode 100644 index 000000000..7d0d1da75 --- /dev/null +++ b/src/BuildingRegistry.Consumer.Address/Migrations/20241204111246_AddOffsetOverride.Designer.cs @@ -0,0 +1,90 @@ +// +using System; +using BuildingRegistry.Consumer.Address; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Metadata; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; + +#nullable disable + +namespace BuildingRegistry.Consumer.Address.Migrations +{ + [DbContext(typeof(ConsumerAddressContext))] + [Migration("20241204111246_AddOffsetOverride")] + partial class AddOffsetOverride + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("ProductVersion", "8.0.3") + .HasAnnotation("Relational:MaxIdentifierLength", 128); + + SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder); + + modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.ProcessedMessage", b => + { + b.Property("IdempotenceKey") + .HasMaxLength(128) + .HasColumnType("nvarchar(128)"); + + b.Property("DateProcessed") + .HasColumnType("datetimeoffset"); + + b.HasKey("IdempotenceKey"); + + SqlServerKeyBuilderExtensions.IsClustered(b.HasKey("IdempotenceKey")); + + b.HasIndex("DateProcessed"); + + b.ToTable("ProcessedMessages", "BuildingRegistryConsumerAddress"); + }); + + modelBuilder.Entity("BuildingRegistry.Consumer.Address.AddressConsumerItem", b => + { + b.Property("AddressPersistentLocalId") + .HasColumnType("int"); + + b.Property("AddressId") + .HasColumnType("uniqueidentifier"); + + b.Property("IsRemoved") + .HasColumnType("bit"); + + b.Property("Status") + .IsRequired() + .HasColumnType("nvarchar(max)"); + + b.HasKey("AddressPersistentLocalId"); + + SqlServerKeyBuilderExtensions.IsClustered(b.HasKey("AddressPersistentLocalId")); + + b.HasIndex("AddressId"); + + b.HasIndex("IsRemoved"); + + b.ToTable("Addresses", "BuildingRegistryConsumerAddress"); + }); + + modelBuilder.Entity("BuildingRegistry.Consumer.Address.OffsetOverride", b => + { + b.Property("ConsumerGroupId") + .HasColumnType("nvarchar(450)"); + + b.Property("Configured") + .HasColumnType("bit"); + + b.Property("Offset") + .HasColumnType("bigint"); + + b.HasKey("ConsumerGroupId"); + + b.ToTable("OffsetOverrides", "BuildingRegistryConsumerAddress"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/BuildingRegistry.Consumer.Address/Migrations/20241204111246_AddOffsetOverride.cs b/src/BuildingRegistry.Consumer.Address/Migrations/20241204111246_AddOffsetOverride.cs new file mode 100644 index 000000000..b4edb2d99 --- /dev/null +++ b/src/BuildingRegistry.Consumer.Address/Migrations/20241204111246_AddOffsetOverride.cs @@ -0,0 +1,36 @@ +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace BuildingRegistry.Consumer.Address.Migrations +{ + /// + public partial class AddOffsetOverride : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.CreateTable( + name: "OffsetOverrides", + schema: "BuildingRegistryConsumerAddress", + columns: table => new + { + ConsumerGroupId = table.Column(type: "nvarchar(450)", nullable: false), + Offset = table.Column(type: "bigint", nullable: false), + Configured = table.Column(type: "bit", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("PK_OffsetOverrides", x => x.ConsumerGroupId); + }); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "OffsetOverrides", + schema: "BuildingRegistryConsumerAddress"); + } + } +} diff --git a/src/BuildingRegistry.Consumer.Address/Migrations/ConsumerAddressContextModelSnapshot.cs b/src/BuildingRegistry.Consumer.Address/Migrations/ConsumerAddressContextModelSnapshot.cs index 42068f26b..bbea450a2 100644 --- a/src/BuildingRegistry.Consumer.Address/Migrations/ConsumerAddressContextModelSnapshot.cs +++ b/src/BuildingRegistry.Consumer.Address/Migrations/ConsumerAddressContextModelSnapshot.cs @@ -17,10 +17,10 @@ protected override void BuildModel(ModelBuilder modelBuilder) { #pragma warning disable 612, 618 modelBuilder - .HasAnnotation("ProductVersion", "6.0.3") + .HasAnnotation("ProductVersion", "8.0.3") .HasAnnotation("Relational:MaxIdentifierLength", 128); - SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder, 1L, 1); + SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder); modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.ProcessedMessage", b => { @@ -65,6 +65,22 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.ToTable("Addresses", "BuildingRegistryConsumerAddress"); }); + + modelBuilder.Entity("BuildingRegistry.Consumer.Address.OffsetOverride", b => + { + b.Property("ConsumerGroupId") + .HasColumnType("nvarchar(450)"); + + b.Property("Configured") + .HasColumnType("bit"); + + b.Property("Offset") + .HasColumnType("bigint"); + + b.HasKey("ConsumerGroupId"); + + b.ToTable("OffsetOverrides", "BuildingRegistryConsumerAddress"); + }); #pragma warning restore 612, 618 } } diff --git a/src/BuildingRegistry.Consumer.Address/OffsetOverride.cs b/src/BuildingRegistry.Consumer.Address/OffsetOverride.cs new file mode 100644 index 000000000..8b8786d67 --- /dev/null +++ b/src/BuildingRegistry.Consumer.Address/OffsetOverride.cs @@ -0,0 +1,24 @@ +namespace BuildingRegistry.Consumer.Address +{ + using BuildingRegistry.Infrastructure; + using Microsoft.EntityFrameworkCore; + using Microsoft.EntityFrameworkCore.Metadata.Builders; + + public class OffsetOverride + { + public string ConsumerGroupId { get; set; } + public long Offset { get; set; } + public bool Configured { get; set; } + } + + public class OffsetOverrideConfiguration : IEntityTypeConfiguration + { + public const string TableName = "OffsetOverrides"; + + public void Configure(EntityTypeBuilder builder) + { + builder.ToTable(TableName, Schema.ConsumerAddress) + .HasKey(x => x.ConsumerGroupId); + } + } +} diff --git a/src/BuildingRegistry.Consumer.Address/appsettings.json b/src/BuildingRegistry.Consumer.Address/appsettings.json index d2b1babe6..39bc646e5 100644 --- a/src/BuildingRegistry.Consumer.Address/appsettings.json +++ b/src/BuildingRegistry.Consumer.Address/appsettings.json @@ -18,8 +18,6 @@ "GroupSuffix": "", "AddressTopic": "dev.address", - "AddressTopicOffset": "", - "IgnoreAddressTopicOffsetDataCheck": false, "BaseUrl": "https://api.staging-basisregisters.vlaanderen/",