diff --git a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs index 4e8c00477..e710b6263 100644 --- a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs @@ -20,12 +20,14 @@ public static class ClusterConfigurationBuilderExtensions /// The topic to be used by the admin commands /// The consumer group prefix /// The partition used to produce and consumer admin messages + /// To produce admin messages only /// public static IClusterConfigurationBuilder EnableAdminMessages( this IClusterConfigurationBuilder cluster, string topic, string consumerGroup = null, - int topicPartition = 0) + int topicPartition = 0, + bool producerOnly = false) { consumerGroup ??= $"Admin-{Assembly.GetEntryAssembly()!.GetName().Name}"; @@ -33,45 +35,54 @@ public static IClusterConfigurationBuilder EnableAdminMessages( .AddSingleton( resolver => new AdminProducer( resolver.Resolve>(), - topicPartition)) - .AddSingleton(); + topicPartition)); - return cluster + cluster .AddProducer( producer => producer .DefaultTopic(topic) .AddMiddlewares( middlewares => middlewares - .AddSerializer())) - .AddConsumer( - consumer => consumer - .ManualAssignPartitions(topic, new[] { topicPartition }) - .WithGroupId(consumerGroup) - .WithoutStoringOffsets() - .WithWorkersCount(1) - .WithBufferSize(1) - .WithAutoOffsetReset(AutoOffsetReset.Latest) - .DisableManagement() - .AddMiddlewares( - middlewares => middlewares - .AddSerializer() - .AddTypedHandlers( - handlers => handlers - .WithHandlerLifetime(InstanceLifetime.Singleton) - .AddHandlers( - new[] - { - typeof(ChangeConsumerWorkersCountHandler), - typeof(PauseConsumerByNameHandler), - typeof(PauseConsumersByGroupHandler), - typeof(ResetConsumerOffsetHandler), - typeof(RestartConsumerByNameHandler), - typeof(ResumeConsumerByNameHandler), - typeof(ResumeConsumersByGroupHandler), - typeof(RewindConsumerOffsetToDateTimeHandler), - typeof(StartConsumerByNameHandler), - typeof(StopConsumerByNameHandler), - })))); + .AddSerializer())); + + if (!producerOnly) + { + cluster.DependencyConfigurator + .AddSingleton(); + + cluster + .AddConsumer( + consumer => consumer + .ManualAssignPartitions(topic, new[] { topicPartition }) + .WithGroupId(consumerGroup) + .WithoutStoringOffsets() + .WithWorkersCount(1) + .WithBufferSize(1) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .DisableManagement() + .AddMiddlewares( + middlewares => middlewares + .AddSerializer() + .AddTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Singleton) + .AddHandlers( + new[] + { + typeof(ChangeConsumerWorkersCountHandler), + typeof(PauseConsumerByNameHandler), + typeof(PauseConsumersByGroupHandler), + typeof(ResetConsumerOffsetHandler), + typeof(RestartConsumerByNameHandler), + typeof(ResumeConsumerByNameHandler), + typeof(ResumeConsumersByGroupHandler), + typeof(RewindConsumerOffsetToDateTimeHandler), + typeof(StartConsumerByNameHandler), + typeof(StopConsumerByNameHandler), + })))); + } + + return cluster; } ///