diff --git a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs
index 4e8c00477..e710b6263 100644
--- a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs
+++ b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs
@@ -20,12 +20,14 @@ public static class ClusterConfigurationBuilderExtensions
/// The topic to be used by the admin commands
/// The consumer group prefix
/// The partition used to produce and consumer admin messages
+ /// To produce admin messages only
///
public static IClusterConfigurationBuilder EnableAdminMessages(
this IClusterConfigurationBuilder cluster,
string topic,
string consumerGroup = null,
- int topicPartition = 0)
+ int topicPartition = 0,
+ bool producerOnly = false)
{
consumerGroup ??= $"Admin-{Assembly.GetEntryAssembly()!.GetName().Name}";
@@ -33,45 +35,54 @@ public static IClusterConfigurationBuilder EnableAdminMessages(
.AddSingleton(
resolver => new AdminProducer(
resolver.Resolve>(),
- topicPartition))
- .AddSingleton();
+ topicPartition));
- return cluster
+ cluster
.AddProducer(
producer => producer
.DefaultTopic(topic)
.AddMiddlewares(
middlewares => middlewares
- .AddSerializer()))
- .AddConsumer(
- consumer => consumer
- .ManualAssignPartitions(topic, new[] { topicPartition })
- .WithGroupId(consumerGroup)
- .WithoutStoringOffsets()
- .WithWorkersCount(1)
- .WithBufferSize(1)
- .WithAutoOffsetReset(AutoOffsetReset.Latest)
- .DisableManagement()
- .AddMiddlewares(
- middlewares => middlewares
- .AddSerializer()
- .AddTypedHandlers(
- handlers => handlers
- .WithHandlerLifetime(InstanceLifetime.Singleton)
- .AddHandlers(
- new[]
- {
- typeof(ChangeConsumerWorkersCountHandler),
- typeof(PauseConsumerByNameHandler),
- typeof(PauseConsumersByGroupHandler),
- typeof(ResetConsumerOffsetHandler),
- typeof(RestartConsumerByNameHandler),
- typeof(ResumeConsumerByNameHandler),
- typeof(ResumeConsumersByGroupHandler),
- typeof(RewindConsumerOffsetToDateTimeHandler),
- typeof(StartConsumerByNameHandler),
- typeof(StopConsumerByNameHandler),
- }))));
+ .AddSerializer()));
+
+ if (!producerOnly)
+ {
+ cluster.DependencyConfigurator
+ .AddSingleton();
+
+ cluster
+ .AddConsumer(
+ consumer => consumer
+ .ManualAssignPartitions(topic, new[] { topicPartition })
+ .WithGroupId(consumerGroup)
+ .WithoutStoringOffsets()
+ .WithWorkersCount(1)
+ .WithBufferSize(1)
+ .WithAutoOffsetReset(AutoOffsetReset.Latest)
+ .DisableManagement()
+ .AddMiddlewares(
+ middlewares => middlewares
+ .AddSerializer()
+ .AddTypedHandlers(
+ handlers => handlers
+ .WithHandlerLifetime(InstanceLifetime.Singleton)
+ .AddHandlers(
+ new[]
+ {
+ typeof(ChangeConsumerWorkersCountHandler),
+ typeof(PauseConsumerByNameHandler),
+ typeof(PauseConsumersByGroupHandler),
+ typeof(ResetConsumerOffsetHandler),
+ typeof(RestartConsumerByNameHandler),
+ typeof(ResumeConsumerByNameHandler),
+ typeof(ResumeConsumersByGroupHandler),
+ typeof(RewindConsumerOffsetToDateTimeHandler),
+ typeof(StartConsumerByNameHandler),
+ typeof(StopConsumerByNameHandler),
+ }))));
+ }
+
+ return cluster;
}
///