Skip to content

Commit

Permalink
Merge pull request #838 from twmb/810
Browse files Browse the repository at this point in the history
kgo: do not add all topics to internal tps map when regex consuming
  • Loading branch information
twmb authored Oct 15, 2024
2 parents 04356d7 + 68163c5 commit d771ddf
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 40 deletions.
38 changes: 38 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,44 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
}
}

// filterMetadataAllTopics, called BEFORE doOnMetadataUpdate, evaluates
// all topics received against the user provided regex.
func (c *consumer) filterMetadataAllTopics(topics []string) []string {
c.mu.Lock()
defer c.mu.Unlock()

var rns reNews
defer rns.log(&c.cl.cfg)

var reSeen map[string]bool
if c.d != nil {
reSeen = c.d.reSeen
} else {
reSeen = c.g.reSeen
}

keep := topics[:0]
for _, topic := range topics {
want, seen := reSeen[topic]
if !seen {
for rawRe, re := range c.cl.cfg.topics {
if want = re.MatchString(topic); want {
rns.add(rawRe, topic)
break
}
}
if !want {
rns.skip(topic)
}
reSeen[topic] = want
}
if want {
keep = append(keep, topic)
}
}
return keep
}

func (c *consumer) doOnMetadataUpdate() {
if !c.consuming() {
return
Expand Down
20 changes: 1 addition & 19 deletions pkg/kgo/consumer_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,11 @@ func (*directConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffset
func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {
topics := d.tps.load()

var rns reNews
if d.cfg.regex {
defer rns.log(d.cfg)
}

toUse := make(map[string]map[int32]Offset, 10)
for topic, topicPartitions := range topics {
var useTopic bool
if d.cfg.regex {
want, seen := d.reSeen[topic]
if !seen {
for rawRe, re := range d.cfg.topics {
if want = re.MatchString(topic); want {
rns.add(rawRe, topic)
break
}
}
if !want {
rns.skip(topic)
}
d.reSeen[topic] = want
}
useTopic = want
useTopic = d.reSeen[topic]
} else {
useTopic = d.m.onlyt(topic)
}
Expand Down
69 changes: 69 additions & 0 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -536,3 +537,71 @@ func TestIssue648(t *testing.T) {
t.Errorf("did not see ErrUnknownTopicOrPartition")
}
}

func TestIssue810(t *testing.T) {
t.Parallel()

t1, cleanup1 := tmpTopicPartitions(t, 1)
defer cleanup1()

_, cleanup2 := tmpTopicPartitions(t, 1)
defer cleanup2()

// Non-regex consuming: topics are available immediately.
{
cl, _ := newTestClient(
ConsumeTopics(t1),
UnknownTopicRetries(-1),
)
defer cl.Close()

topics := cl.GetConsumeTopics()
exp := []string{t1}

if !reflect.DeepEqual(topics, exp) {
t.Errorf("non-regex got %v != exp %v", topics, exp)
}
}

// Regex consuming: topics are available only after discovery.
{
cl, _ := newTestClient(
ConsumeTopics(t1),
ConsumeRegex(),
UnknownTopicRetries(-1),
MetadataMaxAge(time.Second),
MetadataMinAge(100*time.Millisecond),
)
defer cl.Close()

var (
ticker = time.NewTicker(100 * time.Millisecond)
fail = time.NewTimer(15 * time.Second)
failed bool
lastSaw []string
exp = []string{t1}
)

defer ticker.Stop()
defer fail.Stop()

out:
for {
select {
case <-ticker.C:
lastSaw = cl.GetConsumeTopics()
if reflect.DeepEqual(lastSaw, exp) {
break out
}
cl.ForceMetadataRefresh()
case <-fail.C:
failed = true
break out
}
}

if failed {
t.Errorf("did not see expected topics in time, last saw %v != exp %v", lastSaw, exp)
}
}
}
23 changes: 2 additions & 21 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ type groupConsumer struct {
cooperative atomicBool // true if the group balancer chosen during Join is cooperative

// The data for topics that the user assigned. Metadata updates the
// atomic.Value in each pointer atomically. If we are consuming via
// regex, metadata grabs the lock to add new topics.
// atomic.Value in each pointer atomically.
tps *topicsPartitions

reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not
Expand Down Expand Up @@ -1714,11 +1713,6 @@ func (g *groupConsumer) findNewAssignments() {
delta int
}

var rns reNews
if g.cfg.regex {
defer rns.log(&g.cl.cfg)
}

var numNewTopics int
toChange := make(map[string]change, len(topics))
for topic, topicPartitions := range topics {
Expand All @@ -1741,20 +1735,7 @@ func (g *groupConsumer) findNewAssignments() {
// support adding new regex).
useTopic := true
if g.cfg.regex {
want, seen := g.reSeen[topic]
if !seen {
for rawRe, re := range g.cfg.topics {
if want = re.MatchString(topic); want {
rns.add(rawRe, topic)
break
}
}
if !want {
rns.skip(topic)
}
g.reSeen[topic] = want
}
useTopic = want
useTopic = g.reSeen[topic]
}

// We only track using the topic if there are partitions for
Expand Down
8 changes: 8 additions & 0 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,14 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
for topic := range latest {
allTopics = append(allTopics, topic)
}

// We filter out topics will not match any of our regex's.
// This ensures that the `tps` field does not contain topics
// we will never use (the client works with misc. topics in
// there, but it's better to avoid it -- and allows us to use
// `tps` in GetConsumeTopics).
allTopics = c.filterMetadataAllTopics(allTopics)

tpsConsumerLoad = tpsConsumer.ensureTopics(allTopics)
defer tpsConsumer.storeData(tpsConsumerLoad)

Expand Down

0 comments on commit d771ddf

Please sign in to comment.