diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 8b2b2318..eb15ec00 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -1186,6 +1186,44 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how } } +// filterMetadataAllTopics, called BEFORE doOnMetadataUpdate, evaluates +// all topics received against the user provided regex. +func (c *consumer) filterMetadataAllTopics(topics []string) []string { + c.mu.Lock() + defer c.mu.Unlock() + + var rns reNews + defer rns.log(&c.cl.cfg) + + var reSeen map[string]bool + if c.d != nil { + reSeen = c.d.reSeen + } else { + reSeen = c.g.reSeen + } + + keep := topics[:0] + for _, topic := range topics { + want, seen := reSeen[topic] + if !seen { + for rawRe, re := range c.cl.cfg.topics { + if want = re.MatchString(topic); want { + rns.add(rawRe, topic) + break + } + } + if !want { + rns.skip(topic) + } + reSeen[topic] = want + } + if want { + keep = append(keep, topic) + } + } + return keep +} + func (c *consumer) doOnMetadataUpdate() { if !c.consuming() { return diff --git a/pkg/kgo/consumer_direct.go b/pkg/kgo/consumer_direct.go index bf42dbca..419089bb 100644 --- a/pkg/kgo/consumer_direct.go +++ b/pkg/kgo/consumer_direct.go @@ -65,29 +65,11 @@ func (*directConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffset func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset { topics := d.tps.load() - var rns reNews - if d.cfg.regex { - defer rns.log(d.cfg) - } - toUse := make(map[string]map[int32]Offset, 10) for topic, topicPartitions := range topics { var useTopic bool if d.cfg.regex { - want, seen := d.reSeen[topic] - if !seen { - for rawRe, re := range d.cfg.topics { - if want = re.MatchString(topic); want { - rns.add(rawRe, topic) - break - } - } - if !want { - rns.skip(topic) - } - d.reSeen[topic] = want - } - useTopic = want + useTopic, _ = d.reSeen[topic] } else { useTopic = d.m.onlyt(topic) } diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go index dc12083d..1d53932f 100644 --- a/pkg/kgo/consumer_direct_test.go +++ b/pkg/kgo/consumer_direct_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "reflect" "sort" "sync/atomic" "testing" @@ -536,3 +537,68 @@ func TestIssue648(t *testing.T) { t.Errorf("did not see ErrUnknownTopicOrPartition") } } + +func TestIssue810(t *testing.T) { + t.Parallel() + + t1, cleanup1 := tmpTopicPartitions(t, 1) + defer cleanup1() + + _, cleanup2 := tmpTopicPartitions(t, 1) + defer cleanup2() + + // Non-regex consuming: topics are available immediately. + { + cl, _ := newTestClient( + ConsumeTopics(t1), + UnknownTopicRetries(-1), + ) + defer cl.Close() + + topics := cl.GetConsumeTopics() + exp := []string{t1} + + if !reflect.DeepEqual(topics, exp) { + t.Errorf("non-regex got %v != exp %v", topics, exp) + } + } + + // Regex consuming: topics are available only after discovery. + { + cl, _ := newTestClient( + ConsumeTopics(t1), + ConsumeRegex(), + UnknownTopicRetries(-1), + ) + defer cl.Close() + + var ( + ticker = time.NewTicker(100 * time.Millisecond) + fail = time.NewTimer(15 * time.Second) + failed bool + lastSaw []string + exp = []string{t1} + ) + + defer ticker.Stop() + defer fail.Stop() + + out: + for { + select { + case <-ticker.C: + lastSaw = cl.GetConsumeTopics() + if reflect.DeepEqual(lastSaw, exp) { + break out + } + case <-fail.C: + failed = true + break out + } + } + + if failed { + t.Errorf("did not see expected topics in time, last saw %v != exp %v", lastSaw, exp) + } + } +} diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index c1946eb4..50482fad 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -27,8 +27,7 @@ type groupConsumer struct { cooperative atomicBool // true if the group balancer chosen during Join is cooperative // The data for topics that the user assigned. Metadata updates the - // atomic.Value in each pointer atomically. If we are consuming via - // regex, metadata grabs the lock to add new topics. + // atomic.Value in each pointer atomically. tps *topicsPartitions reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not @@ -1714,11 +1713,6 @@ func (g *groupConsumer) findNewAssignments() { delta int } - var rns reNews - if g.cfg.regex { - defer rns.log(&g.cl.cfg) - } - var numNewTopics int toChange := make(map[string]change, len(topics)) for topic, topicPartitions := range topics { @@ -1741,20 +1735,7 @@ func (g *groupConsumer) findNewAssignments() { // support adding new regex). useTopic := true if g.cfg.regex { - want, seen := g.reSeen[topic] - if !seen { - for rawRe, re := range g.cfg.topics { - if want = re.MatchString(topic); want { - rns.add(rawRe, topic) - break - } - } - if !want { - rns.skip(topic) - } - g.reSeen[topic] = want - } - useTopic = want + useTopic, _ = g.reSeen[topic] } // We only track using the topic if there are partitions for diff --git a/pkg/kgo/metadata.go b/pkg/kgo/metadata.go index 33cac641..0552647d 100644 --- a/pkg/kgo/metadata.go +++ b/pkg/kgo/metadata.go @@ -350,6 +350,14 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) { for topic := range latest { allTopics = append(allTopics, topic) } + + // We filter out topics will not match any of our regex's. + // This ensures that the `tps` field does not contain topics + // we will never use (the client works with misc. topics in + // there, but it's better to avoid it -- and allows us to use + // `tps` in GetConsumeTopics). + allTopics = c.filterMetadataAllTopics(allTopics) + tpsConsumerLoad = tpsConsumer.ensureTopics(allTopics) defer tpsConsumer.storeData(tpsConsumerLoad)