Skip to content

Commit

Permalink
Merge pull request #610 from twmb/oct-issue
Browse files Browse the repository at this point in the history
kgo: be sure to use topics when other topics are paused
  • Loading branch information
twmb authored Nov 1, 2023
2 parents e35a2a1 + af5bc1f commit d815037
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 34 deletions.
14 changes: 10 additions & 4 deletions pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1763,10 +1763,16 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload
// guard this entire function.

debug := s.c.cl.cfg.logger.Level() >= LogLevelDebug
var using, reloading map[string]map[int32]EpochOffset

var using map[string]map[int32]EpochOffset
type epochOffsetWhy struct {
EpochOffset
error
}
var reloading map[string]map[int32]epochOffsetWhy
if debug {
using = make(map[string]map[int32]EpochOffset)
reloading = make(map[string]map[int32]EpochOffset)
reloading = make(map[string]map[int32]epochOffsetWhy)
defer func() {
t := "list"
if loaded.loadType == loadTypeEpoch {
Expand Down Expand Up @@ -1818,10 +1824,10 @@ func (s *consumerSession) handleListOrEpochResults(loaded loadedOffsets) (reload
if debug {
treloading := reloading[load.topic]
if treloading == nil {
treloading = make(map[int32]EpochOffset)
treloading = make(map[int32]epochOffsetWhy)
reloading[load.topic] = treloading
}
treloading[load.partition] = EpochOffset{load.leaderEpoch, load.offset}
treloading[load.partition] = epochOffsetWhy{EpochOffset{load.leaderEpoch, load.offset}, load.err}
}
}
}
Expand Down
156 changes: 130 additions & 26 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,19 @@ func TestAddRemovePartitions(t *testing.T) {
}
}

// closed reports whether the signal channel done has been closed,
// without blocking: a closed channel's receive proceeds immediately,
// while an open channel falls through to the default case.
func closed(done <-chan struct{}) bool {
	select {
	case <-done:
		return true
	default:
	}
	return false
}

func TestPauseIssue489(t *testing.T) {
t.Parallel()

t1, cleanup := tmpTopicPartitions(t, 2)
t1, cleanup := tmpTopicPartitions(t, 3)
defer cleanup()

cl, _ := NewClient(
Expand All @@ -282,47 +291,142 @@ func TestPauseIssue489(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
var exit atomic.Bool
var zeroOne uint8
var which uint8
for !exit.Load() {
r := StringRecord("v")
r.Partition = int32(zeroOne % 2)
zeroOne++
r.Partition = int32(which % 3)
which++
cl.Produce(ctx, r, func(r *Record, err error) {
if err == context.Canceled {
exit.Store(true)
}
})
time.Sleep(100 * time.Microsecond)
}
}()
defer cancel()

for i := 0; i < 10; i++ {
var sawZero, sawOne bool
for !sawZero || !sawOne {
fs := cl.PollFetches(ctx)
fs.EachRecord(func(r *Record) {
sawZero = sawZero || r.Partition == 0
sawOne = sawOne || r.Partition == 1
})
}
cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
sawZero, sawOne = false, false
for _, pollfn := range []struct {
name string
fn func(context.Context) Fetches
}{
{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
} {
for i := 0; i < 10; i++ {
var fs Fetches
if i < 5 {
fs = cl.PollFetches(ctx)
} else {
fs = cl.PollRecords(ctx, 2)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
var sawZero, sawOne, sawTwo bool
for (!sawZero || !sawOne || !sawTwo) && !closed(ctx.Done()) {
fs := pollfn.fn(ctx)
fs.EachRecord(func(r *Record) {
sawZero = sawZero || r.Partition == 0
sawOne = sawOne || r.Partition == 1
sawTwo = sawTwo || r.Partition == 2
})
}
fs.EachRecord(func(r *Record) {
sawZero = sawZero || r.Partition == 0
sawOne = sawOne || r.Partition == 1
cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
sawZero, sawOne, sawTwo = false, false, false
for i := 0; i < 10 && !closed(ctx.Done()); i++ {
fs := pollfn.fn(ctx)
fs.EachRecord(func(r *Record) {
sawZero = sawZero || r.Partition == 0
sawOne = sawOne || r.Partition == 1
sawTwo = sawTwo || r.Partition == 2
})
}
cancel()
if sawZero {
t.Fatalf("%s: saw partition zero even though it was paused", pollfn.name)
}
if !sawOne {
t.Fatalf("%s: did not see partition one even though it was not paused", pollfn.name)
}
if !sawTwo {
t.Fatalf("%s: did not see partition two even though it was not paused", pollfn.name)
}
cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
}
}
}

func TestPauseIssueOct2023(t *testing.T) {
t.Parallel()

t1, cleanup1 := tmpTopicPartitions(t, 1)
t2, cleanup2 := tmpTopicPartitions(t, 1)
t3, cleanup3 := tmpTopicPartitions(t, 1)
defer cleanup1()
defer cleanup2()
defer cleanup3()
ts := []string{t1, t2, t3}

cl, _ := NewClient(
getSeedBrokers(),
UnknownTopicRetries(-1),
ConsumeTopics(ts...),
MetadataMinAge(50*time.Millisecond),
FetchMaxWait(100*time.Millisecond),
)
defer cl.Close()

ctx, cancel := context.WithCancel(context.Background())
go func() {
var exit atomic.Bool
var which int
for !exit.Load() {
r := StringRecord("v")
r.Topic = ts[which%len(ts)]
which++
cl.Produce(ctx, r, func(r *Record, err error) {
if err == context.Canceled {
exit.Store(true)
}
})
time.Sleep(100 * time.Microsecond)
}
if sawZero {
t.Error("saw partition zero even though it was paused")
}()
defer cancel()

for _, pollfn := range []struct {
name string
fn func(context.Context) Fetches
}{
{"fetches", func(ctx context.Context) Fetches { return cl.PollFetches(ctx) }},
{"records", func(ctx context.Context) Fetches { return cl.PollRecords(ctx, 1000) }},
} {
for i := 0; i < 10; i++ {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
var sawt1, sawt2, sawt3 bool
for (!sawt1 || !sawt2 || !sawt3) && !closed(ctx.Done()) {
fs := pollfn.fn(ctx)
fs.EachRecord(func(r *Record) {
sawt1 = sawt1 || r.Topic == t1
sawt2 = sawt2 || r.Topic == t2
sawt3 = sawt3 || r.Topic == t3
})
}
cl.PauseFetchTopics(t1)
sawt1, sawt2, sawt3 = false, false, false
for i := 0; i < 10 && !closed(ctx.Done()); i++ {
fs := pollfn.fn(ctx)
fs.EachRecord(func(r *Record) {
sawt1 = sawt1 || r.Topic == t1
sawt2 = sawt2 || r.Topic == t2
sawt3 = sawt3 || r.Topic == t3
})
}
cancel()
if sawt1 {
t.Fatalf("%s: saw topic t1 even though it was paused", pollfn.name)
}
if !sawt2 {
t.Fatalf("%s: did not see topic t2 even though it was not paused", pollfn.name)
}
if !sawt3 {
t.Fatalf("%s: did not see topic t3 even though it was not paused", pollfn.name)
}
cl.ResumeFetchTopics(t1)
}
cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
}
}

Expand Down
27 changes: 23 additions & 4 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
// and strip the topic entirely.
pps, ok := paused.t(t)
if !ok {
for _, o := range ps {
o.from.setOffset(o.cursorOffset)
o.from.allowUsable()
}
continue
}
if strip == nil {
Expand All @@ -368,7 +372,6 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
continue
}
stript := make(map[int32]struct{})
strip[t] = stript
for _, o := range ps {
if _, ok := pps.m[o.from.partition]; ok {
o.from.allowUsable()
Expand All @@ -378,6 +381,15 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
o.from.setOffset(o.cursorOffset)
o.from.allowUsable()
}
// We only add stript to strip if there are any
// stripped partitions. We could have a paused
// partition that is on another broker, while this
// broker has no paused partitions -- if we add stript
// here, our logic below (stripping this entire topic)
// is more confusing (present nil vs. non-present nil).
if len(stript) > 0 {
strip[t] = stript
}
}
})
if strip != nil {
Expand Down Expand Up @@ -435,9 +447,15 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
continue
}

r.Topics = append(r.Topics, *t)
rt := &r.Topics[len(r.Topics)-1]
rt.Partitions = nil
var rt *FetchTopic
ensureTopicAdded := func() {
if rt != nil {
return
}
r.Topics = append(r.Topics, *t)
rt = &r.Topics[len(r.Topics)-1]
rt.Partitions = nil
}

tCursors := b.usedOffsets[t.Topic]

Expand All @@ -455,6 +473,7 @@ func (s *source) takeNBuffered(paused pausedTopics, n int) (Fetch, int, bool) {
continue
}

ensureTopicAdded()
rt.Partitions = append(rt.Partitions, *p)
rp := &rt.Partitions[len(rt.Partitions)-1]

Expand Down

0 comments on commit d815037

Please sign in to comment.