Skip to content

Commit

Permalink
[InsideOut]: added logger to KafkaConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfabian committed Jan 27, 2024
1 parent bd4996e commit 35419a4
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ namespace Blazor.Sample.Kafka.Consumers;

public class SensorsStreamConsumer : KafkaConsumer<string, SensorsStream>
{
public SensorsStreamConsumer(ConsumerConfig consumerConfig)
: base(TopicNames.SensorsStream, consumerConfig)
public SensorsStreamConsumer(ConsumerConfig consumerConfig, ILogger<SensorsStreamConsumer> logger)
: base(TopicNames.SensorsStream, consumerConfig, logger)
{
consumerConfig.Debug += ",consumer";
}
Expand All @@ -17,8 +17,7 @@ protected override void InterceptConsumerBuilder(ConsumerBuilder<string, Sensors
consumerBuilder
.SetPartitionsRevokedHandler((c, partitions) =>
{
var remaining = c.Assignment.Where(tp =>
partitions.Where(x => x.TopicPartition == tp).Count() == 0);
var remaining = c.Assignment.Where(tp => partitions.All(x => x.TopicPartition != tp));
var message =
"** MapWords consumer group partitions revoked: [" +
Expand Down
3 changes: 1 addition & 2 deletions Samples/InsideOut/Consumer/IKafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ public interface IKafkaConsumer : IDisposable

public interface IKafkaConsumer<TKey, TValue> : IKafkaConsumer
{
IAsyncEnumerable<ConsumeResult<TKey, TValue>> ConnectToTopicAsync(TimeSpan? timeout, CancellationToken cancellationToken = default);
IEnumerable<ConsumeResult<TKey, TValue>> ConnectToTopic(CancellationToken cancellationToken = default);
IEnumerable<ConsumeResult<TKey, TValue>> ConnectToTopic(TimeSpan timeout);

IAsyncEnumerable<ConsumeResult<TKey, TValue>> ConnectToTopicAsync(TimeSpan? timeout, CancellationToken cancellationToken = default);
}
78 changes: 43 additions & 35 deletions Samples/InsideOut/Consumer/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading.Channels;
using Confluent.Kafka;
using InsideOut.Serdes;
using Microsoft.Extensions.Logging;

namespace InsideOut.Consumer;

Expand All @@ -10,12 +11,13 @@ public class KafkaConsumer<TKey, TValue> : IKafkaConsumer<TKey, TValue>
#region Fields

private readonly ConsumerConfig consumerConfig;
private readonly ILogger logger;

#endregion

#region Constructors

public KafkaConsumer(string topicName, ConsumerConfig consumerConfig)
public KafkaConsumer(string topicName, ConsumerConfig consumerConfig, ILogger logger = null)
{
if(topicName == null)
throw new ArgumentNullException(nameof(topicName));
Expand All @@ -26,6 +28,12 @@ public KafkaConsumer(string topicName, ConsumerConfig consumerConfig)
TopicName = topicName;

this.consumerConfig = consumerConfig ?? throw new ArgumentNullException(nameof(consumerConfig));
this.logger = logger;
}

public KafkaConsumer(string topicName, ConsumerConfig consumerConfig)
: this(topicName, consumerConfig, null)
{
}

#endregion
Expand Down Expand Up @@ -85,39 +93,6 @@ public IEnumerable<ConsumeResult<TKey, TValue>> ConnectToTopic(TimeSpan timeout)
return ConnectToTopic(timeout, cancellationTokenSource.Token);
}

private IEnumerable<ConsumeResult<TKey, TValue>> ConnectToTopic(TimeSpan? timeout, CancellationToken cancellationToken = default)
{
if (disposed)
throw new ObjectDisposedException("Cannot access a disposed object.");

using (consumer = CreateConsumer())
{
try
{
OnConnectToTopic();

while (!cancellationToken.IsCancellationRequested && !disposed)
{
ConsumeResult<TKey, TValue> consumeResult;

if (timeout.HasValue)
consumeResult = consumer.Consume(timeout.Value);
else
consumeResult = consumer.Consume(cancellationToken);

yield return consumeResult;

if(consumeResult != null)
LastConsumedOffset = consumeResult.Offset;
}
}
finally
{
Dispose();
}
}
}

public async IAsyncEnumerable<ConsumeResult<TKey, TValue>> ConnectToTopicAsync(TimeSpan? timeout, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var channel = Channel.CreateUnbounded<ConsumeResult<TKey, TValue>>();
Expand Down Expand Up @@ -149,7 +124,7 @@ public async IAsyncEnumerable<ConsumeResult<TKey, TValue>> ConnectToTopicAsync(T
}
catch (OperationCanceledException e)
{
Console.WriteLine(e.Message);
logger?.LogError(e.Message);
}
finally
{
Expand All @@ -173,6 +148,39 @@ public async IAsyncEnumerable<ConsumeResult<TKey, TValue>> ConnectToTopicAsync(T
}
}

private IEnumerable<ConsumeResult<TKey, TValue>> ConnectToTopic(TimeSpan? timeout, CancellationToken cancellationToken = default)
{
if (disposed)
throw new ObjectDisposedException("Cannot access a disposed object.");

using (consumer = CreateConsumer())
{
try
{
OnConnectToTopic();

while (!cancellationToken.IsCancellationRequested && !disposed)
{
ConsumeResult<TKey, TValue> consumeResult;

if (timeout.HasValue)
consumeResult = consumer.Consume(timeout.Value);
else
consumeResult = consumer.Consume(cancellationToken);

yield return consumeResult;

if(consumeResult != null)
LastConsumedOffset = consumeResult.Offset;
}
}
finally
{
Dispose();
}
}
}

private void OnConnectToTopic()
{
consumer.Subscribe(TopicName);
Expand Down
1 change: 1 addition & 0 deletions Samples/InsideOut/InsideOut.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.3.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageReference Include="System.Reactive" Version="6.0.0" />
<PackageReference Include="System.Text.Json" Version="8.0.0" />
</ItemGroup>
Expand Down

0 comments on commit 35419a4

Please sign in to comment.