From 68163c554fe0d80188aa421e6f6b9485694ef99a Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 14 Oct 2024 12:34:32 -0600 Subject: [PATCH] kgo: do not add all topics to internal tps map when regex consuming The internal tps map is meant to be what we store topicPartitions in that we are candidates to be consumed. This is filtered in assignPartitions to only opt-in partitions that are actually being consumed. It's not BAD if we store all topics in that map, but it's not the intent. The rest of the client worked fine even with extra topics in the map. When regex consuming, the metadata function previously put all topics into the map always. Now, we move the regex evaluation logic -- duplicated in both the direct and group consumers -- into one function and use that for filtering within metadata. This introduces a required sequence of filtering THEN finding assignments, which is fine / was the way things operated anyway. Moving the filtering to metadata (only in the regex consuming logic) means that we no longer store information for topics we are not consuming. Indirectly, this fixes a bug where `GetConsumeTopics` would always return ALL topics when regex consuming, because `GetConsumeTopics` always just returned what was in the `tps` field. This adds a test for the fixed behavior, as well as tests that NOT regex consuming always returns all topics the user is interested in. Closes #810. --- pkg/kgo/consumer.go | 38 ++++++++++++++++++ pkg/kgo/consumer_direct.go | 20 +--------- pkg/kgo/consumer_direct_test.go | 69 +++++++++++++++++++++++++++++++++ pkg/kgo/consumer_group.go | 23 +---------- pkg/kgo/metadata.go | 8 ++++ 5 files changed, 118 insertions(+), 40 deletions(-) diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 8b2b2318..eb15ec00 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -1186,6 +1186,44 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how } } +// filterMetadataAllTopics, called BEFORE doOnMetadataUpdate, evaluates +// all topics received against the user provided regex. +func (c *consumer) filterMetadataAllTopics(topics []string) []string { + c.mu.Lock() + defer c.mu.Unlock() + + var rns reNews + defer rns.log(&c.cl.cfg) + + var reSeen map[string]bool + if c.d != nil { + reSeen = c.d.reSeen + } else { + reSeen = c.g.reSeen + } + + keep := topics[:0] + for _, topic := range topics { + want, seen := reSeen[topic] + if !seen { + for rawRe, re := range c.cl.cfg.topics { + if want = re.MatchString(topic); want { + rns.add(rawRe, topic) + break + } + } + if !want { + rns.skip(topic) + } + reSeen[topic] = want + } + if want { + keep = append(keep, topic) + } + } + return keep +} + func (c *consumer) doOnMetadataUpdate() { if !c.consuming() { return diff --git a/pkg/kgo/consumer_direct.go b/pkg/kgo/consumer_direct.go index bf42dbca..0dcbf989 100644 --- a/pkg/kgo/consumer_direct.go +++ b/pkg/kgo/consumer_direct.go @@ -65,29 +65,11 @@ func (*directConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffset func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset { topics := d.tps.load() - var rns reNews - if d.cfg.regex { - defer rns.log(d.cfg) - } - toUse := make(map[string]map[int32]Offset, 10) for topic, topicPartitions := range topics { var useTopic bool if d.cfg.regex { - want, seen := d.reSeen[topic] - if !seen { - for rawRe, re := range d.cfg.topics { - if want = re.MatchString(topic); want { - rns.add(rawRe, topic) - break - } - } - if !want { - rns.skip(topic) - } - d.reSeen[topic] = want - } - useTopic = want + useTopic = d.reSeen[topic] } else { useTopic = d.m.onlyt(topic) } diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go index dc12083d..736730e8 100644 --- a/pkg/kgo/consumer_direct_test.go +++ b/pkg/kgo/consumer_direct_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "reflect" "sort" "sync/atomic" "testing" @@ -536,3 +537,71 @@ func TestIssue648(t *testing.T) { t.Errorf("did not see ErrUnknownTopicOrPartition") } } + +func TestIssue810(t *testing.T) { + t.Parallel() + + t1, cleanup1 := tmpTopicPartitions(t, 1) + defer cleanup1() + + _, cleanup2 := tmpTopicPartitions(t, 1) + defer cleanup2() + + // Non-regex consuming: topics are available immediately. + { + cl, _ := newTestClient( + ConsumeTopics(t1), + UnknownTopicRetries(-1), + ) + defer cl.Close() + + topics := cl.GetConsumeTopics() + exp := []string{t1} + + if !reflect.DeepEqual(topics, exp) { + t.Errorf("non-regex got %v != exp %v", topics, exp) + } + } + + // Regex consuming: topics are available only after discovery. + { + cl, _ := newTestClient( + ConsumeTopics(t1), + ConsumeRegex(), + UnknownTopicRetries(-1), + MetadataMaxAge(time.Second), + MetadataMinAge(100*time.Millisecond), + ) + defer cl.Close() + + var ( + ticker = time.NewTicker(100 * time.Millisecond) + fail = time.NewTimer(15 * time.Second) + failed bool + lastSaw []string + exp = []string{t1} + ) + + defer ticker.Stop() + defer fail.Stop() + + out: + for { + select { + case <-ticker.C: + lastSaw = cl.GetConsumeTopics() + if reflect.DeepEqual(lastSaw, exp) { + break out + } + cl.ForceMetadataRefresh() + case <-fail.C: + failed = true + break out + } + } + + if failed { + t.Errorf("did not see expected topics in time, last saw %v != exp %v", lastSaw, exp) + } + } +} diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index c1946eb4..dee54bd7 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -27,8 +27,7 @@ type groupConsumer struct { cooperative atomicBool // true if the group balancer chosen during Join is cooperative // The data for topics that the user assigned. Metadata updates the - // atomic.Value in each pointer atomically. If we are consuming via - // regex, metadata grabs the lock to add new topics. + // atomic.Value in each pointer atomically. tps *topicsPartitions reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not @@ -1714,11 +1713,6 @@ func (g *groupConsumer) findNewAssignments() { delta int } - var rns reNews - if g.cfg.regex { - defer rns.log(&g.cl.cfg) - } - var numNewTopics int toChange := make(map[string]change, len(topics)) for topic, topicPartitions := range topics { @@ -1741,20 +1735,7 @@ func (g *groupConsumer) findNewAssignments() { // support adding new regex). useTopic := true if g.cfg.regex { - want, seen := g.reSeen[topic] - if !seen { - for rawRe, re := range g.cfg.topics { - if want = re.MatchString(topic); want { - rns.add(rawRe, topic) - break - } - } - if !want { - rns.skip(topic) - } - g.reSeen[topic] = want - } - useTopic = want + useTopic = g.reSeen[topic] } // We only track using the topic if there are partitions for diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index 33cac641..0552647d 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -350,6 +350,14 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) { for topic := range latest { allTopics = append(allTopics, topic) } + + // We filter out topics will not match any of our regex's. + // This ensures that the `tps` field does not contain topics + // we will never use (the client works with misc. topics in + // there, but it's better to avoid it -- and allows us to use + // `tps` in GetConsumeTopics). + allTopics = c.filterMetadataAllTopics(allTopics) + tpsConsumerLoad = tpsConsumer.ensureTopics(allTopics) defer tpsConsumer.storeData(tpsConsumerLoad)