diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 322e9f10..2dbd33e9 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -281,7 +281,7 @@ func (cl *Client) OptValues(opt any) []any { return []any{cfg.hooks} case namefn(ConcurrentTransactionsBackoff): return []any{cfg.txnBackoff} - case namefn(considerMissingTopicDeletedAfter): + case namefn(ConsiderMissingTopicDeletedAfter): return []any{cfg.missingTopicDelete} case namefn(DefaultProduceTopic): diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 1281628f..8c4f6807 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -833,10 +833,14 @@ func ConcurrentTransactionsBackoff(backoff time.Duration) Opt { return clientOpt{func(cfg *cfg) { cfg.txnBackoff = backoff }} } -// considerMissingTopicDeletedAfter sets the amount of time a topic can be +// ConsiderMissingTopicDeletedAfter sets the amount of time a topic can be // missing from metadata responses _after_ loading it at least once before it -// is considered deleted. -func considerMissingTopicDeletedAfter(t time.Duration) Opt { +// is considered deleted, overriding the default of 15s. Note that for newer +// versions of Kafka, it may take a bit of time (~15s) for the cluster to fully +// recognize a newly created topic. If this option is set too low, there is +// some risk that the client will internally purge and re-see a topic a few +// times until the cluster fully broadcasts the topic creation. +func ConsiderMissingTopicDeletedAfter(t time.Duration) Opt { return clientOpt{func(cfg *cfg) { cfg.missingTopicDelete = t }} }