Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimistically cache mapped metadata when cluster metadata is periodically refreshed #725

Merged
merged 1 commit into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2401,7 +2401,7 @@ func (cl *Client) maybeDeleteMappedMetadata(unknownTopic bool, ts ...string) (sh
// requests that are sharded and use metadata, and the one this benefits most
// is ListOffsets. Likely, ListOffsets for the same topic will be issued back
// to back, so not caching for so long is ok.
func (cl *Client) cachedMappedMetadata(ts ...string) (map[string]mappedMetadataTopic, []string) {
func (cl *Client) fetchCachedMappedMetadata(ts ...string) (map[string]mappedMetadataTopic, []string) {
cl.mappedMetaMu.Lock()
defer cl.mappedMetaMu.Unlock()
if cl.mappedMeta == nil {
Expand Down Expand Up @@ -2429,7 +2429,7 @@ func (cl *Client) fetchMappedMetadata(ctx context.Context, topics []string, useC
var r map[string]mappedMetadataTopic
needed := topics
if useCache {
r, needed = cl.cachedMappedMetadata(topics...)
r, needed = cl.fetchCachedMappedMetadata(topics...)
if len(needed) == 0 {
return r, nil
}
Expand All @@ -2443,6 +2443,17 @@ func (cl *Client) fetchMappedMetadata(ctx context.Context, topics []string, useC
return nil, err
}

// Cache the mapped metadata, and also store each topic in the results.
cl.storeCachedMappedMetadata(meta, func(entry mappedMetadataTopic) {
r[*entry.t.Topic] = entry
})

return r, nil
}

// storeCachedMappedMetadata caches the fetched metadata in the Client, and calls the onEachTopic callback
// function for each topic in the MetadataResponse.
func (cl *Client) storeCachedMappedMetadata(meta *kmsg.MetadataResponse, onEachTopic func(_ mappedMetadataTopic)) {
cl.mappedMetaMu.Lock()
defer cl.mappedMetaMu.Unlock()
if cl.mappedMeta == nil {
Expand All @@ -2461,10 +2472,13 @@ func (cl *Client) fetchMappedMetadata(ctx context.Context, topics []string, useC
when: when,
}
cl.mappedMeta[*topic.Topic] = t
r[*topic.Topic] = t
for _, partition := range topic.Partitions {
t.ps[partition.Partition] = partition
}

if onEachTopic != nil {
onEachTopic(t)
}
}
if len(meta.Topics) != len(cl.mappedMeta) {
for topic, mapped := range cl.mappedMeta {
Expand All @@ -2476,7 +2490,6 @@ func (cl *Client) fetchMappedMetadata(ctx context.Context, topics []string, useC
}
}
}
return r, nil
}

func unknownOrCode(exists bool, code int16) error {
Expand Down
5 changes: 5 additions & 0 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,11 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*
return nil, err
}

// Since we've fetched the metadata for some topics we can optimistically cache it
// for mapped metadata too. This may reduce the number of Metadata requests issued
// by the client.
cl.storeCachedMappedMetadata(meta, nil)

topics := make(map[string]*metadataTopic, len(meta.Topics))

// Even if metadata returns a leader epoch, we do not use it unless we
Expand Down
Loading