Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Issues with cooperative-sticky strategy #593

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
7 changes: 7 additions & 0 deletions KafkaFlow.sln
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.OpenTelemetry", "samples\KafkaFlow.Sample.OpenTelemetry\KafkaFlow.Sample.OpenTelemetry.csproj", "{E9E8B374-4165-45F2-8DF5-F141E141AC1D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.CooperativeSticky", "samples\KafkaFlow.Sample.CooperativeSticky\KafkaFlow.Sample.CooperativeSticky.csproj", "{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -222,6 +224,10 @@ Global
{E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E9E8B374-4165-45F2-8DF5-F141E141AC1D}.Release|Any CPU.Build.0 = Release|Any CPU
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -265,6 +271,7 @@ Global
{1755E8DB-970C-4A24-8B7C-A2BEC1410BEE} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068}
{80080C1D-579E-4AB2-935D-5CFFC51843D8} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068}
{E9E8B374-4165-45F2-8DF5-F141E141AC1D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
{DBF7B091-11AE-402F-9F36-7E7EB3901B0B} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB}
Expand Down
44 changes: 44 additions & 0 deletions samples/KafkaFlow.Sample.CooperativeSticky/HostedService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using KafkaFlow.Producers;
using Microsoft.Extensions.Hosting;

namespace KafkaFlow.Sample.CooperativeSticky;

public class HostedService : IHostedService
{
private IMessageProducer _producer;
const string producerName = "PrintConsole";
const string topicName = "sample-topic";


public HostedService(IProducerAccessor producerAccessor)
{
_producer = producerAccessor.GetProducer(producerName);
}

public async Task StartAsync(CancellationToken cancellationToken)
{
try
{
while (true)
{
await _producer.ProduceAsync(
topicName,
Guid.NewGuid().ToString(),
new TestMessage { Text = $"Message: {Guid.NewGuid()}" });
await Task.Delay(500, cancellationToken);
}
}
catch (Exception e)
{
Console.WriteLine(e);
}
}

public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<IsPackable>false</IsPackable>
<GenerateDocumentationFile>false</GenerateDocumentationFile>
<InvariantGlobalization>true</InvariantGlobalization>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<NoWarn>1701;1702;CS1591;SA1600</NoWarn>
</PropertyGroup>


<ItemGroup>
<ProjectReference Include="..\..\src\KafkaFlow.LogHandler.Console\KafkaFlow.LogHandler.Console.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Microsoft.DependencyInjection\KafkaFlow.Microsoft.DependencyInjection.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow.Serializer.ProtobufNet\KafkaFlow.Serializer.ProtobufNet.csproj" />
<ProjectReference Include="..\..\src\KafkaFlow\KafkaFlow.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
</ItemGroup>


</Project>
18 changes: 18 additions & 0 deletions samples/KafkaFlow.Sample.CooperativeSticky/PrintConsoleHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Threading.Tasks;

namespace KafkaFlow.Sample.CooperativeSticky;

public class PrintConsoleHandler : IMessageHandler<TestMessage>
{
public Task Handle(IMessageContext context, TestMessage message)
{
Console.WriteLine(
"Partition: {0} | Offset: {1} | Message: {2}",
context.ConsumerContext.Partition,
context.ConsumerContext.Offset,
message.Text);

return Task.CompletedTask;
}
}
51 changes: 51 additions & 0 deletions samples/KafkaFlow.Sample.CooperativeSticky/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Confluent.Kafka;
using KafkaFlow;
using KafkaFlow.Sample.CooperativeSticky;
using KafkaFlow.Serializer;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using AutoOffsetReset = KafkaFlow.AutoOffsetReset;

const string producerName = "PrintConsole";
const string topicName = "sample-topic";
var hostBuilder = new HostBuilder();
hostBuilder.ConfigureServices(services =>
services.AddHostedService<HostedService>().AddKafka(
kafka => kafka
.UseConsoleLog()
.AddCluster(
cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.CreateTopicIfNotExists(topicName, 6, 1)
.AddProducer(
producerName,
producer => producer
.DefaultTopic(topicName)
.AddMiddlewares(m => m.AddSerializer<ProtobufNetSerializer>())
)
.AddConsumer(
consumer => consumer
.WithConsumerConfig(new ConsumerConfig
{
PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky})
.Topic(topicName)
.WithGroupId("print-console-handler")
.WithBufferSize(100)
.WithWorkersCount(3)
.WithAutoCommitIntervalMs(100)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddDeserializer<ProtobufNetDeserializer>()
.AddTypedHandlers(h => h.AddHandler<PrintConsoleHandler>())
)
)
)
));

var build = hostBuilder.Build();
var kafkaBus = build.Services.CreateKafkaBus();
await kafkaBus.StartAsync();

await build.RunAsync();
await kafkaBus.StopAsync();
28 changes: 28 additions & 0 deletions samples/KafkaFlow.Sample.CooperativeSticky/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# KafkaFlow.Sample

This is a simple sample that shows how to produce and consume messages.

## How to run

### Requirements

