Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kgo: do not add all topics to internal tps map when regex consuming #838

Merged
merged 1 commit into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,44 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
}
}

// filterMetadataAllTopics narrows the full list of topics returned by a
// metadata request down to those matching at least one of the user's
// configured regular expressions. It is called BEFORE doOnMetadataUpdate.
//
// Each topic is evaluated against the regexes at most once: the verdict is
// cached in the reSeen map of whichever consumer is active (c.d when it is
// non-nil, otherwise c.g), and later calls reuse the cached answer.
//
// The returned slice shares its backing array with topics; the input is
// filtered in place and must not be relied upon by the caller afterwards.
func (c *consumer) filterMetadataAllTopics(topics []string) []string {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Newly matched / newly skipped topics are collected here and logged
	// once when the function returns.
	var rns reNews
	defer rns.log(&c.cl.cfg)

	// Pick the verdict cache belonging to the consumer in use.
	var verdicts map[string]bool
	switch {
	case c.d != nil:
		verdicts = c.d.reSeen
	default:
		verdicts = c.g.reSeen
	}

	matched := topics[:0]
	for _, name := range topics {
		ok, cached := verdicts[name]
		if !cached {
			// First time we see this topic: the first matching regex wins.
			for raw, re := range c.cl.cfg.topics {
				if re.MatchString(name) {
					ok = true
					rns.add(raw, name)
					break
				}
			}
			if !ok {
				rns.skip(name)
			}
			verdicts[name] = ok
		}
		if !ok {
			continue
		}
		matched = append(matched, name)
	}
	return matched
}

func (c *consumer) doOnMetadataUpdate() {
if !c.consuming() {
return
Expand Down
20 changes: 1 addition & 19 deletions pkg/kgo/consumer_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,11 @@ func (*directConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffset
func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {
topics := d.tps.load()

var rns reNews
if d.cfg.regex {
defer rns.log(d.cfg)
}

toUse := make(map[string]map[int32]Offset, 10)
for topic, topicPartitions := range topics {
var useTopic bool
if d.cfg.regex {
want, seen := d.reSeen[topic]
if !seen {
for rawRe, re := range d.cfg.topics {
if want = re.MatchString(topic); want {
rns.add(rawRe, topic)
break
}
}
if !want {
rns.skip(topic)
}
d.reSeen[topic] = want
}
useTopic = want
useTopic = d.reSeen[topic]
} else {
useTopic = d.m.onlyt(topic)
}
Expand Down
69 changes: 69 additions & 0 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -536,3 +537,71 @@ func TestIssue648(t *testing.T) {
t.Errorf("did not see ErrUnknownTopicOrPartition")
}
}

// TestIssue810 checks what GetConsumeTopics reports for a client that
// consumes a literal topic versus one that consumes via regex. Two temporary
// topics are created but only the first is configured for consumption, so the
// second must never show up in the result.
func TestIssue810(t *testing.T) {
	t.Parallel()

	t1, cleanup1 := tmpTopicPartitions(t, 1)
	defer cleanup1()

	// The second topic exists only as a decoy; its name is never consumed.
	_, cleanup2 := tmpTopicPartitions(t, 1)
	defer cleanup2()

	exp := []string{t1}

	// Non-regex consuming: topics are available immediately.
	{
		cl, _ := newTestClient(
			ConsumeTopics(t1),
			UnknownTopicRetries(-1),
		)
		defer cl.Close()

		if topics := cl.GetConsumeTopics(); !reflect.DeepEqual(topics, exp) {
			t.Errorf("non-regex got %v != exp %v", topics, exp)
		}
	}

	// Regex consuming: topics are available only after discovery.
	{
		cl, _ := newTestClient(
			ConsumeTopics(t1),
			ConsumeRegex(),
			UnknownTopicRetries(-1),
			MetadataMaxAge(time.Second),
			MetadataMinAge(100*time.Millisecond),
		)
		defer cl.Close()

		poll := time.NewTicker(100 * time.Millisecond)
		deadline := time.NewTimer(15 * time.Second)
		defer poll.Stop()
		defer deadline.Stop()

		// Poll on every tick until the expected topics appear, nudging a
		// metadata refresh after each miss; give up when the deadline fires.
		var lastSaw []string
		discovered := func() bool {
			for {
				select {
				case <-poll.C:
					lastSaw = cl.GetConsumeTopics()
					if reflect.DeepEqual(lastSaw, exp) {
						return true
					}
					cl.ForceMetadataRefresh()
				case <-deadline.C:
					return false
				}
			}
		}()

		if !discovered {
			t.Errorf("did not see expected topics in time, last saw %v != exp %v", lastSaw, exp)
		}
	}
}
23 changes: 2 additions & 21 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ type groupConsumer struct {
cooperative atomicBool // true if the group balancer chosen during Join is cooperative

// The data for topics that the user assigned. Metadata updates the
// atomic.Value in each pointer atomically. If we are consuming via
// regex, metadata grabs the lock to add new topics.
// atomic.Value in each pointer atomically.
tps *topicsPartitions

reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not
Expand Down Expand Up @@ -1714,11 +1713,6 @@ func (g *groupConsumer) findNewAssignments() {
delta int
}

var rns reNews
if g.cfg.regex {
defer rns.log(&g.cl.cfg)
}

var numNewTopics int
toChange := make(map[string]change, len(topics))
for topic, topicPartitions := range topics {
Expand All @@ -1741,20 +1735,7 @@ func (g *groupConsumer) findNewAssignments() {
// support adding new regex).
useTopic := true
if g.cfg.regex {
want, seen := g.reSeen[topic]
if !seen {
for rawRe, re := range g.cfg.topics {
if want = re.MatchString(topic); want {
rns.add(rawRe, topic)
break
}
}
if !want {
rns.skip(topic)
}
g.reSeen[topic] = want
}
useTopic = want
useTopic = g.reSeen[topic]
}

// We only track using the topic if there are partitions for
Expand Down
8 changes: 8 additions & 0 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,14 @@ func (cl *Client) updateMetadata() (retryWhy multiUpdateWhy, err error) {
for topic := range latest {
allTopics = append(allTopics, topic)
}

// We filter out topics that will not match any of our regexes.
// This ensures that the `tps` field does not contain topics
// we will never use (the client works with misc. topics in
// there, but it's better to avoid it -- and allows us to use
// `tps` in GetConsumeTopics).
allTopics = c.filterMetadataAllTopics(allTopics)

tpsConsumerLoad = tpsConsumer.ensureTopics(allTopics)
defer tpsConsumer.storeData(tpsConsumerLoad)

Expand Down