From 55dc7a0310507cb698e6294b8ce1e7f90a24e1f7 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 25 May 2024 19:11:46 -0600 Subject: [PATCH] pkg/kgo: re-add fetch-canceled partitions AFTER the user callback See the command #705 for more details. --- pkg/kgo/consumer_group.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index cd76684b..1c915899 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -861,12 +861,6 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) { fetchDone := make(chan struct{}) defer func() { <-fetchDone }() - // If cooperative consuming, we may have to resume fetches. See the - // comment on adjustCooperativeFetchOffsets. - if g.cooperative.Load() { - added = g.adjustCooperativeFetchOffsets(added, lost) - } - // Before we fetch offsets, we wait for the user's onAssign callback to // be done. This ensures a few things: // @@ -883,6 +877,18 @@ func (g *groupConsumer) setupAssignedAndHeartbeat() (string, error) { // necessarily run onRevoke before returning (because of a fatal // error). s.assign(g, added) + + // If cooperative consuming, we may have to resume fetches. See the + // comment on adjustCooperativeFetchOffsets. + // + // We do this AFTER the user's callback. If we add more partitions + // to `added` that are from a previously canceled fetch, we do NOT + // want to pass those fetch-resumed partitions to the user callback + // again. See #705. + if g.cooperative.Load() { + added = g.adjustCooperativeFetchOffsets(added, lost) + } + <-s.assignDone if len(added) > 0 {