Skip to content

Commit

Permalink
kgo: be sure to use topics when other topics are paused
Browse files Browse the repository at this point in the history
Follow up from #585, there was a bug in the commit for it. If any topic
was paused, then all non-paused topics would be returned once, but they
would not be marked as fetchable after that.

I _think_ the non-fetchability would eventually be cleared on a metadata
update, _but_ the source would re-fetch from the old position again. The
only way the topic would advance would be if no topics were paused after
the metadata update.

However this is a bit confusing, and overall this patch is required.
  • Loading branch information
twmb committed Oct 31, 2023
1 parent 6ebcb43 commit 47aa13f
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 3 deletions.
89 changes: 86 additions & 3 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,15 @@ func TestAddRemovePartitions(t *testing.T) {
}
}

func closed(ch <-chan struct{}) bool {
select {
case <-ch:
return true
default:
return false
}
}

func TestPauseIssue489(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -297,8 +306,9 @@ func TestPauseIssue489(t *testing.T) {
defer cancel()

for i := 0; i < 10; i++ {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
var sawZero, sawOne bool
for !sawZero || !sawOne {
for (!sawZero || !sawOne) && !closed(ctx.Done()) {
fs := cl.PollFetches(ctx)
fs.EachRecord(func(r *Record) {
sawZero = sawZero || r.Partition == 0
Expand All @@ -307,7 +317,7 @@ func TestPauseIssue489(t *testing.T) {
}
cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
sawZero, sawOne = false, false
for i := 0; i < 10; i++ {
for i := 0; i < 10 && !closed(ctx.Done()); i++ {
var fs Fetches
if i < 5 {
fs = cl.PollFetches(ctx)
Expand All @@ -319,13 +329,86 @@ func TestPauseIssue489(t *testing.T) {
sawOne = sawOne || r.Partition == 1
})
}
cancel()
if sawZero {
t.Error("saw partition zero even though it was paused")
t.Fatal("saw partition zero even though it was paused")
}
if !sawOne {
t.Fatal("did not see partition one even though it was not paused")
}
cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
}
}

func TestPauseIssueOct2023(t *testing.T) {
t.Parallel()

t1, cleanup1 := tmpTopicPartitions(t, 1)
t2, cleanup2 := tmpTopicPartitions(t, 1)
defer cleanup1()
defer cleanup2()
ts := []string{t1, t2}

cl, _ := NewClient(
getSeedBrokers(),
UnknownTopicRetries(-1),
ConsumeTopics(ts...),
FetchMaxWait(100*time.Millisecond),
)
defer cl.Close()

ctx, cancel := context.WithCancel(context.Background())
go func() {
var exit atomic.Bool
var which int
for !exit.Load() {
r := StringRecord("v")
r.Topic = ts[which%len(ts)]
which++
cl.Produce(ctx, r, func(r *Record, err error) {
if err == context.Canceled {
exit.Store(true)
}
})
}
}()
defer cancel()

for i := 0; i < 10; i++ {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
var sawt1, sawt2 bool
for (!sawt1 || !sawt2) && !closed(ctx.Done()) {
fs := cl.PollFetches(ctx)
fs.EachRecord(func(r *Record) {
sawt1 = sawt1 || r.Topic == t1
sawt2 = sawt2 || r.Topic == t2
})
}
cl.PauseFetchTopics(t1)
sawt1, sawt2 = false, false
for i := 0; i < 10 && !closed(ctx.Done()); i++ {
var fs Fetches
if i < 5 {
fs = cl.PollFetches(ctx)
} else {
fs = cl.PollRecords(ctx, 2)
}
fs.EachRecord(func(r *Record) {
sawt1 = sawt1 || r.Topic == t1
sawt2 = sawt2 || r.Topic == t2
})
}
cancel()
if sawt1 {
t.Fatal("saw topic t1 even though it was paused")
}
if !sawt2 {
t.Fatal("did not see topic t2 even though it was not paused")
}
cl.ResumeFetchTopics(t1)
}
}

func TestIssue523(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 4 additions & 0 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ func (s *source) takeBuffered(paused pausedTopics) Fetch {
// and strip the topic entirely.
pps, ok := paused.t(t)
if !ok {
for _, o := range ps {
o.from.setOffset(o.cursorOffset)
o.from.allowUsable()
}
continue
}
if strip == nil {
Expand Down

0 comments on commit 47aa13f

Please sign in to comment.