From 253e1a93f3306a6acebe3f4b0f2787d63587f5f7 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 16 Sep 2023 16:13:38 +0100 Subject: [PATCH] kgo: add PurgeTopicsFrom{Producing,Consuming} For when you are using one client for both producing and consuming to the same topic. Closes #543. --- pkg/kgo/client.go | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 322e9f10..898f1105 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -560,7 +560,9 @@ func (cl *Client) Ping(ctx context.Context) error { } // PurgeTopicsFromClient internally removes all internal information about the -// input topics. +// input topics. If you you want to purge information for only consuming or +// only producing, see the related functions [PurgeTopicsFromConsuming] and +// [PurgeTopicsFromProducing]. // // For producing, this clears all knowledge that these topics have ever been // produced to. Producing to the topic again may result in out of order @@ -610,6 +612,34 @@ func (cl *Client) PurgeTopicsFromClient(topics ...string) { cl.mappedMetaMu.Unlock() } +// PurgeTopicsFromProducing internally removes all internal information for +// producing about the input topics. This runs the producer bit of logic that +// is documented in [PurgeTopicsFromClient]; see that function for more +// details. +func (cl *Client) PurgeTopicsFromProducing(topics ...string) { + if len(topics) == 0 { + return + } + sort.Strings(topics) + cl.blockingMetadataFn(func() { + cl.producer.purgeTopics(topics) + }) +} + +// PurgeTopicsFromConsuming internally removes all internal information for +// consuming about the input topics. This runs the consumer bit of logic that +// is documented in [PurgeTopicsFromClient]; see that function for more +// details. +func (cl *Client) PurgeTopicsFromConsuming(topics ...string) { + if len(topics) == 0 { + return + } + sort.Strings(topics) + cl.blockingMetadataFn(func() { + cl.consumer.purgeTopics(topics) + }) +} + // Parse broker IP/host and port from a string, using the default Kafka port if // unspecified. Supported address formats: //