diff --git a/CHANGELOG.md b/CHANGELOG.md index a16e2063..a19d22d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ For the bug: if you tried using CommitOffsetsSync during a group rebalance, and you canceled your context while the group was still rebalancing, then CommitOffsetsSync would enter a deadlock and never return. That has been fixed. -- [`cd65d77`](https://github.com/twmb/franz-go/commit/cd65d77) kgo: fix bug +- [`cd65d77`](https://github.com/twmb/franz-go/commit/cd65d77) and [`99d6dfb`](https://github.com/twmb/franz-go/commit/99d6dfb) kgo: fix bug - [`d40ac19`](https://github.com/twmb/franz-go/commit/d40ac19) kgo: un-deprecate SaramaHasher and add docs explaining why v1.16.0 diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 81832a6e..cd76684b 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -2580,13 +2580,13 @@ func (g *groupConsumer) commitOffsetsSync( onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {} } - g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done. - if err := g.waitJoinSyncMu(ctx); err != nil { onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err) close(done) return } + + g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done. unblockCommits := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { g.noCommitDuringJoinAndSync.RUnlock() defer close(done) @@ -2663,19 +2663,16 @@ func (cl *Client) CommitOffsets( return } - g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us - unblockSyncCommit := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { - defer g.syncCommitMu.RUnlock() - onDone(cl, req, resp, err) - } - if err := g.waitJoinSyncMu(ctx); err != nil { onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err) return } + + g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us unblockJoinSync := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { g.noCommitDuringJoinAndSync.RUnlock() - unblockSyncCommit(cl, req, resp, err) + defer g.syncCommitMu.RUnlock() + onDone(cl, req, resp, err) } g.mu.Lock()