Skip to content

Commit

Permalink
feat(consumer): add offset as projection state to read consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
ArneD committed Aug 20, 2024
1 parent 240208b commit daa76af
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
8 changes: 4 additions & 4 deletions paket.dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ nuget Be.Vlaanderen.Basisregisters.Aws.Lambda 4.0.0
nuget Be.Vlaanderen.Basisregisters.EventHandling 5.0.0
nuget Be.Vlaanderen.Basisregisters.EventHandling.Autofac 5.0.0

nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.0.1
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.0.1
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer 5.0.1
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.0.1
nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.1.0
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.1.0
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer 5.1.0
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.1.0

nuget Be.Vlaanderen.Basisregisters.ProjectionHandling.LastChangedList 14.0.0
nuget Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore 14.0.0
Expand Down
8 changes: 4 additions & 4 deletions paket.lock
Original file line number Diff line number Diff line change
Expand Up @@ -302,21 +302,21 @@ NUGET
Be.Vlaanderen.Basisregisters.GrAr.Common (21.14.1)
Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.14.1)
Microsoft.CSharp (>= 4.7)
Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.0.1)
Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.1)
AWSSDK.Core (>= 3.7.302.15)
AWSSDK.SQS (>= 3.7.300.54)
Microsoft.Extensions.Logging (>= 8.0)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer (5.0.1)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer (5.1)
Confluent.Kafka (>= 2.3)
Microsoft.EntityFrameworkCore (>= 8.0.2)
Microsoft.Extensions.Logging (>= 8.0)
Newtonsoft.Json (>= 13.0.3)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.0.1)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.1)
Confluent.Kafka (>= 2.3)
Microsoft.EntityFrameworkCore.SqlServer (>= 8.0.2)
Microsoft.Extensions.Logging (>= 8.0)
Newtonsoft.Json (>= 13.0.3)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.0.1)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.1)
Confluent.Kafka (>= 2.3)
Newtonsoft.Json (>= 13.0.3)
Be.Vlaanderen.Basisregisters.Middleware.AddProblemJsonHeader (3.0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

try
{
await _consumer.ConsumeContinuously(async message =>
await _consumer.ConsumeContinuously(async (message, messageContext) =>
{
_logger.LogInformation("Handling next message");

await using var context = await _consumerParcelDbContextFactory.CreateDbContextAsync(stoppingToken);
await projector.ProjectAsync(context, message, stoppingToken).ConfigureAwait(false);

//CancellationToken.None to prevent halfway consumption
await context.UpdateProjectionState(typeof(ConsumerParcel).FullName, messageContext.Offset, stoppingToken);
await context.SaveChangesAsync(CancellationToken.None);

}, stoppingToken);
Expand Down

0 comments on commit daa76af

Please sign in to comment.