- [.NET 6.0 SDK](https://dotnet.microsoft.com/en-us/download/dotnet/6.0)
- [Docker Desktop](https://www.docker.com/products/docker-desktop/)

### Start the cluster

Using your terminal of choice, start the cluster.
You can find a docker-compose file at the root of this repository.
Position the terminal in that folder and run the following command.

```bash
docker-compose up -d
```

### Run the Sample

Using your terminal of choice, start the sample for the sample folder.

```bash
dotnet run
```
10 changes: 10 additions & 0 deletions samples/KafkaFlow.Sample.CooperativeSticky/TestMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Runtime.Serialization;

namespace KafkaFlow.Sample.CooperativeSticky;

[DataContract]
public class TestMessage
{
[DataMember(Order = 1)]
public string Text { get; set; }
}
4 changes: 3 additions & 1 deletion src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Linq;
using System.Threading.Tasks;
using KafkaFlow.Consumers.DistributionStrategies;
using KafkaFlow.Extensions;

namespace KafkaFlow.Configuration;

Expand Down Expand Up @@ -251,7 +252,8 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
consumerConfigCopy.StatisticsIntervalMs = _consumerConfig.StatisticsIntervalMs ?? _statisticsInterval;

consumerConfigCopy.EnableAutoOffsetStore = false;
consumerConfigCopy.EnableAutoCommit = false;
consumerConfigCopy.EnableAutoCommit = _consumerConfig.PartitionAssignmentStrategy.IsStopTheWorldStrategy() is false;
consumerConfigCopy.AutoCommitIntervalMs = (int?)_autoCommitInterval.TotalMilliseconds;

consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration);

Expand Down
70 changes: 56 additions & 14 deletions src/KafkaFlow/Consumers/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
using Confluent.Kafka;
using KafkaFlow.Authentication;
using KafkaFlow.Configuration;
using KafkaFlow.Extensions;

namespace KafkaFlow.Consumers;

internal class Consumer : IConsumer
{
private readonly IDependencyResolver _dependencyResolver;
private readonly ILogHandler _logHandler;
private readonly bool _stopTheWorldStrategy;

private readonly List<Action<IDependencyResolver, IConsumer<byte[], byte[]>, List<TopicPartition>>>
_partitionsAssignedHandlers = new();
Expand All @@ -40,6 +42,7 @@ public Consumer(
this.Configuration = configuration;
_flowManager = new ConsumerFlowManager(this, _logHandler);
_maxPollIntervalExceeded = new(_logHandler);
_stopTheWorldStrategy = Configuration.GetKafkaConfig().PartitionAssignmentStrategy.IsStopTheWorldStrategy();

foreach (var handler in this.Configuration.StatisticsHandlers)
{
Expand Down Expand Up @@ -148,7 +151,17 @@ public void Commit(IReadOnlyCollection<Confluent.Kafka.TopicPartitionOffset> off
return;
}

_consumer.Commit(validOffsets);
if (_stopTheWorldStrategy)
{
_consumer.Commit(validOffsets);
}
else
{
foreach (var topicPartitionOffset in validOffsets)
{
_consumer.StoreOffset(topicPartitionOffset);
}
}

foreach (var offset in validOffsets)
{
Expand Down Expand Up @@ -237,17 +250,8 @@ private void EnsureConsumer()
var kafkaConfig = this.Configuration.GetKafkaConfig();

var consumerBuilder = new ConsumerBuilder<byte[], byte[]>(kafkaConfig)
.SetPartitionsAssignedHandler(
(consumer, partitions) => this.FirePartitionsAssignedHandlers(consumer, partitions))
.SetPartitionsRevokedHandler(
(consumer, partitions) =>
{
_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
this.Assignment = new List<TopicPartition>();
this.Subscription = new List<string>();
_currentPartitionsOffsets.Clear();
_flowManager.Stop();
})
.SetPartitionsAssignedHandler(FirePartitionsAssignedHandlers)
.SetPartitionsRevokedHandler(FirePartitionRevokedHandlers)
.SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error)))
.SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics)));

Expand Down Expand Up @@ -293,13 +297,51 @@ private void FirePartitionsAssignedHandlers(
IConsumer<byte[], byte[]> consumer,
List<TopicPartition> partitions)
{
this.Assignment = partitions;
if (_stopTheWorldStrategy)
{
this.Assignment = partitions;
this.Subscription = consumer.Subscription;
_flowManager.Start(consumer);
_partitionsAssignedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
return;
}

if (partitions.Count == 0)
{
return;
}

this.Assignment = this.Assignment.Union(partitions).ToArray();
this.Subscription = consumer.Subscription;
_flowManager.Stop();
_flowManager.Start(consumer);

_partitionsAssignedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
}

private void FirePartitionRevokedHandlers(IConsumer<byte[], byte[]> consumer, List<Confluent.Kafka.TopicPartitionOffset> partitions)
{
if (_stopTheWorldStrategy)
{
_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
this.Assignment = new List<TopicPartition>();
this.Subscription = new List<string>();
_currentPartitionsOffsets.Clear();
_flowManager.Stop();
return;
}

this.Assignment = this.Assignment.Except(partitions.Select(x => x.TopicPartition)).ToArray();
this.Subscription = consumer.Subscription;
foreach (var partition in partitions)
{
_currentPartitionsOffsets.TryRemove(partition.TopicPartition, out _);
}

_flowManager.Stop();
_flowManager.Start(consumer);
_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
}

private void InvalidateConsumer()
{
_consumer?.Close();
Expand Down
Loading