Skip to content

Commit

Permalink
feat: configure consumer offset without deploy
Browse files Browse the repository at this point in the history
  • Loading branch information
jvandaal authored Dec 4, 2024
1 parent f105dd9 commit 21b9669
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ namespace BuildingRegistry.Consumer.Address

public class ConsumerAddressContext : SqlServerConsumerDbContext<ConsumerAddressContext>, IAddresses
{
public DbSet<AddressConsumerItem> AddressConsumerItems { get; set; }
public DbSet<AddressConsumerItem> AddressConsumerItems => Set<AddressConsumerItem>();
public DbSet<OffsetOverride> OffsetOverrides => Set<OffsetOverride>();

// This needs to be here to please EF
public ConsumerAddressContext()
Expand All @@ -30,6 +31,12 @@ public ConsumerAddressContext(DbContextOptions<ConsumerAddressContext> options)

public override string ProcessedMessagesSchema => Schema.ConsumerAddress;

public OffsetOverride? GetOffsetOverride(string consumerGroupId)
{
return OffsetOverrides
.SingleOrDefault(x => x.ConsumerGroupId == consumerGroupId && x.Configured == false);
}

public AddressData? GetOptional(AddressPersistentLocalId addressPersistentLocalId)
{
var item = AddressConsumerItems
Expand Down
29 changes: 10 additions & 19 deletions src/BuildingRegistry.Consumer.Address/Infrastructure/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public static async Task Main(string[] args)

builder.Register(c =>
{
var bootstrapServers = hostContext.Configuration["Kafka:BootstrapServers"];
var bootstrapServers = hostContext.Configuration["Kafka:BootstrapServers"]!;
var topic = $"{hostContext.Configuration["AddressTopic"]}" ?? throw new ArgumentException("Configuration has no AddressTopic.");
var suffix = hostContext.Configuration["GroupSuffix"];
var consumerGroupId = $"BuildingRegistry.ConsumerAddress.{topic}{suffix}";
Expand All @@ -113,26 +113,17 @@ public static async Task Main(string[] args)
EventsJsonSerializerSettingsProvider.CreateSerializerSettings());

consumerOptions.ConfigureSaslAuthentication(new SaslAuthentication(
hostContext.Configuration["Kafka:SaslUserName"],
hostContext.Configuration["Kafka:SaslPassword"]));
hostContext.Configuration["Kafka:SaslUserName"]!,
hostContext.Configuration["Kafka:SaslPassword"]!));

var offsetStr = hostContext.Configuration["AddressTopicOffset"];
if (!string.IsNullOrEmpty(offsetStr) && long.TryParse(offsetStr, out var offset))
{
var ignoreDataCheck = hostContext.Configuration.GetValue<bool>("IgnoreAddressTopicOffsetDataCheck", false);

if (!ignoreDataCheck)
{
using var ctx = c.Resolve<ConsumerAddressContext>();
using var ctx = c.Resolve<ConsumerAddressContext>();
var offsetOverride = ctx.GetOffsetOverride(consumerGroupId);

if (ctx.AddressConsumerItems.Any())
{
throw new InvalidOperationException(
$"Cannot set Kafka offset to {offset} because {nameof(ctx.AddressConsumerItems)} has data.");
}
}

consumerOptions.ConfigureOffset(new Offset(offset));
if (offsetOverride is not null)
{
consumerOptions.ConfigureOffset(new Offset(offsetOverride.Offset));
offsetOverride.Configured = true;
ctx.SaveChanges();
}

return consumerOptions;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using Microsoft.EntityFrameworkCore.Migrations;

#nullable disable

namespace BuildingRegistry.Consumer.Address.Migrations
{
/// <inheritdoc />
public partial class AddOffsetOverride : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "OffsetOverrides",
schema: "BuildingRegistryConsumerAddress",
columns: table => new
{
ConsumerGroupId = table.Column<string>(type: "nvarchar(450)", nullable: false),
Offset = table.Column<long>(type: "bigint", nullable: false),
Configured = table.Column<bool>(type: "bit", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_OffsetOverrides", x => x.ConsumerGroupId);
});
}

/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "OffsetOverrides",
schema: "BuildingRegistryConsumerAddress");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ protected override void BuildModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "6.0.3")
.HasAnnotation("ProductVersion", "8.0.3")
.HasAnnotation("Relational:MaxIdentifierLength", 128);

SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder, 1L, 1);
SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder);

modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.ProcessedMessage", b =>
{
Expand Down Expand Up @@ -65,6 +65,22 @@ protected override void BuildModel(ModelBuilder modelBuilder)

b.ToTable("Addresses", "BuildingRegistryConsumerAddress");
});

modelBuilder.Entity("BuildingRegistry.Consumer.Address.OffsetOverride", b =>
{
b.Property<string>("ConsumerGroupId")
.HasColumnType("nvarchar(450)");

b.Property<bool>("Configured")
.HasColumnType("bit");

b.Property<long>("Offset")
.HasColumnType("bigint");

b.HasKey("ConsumerGroupId");

b.ToTable("OffsetOverrides", "BuildingRegistryConsumerAddress");
});
#pragma warning restore 612, 618
}
}
Expand Down
24 changes: 24 additions & 0 deletions src/BuildingRegistry.Consumer.Address/OffsetOverride.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace BuildingRegistry.Consumer.Address
{
using BuildingRegistry.Infrastructure;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

public class OffsetOverride
{
public string ConsumerGroupId { get; set; }
public long Offset { get; set; }
public bool Configured { get; set; }
}

public class OffsetOverrideConfiguration : IEntityTypeConfiguration<OffsetOverride>
{
public const string TableName = "OffsetOverrides";

public void Configure(EntityTypeBuilder<OffsetOverride> builder)
{
builder.ToTable(TableName, Schema.ConsumerAddress)
.HasKey(x => x.ConsumerGroupId);
}
}
}
2 changes: 0 additions & 2 deletions src/BuildingRegistry.Consumer.Address/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

"GroupSuffix": "",
"AddressTopic": "dev.address",
"AddressTopicOffset": "",
"IgnoreAddressTopicOffsetDataCheck": false,

"BaseUrl": "https://api.staging-basisregisters.vlaanderen/",

Expand Down

0 comments on commit 21b9669

Please sign in to comment.