Skip to content

Commit

Permalink
Merge pull request #627 from twmb/620
Browse files Browse the repository at this point in the history
kgo source: use the proper topic-to-id map when forgetting topics
  • Loading branch information
twmb authored Dec 6, 2023
2 parents a6d10d4 + 1b6a721 commit 5076659
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,26 @@ type fetchRequest struct {
torder []string // order of topics to write
porder map[string][]int32 // per topic, order of partitions to write

// topic2id and id2topic track bidirectional lookup of topics and IDs
// that are being added to *this* specific request. topic2id slightly
// duplicates the map t2id in the fetch session, but t2id is different
// in that t2id tracks IDs in use from all prior requests -- and,
// importantly, t2id is cleared of IDs that are no longer used (see
// ForgottenTopics).
//
// We need to have both a session t2id map and a request t2id map:
//
// * The session t2id is what we use when creating forgotten topics.
// If we are forgetting a topic, the ID is not in the req t2id.
//
// * The req topic2id is used for adding to the session t2id. When
// building a request, if the id is in req.topic2id but not
// session.t2id, we promote the ID into the session map.
//
// Lastly, id2topic is used when handling the response, as our reverse
// lookup from the ID back to the topic (and then we work with the
// topic name only). There is no equivalent in the session because
// there is no need for the id2topic lookup ever in the session.
topic2id map[string][16]byte
id2topic map[[16]byte]string

Expand Down Expand Up @@ -2067,7 +2087,7 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
if forgottenTopic == nil {
t := kmsg.NewFetchRequestForgottenTopic()
t.Topic = topic
t.TopicID = f.topic2id[topic]
t.TopicID = f.session.t2id[topic]
req.ForgottenTopics = append(req.ForgottenTopics, t)
forgottenTopic = &req.ForgottenTopics[len(req.ForgottenTopics)-1]
}
Expand All @@ -2079,7 +2099,8 @@ func (f *fetchRequest) AppendTo(dst []byte) []byte {
id := f.session.t2id[topic]
delete(f.session.t2id, topic)
// If we deleted a topic that was missing an ID, then we clear the
// previous disableIDs state and potentially reenable it.
// previous disableIDs state. We potentially *reenable* disableIDs
// if any remaining topics in our session are also missing their ID.
var noID [16]byte
if id == noID {
f.session.disableIDs = false
Expand Down

0 comments on commit 5076659

Please sign in to comment.