Skip to content

Commit

Permalink
Merge pull request #737 from twmb/705
Browse files Browse the repository at this point in the history
pkg/kgo: re-add fetch-canceled partitions AFTER the user callback
  • Loading branch information
twmb authored May 26, 2024
2 parents ad5b742 + 55dc7a0 commit 051703b
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,12 +862,6 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) {
fetchDone := make(chan struct{})
defer func() { <-fetchDone }()

// If cooperative consuming, we may have to resume fetches. See the
// comment on adjustCooperativeFetchOffsets.
if g.cooperative.Load() {
added = g.adjustCooperativeFetchOffsets(added, lost)
}

// Before we fetch offsets, we wait for the user's onAssign callback to
// be done. This ensures a few things:
//
Expand All @@ -884,6 +878,18 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) {
// necessarily run onRevoke before returning (because of a fatal
// error).
s.assign(g, added)

// If cooperative consuming, we may have to resume fetches. See the
// comment on adjustCooperativeFetchOffsets.
//
// We do this AFTER the user's callback. If we add more partitions
// to `added` that are from a previously canceled fetch, we do NOT
// want to pass those fetch-resumed partitions to the user callback
// again. See #705.
if g.cooperative.Load() {
added = g.adjustCooperativeFetchOffsets(added, lost)
}

<-s.assignDone

if len(added) > 0 {
Expand Down

0 comments on commit 051703b

Please sign in to comment.