diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 285becad..775a22e6 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -2401,7 +2401,7 @@ func (cl *Client) maybeDeleteMappedMetadata(unknownTopic bool, ts ...string) (sh // requests that are sharded and use metadata, and the one this benefits most // is ListOffsets. Likely, ListOffsets for the same topic will be issued back // to back, so not caching for so long is ok. -func (cl *Client) cachedMappedMetadata(ts ...string) (map[string]mappedMetadataTopic, []string) { +func (cl *Client) fetchCachedMappedMetadata(ts ...string) (map[string]mappedMetadataTopic, []string) { cl.mappedMetaMu.Lock() defer cl.mappedMetaMu.Unlock() if cl.mappedMeta == nil { @@ -2429,7 +2429,7 @@ func (cl *Client) fetchMappedMetadata(ctx context.Context, topics []string, useC var r map[string]mappedMetadataTopic needed := topics if useCache { - r, needed = cl.cachedMappedMetadata(topics...) + r, needed = cl.fetchCachedMappedMetadata(topics...) if len(needed) == 0 { return r, nil } @@ -2443,6 +2443,17 @@ func (cl *Client) fetchMappedMetadata(ctx context.Context, topics []string, useC return nil, err } + // Cache the mapped metadata, and also store each topic in the results. + cl.storeCachedMappedMetadata(meta, func(entry mappedMetadataTopic) { + r[*entry.t.Topic] = entry + }) + + return r, nil +} + +// storeCachedMappedMetadata caches the fetched metadata in the Client, and calls the onEachTopic callback +// function for each topic in the MetadataResponse. +func (cl *Client) storeCachedMappedMetadata(meta *kmsg.MetadataResponse, onEachTopic func(_ mappedMetadataTopic)) { cl.mappedMetaMu.Lock() defer cl.mappedMetaMu.Unlock() if cl.mappedMeta == nil { @@ -2461,10 +2472,13 @@ func (cl *Client) fetchMappedMetadata(ctx context.Context, topics []string, useC when: when, } cl.mappedMeta[*topic.Topic] = t - r[*topic.Topic] = t for _, partition := range topic.Partitions { t.ps[partition.Partition] = partition } + + if onEachTopic != nil { + onEachTopic(t) + } } if len(meta.Topics) != len(cl.mappedMeta) { for topic, mapped := range cl.mappedMeta { @@ -2476,7 +2490,6 @@ func (cl *Client) fetchMappedMetadata(ctx context.Context, topics []string, useC } } } - return r, nil } func unknownOrCode(exists bool, code int16) error { diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index b3bf6bf0..b29b5735 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -555,6 +555,11 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]* return nil, err } + // Since we've fetched the metadata for some topics we can optimistically cache it + // for mapped metadata too. This may reduce the number of Metadata requests issued + // by the client. + cl.storeCachedMappedMetadata(meta, nil) + topics := make(map[string]*metadataTopic, len(meta.Topics)) // Even if metadata returns a leader epoch, we do not use it unless we