From 1b6a721e340c593d69adee77a74e338043dc9aeb Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 3 Dec 2023 11:11:40 -0700 Subject: [PATCH] kgo source: use the proper topic-to-id map when forgetting topics Adding topics to a session needs to use the fetch request's topic2id map (which then promotes IDs into the session t2id map). Importantly, and previously this was wrong / not the case: removing topics from a session needs to use the session's t2id map. The topic does not exist in the request's topic2id map, because well, it's being forgotten. It's not in the fetch request. Adds some massive comments explaining the situation. Closes #620. --- pkg/kgo/source.go | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 25a3c2d3..49c3d042 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1741,6 +1741,26 @@ type fetchRequest struct { torder []string // order of topics to write porder map[string][]int32 // per topic, order of partitions to write + // topic2id and id2topic track bidirectional lookup of topics and IDs + // that are being added to *this* specific request. topic2id slightly + // duplicates the map t2id in the fetch session, but t2id is different + // in that t2id tracks IDs in use from all prior requests -- and, + // importantly, t2id is cleared of IDs that are no longer used (see + // ForgottenTopics). + // + // We need to have both a session t2id map and a request t2id map: + // + // * The session t2id is what we use when creating forgotten topics. + // If we are forgetting a topic, the ID is not in the req t2id. + // + // * The req topic2id is used for adding to the session t2id. When + // building a request, if the id is in req.topic2id but not + // session.t2id, we promote the ID into the session map. + // + // Lastly, id2topic is used when handling the response, as our reverse + // lookup from the ID back to the topic (and then we work with the + // topic name only). There is no equivalent in the session because + // there is no need for the id2topic lookup ever in the session. topic2id map[string][16]byte id2topic map[[16]byte]string @@ -2067,7 +2087,7 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte { if forgottenTopic == nil { t := kmsg.NewFetchRequestForgottenTopic() t.Topic = topic - t.TopicID = f.topic2id[topic] + t.TopicID = f.session.t2id[topic] req.ForgottenTopics = append(req.ForgottenTopics, t) forgottenTopic = &req.ForgottenTopics[len(req.ForgottenTopics)-1] } @@ -2079,7 +2099,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte { id := f.session.t2id[topic] delete(f.session.t2id, topic) // If we deleted a topic that was missing an ID, then we clear the - // previous disableIDs state and potentially reenable it. + // previous disableIDs state. We potentially *reenable* disableIDs + // if any remaining topics in our session are also missing their ID. var noID [16]byte if id == noID { f.session.disableIDs = false