diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 322e9f10..898f1105 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -560,7 +560,9 @@ func (cl *Client) Ping(ctx context.Context) error { } // PurgeTopicsFromClient internally removes all internal information about the -// input topics. +// input topics. If you you want to purge information for only consuming or +// only producing, see the related functions [PurgeTopicsFromConsuming] and +// [PurgeTopicsFromProducing]. // // For producing, this clears all knowledge that these topics have ever been // produced to. Producing to the topic again may result in out of order @@ -610,6 +612,34 @@ func (cl *Client) PurgeTopicsFromClient(topics ...string) { cl.mappedMetaMu.Unlock() } +// PurgeTopicsFromProducing internally removes all internal information for +// producing about the input topics. This runs the producer bit of logic that +// is documented in [PurgeTopicsFromClient]; see that function for more +// details. +func (cl *Client) PurgeTopicsFromProducing(topics ...string) { + if len(topics) == 0 { + return + } + sort.Strings(topics) + cl.blockingMetadataFn(func() { + cl.producer.purgeTopics(topics) + }) +} + +// PurgeTopicsFromConsuming internally removes all internal information for +// consuming about the input topics. This runs the consumer bit of logic that +// is documented in [PurgeTopicsFromClient]; see that function for more +// details. +func (cl *Client) PurgeTopicsFromConsuming(topics ...string) { + if len(topics) == 0 { + return + } + sort.Strings(topics) + cl.blockingMetadataFn(func() { + cl.consumer.purgeTopics(topics) + }) +} + // Parse broker IP/host and port from a string, using the default Kafka port if // unspecified. Supported address formats: //