diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 25a3c2d3..49c3d042 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1741,6 +1741,26 @@ type fetchRequest struct { torder []string // order of topics to write porder map[string][]int32 // per topic, order of partitions to write + // topic2id and id2topic track bidirectional lookup of topics and IDs + // that are being added to *this* specific request. topic2id slightly + // duplicates the map t2id in the fetch session, but t2id is different + // in that t2id tracks IDs in use from all prior requests -- and, + // importantly, t2id is cleared of IDs that are no longer used (see + // ForgottenTopics). + // + // We need to have both a session t2id map and a request t2id map: + // + // * The session t2id is what we use when creating forgotten topics. + // If we are forgetting a topic, the ID is not in the req t2id. + // + // * The req topic2id is used for adding to the session t2id. When + // building a request, if the id is in req.topic2id but not + // session.t2id, we promote the ID into the session map. + // + // Lastly, id2topic is used when handling the response, as our reverse + // lookup from the ID back to the topic (and then we work with the + // topic name only). There is no equivalent in the session because + // there is no need for the id2topic lookup ever in the session. topic2id map[string][16]byte id2topic map[[16]byte]string @@ -2067,7 +2087,7 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte { if forgottenTopic == nil { t := kmsg.NewFetchRequestForgottenTopic() t.Topic = topic - t.TopicID = f.topic2id[topic] + t.TopicID = f.session.t2id[topic] req.ForgottenTopics = append(req.ForgottenTopics, t) forgottenTopic = &req.ForgottenTopics[len(req.ForgottenTopics)-1] } @@ -2079,7 +2099,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte { id := f.session.t2id[topic] delete(f.session.t2id, topic) // If we deleted a topic that was missing an ID, then we clear the - // previous disableIDs state and potentially reenable it. + // previous disableIDs state. We potentially *reenable* disableIDs + // if any remaining topics in our session are also missing their ID. var noID [16]byte if id == noID { f.session.disableIDs = false