Skip to content

Commit

Permalink
Merge pull request #674 from twmb/patches
Browse files Browse the repository at this point in the history
Patches
  • Loading branch information
twmb authored Feb 7, 2024
2 parents 41f0269 + e08d276 commit c5207aa
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 10 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ For the bug: if you tried using CommitOffsetsSync during a group rebalance, and
you canceled your context while the group was still rebalancing, then
CommitOffsetsSync would enter a deadlock and never return. That has been fixed.

- [`cd65d77`](https://github.com/twmb/franz-go/commit/cd65d77) kgo: fix bug
- [`cd65d77`](https://github.com/twmb/franz-go/commit/cd65d77) and [`99d6dfb`](https://github.com/twmb/franz-go/commit/99d6dfb) kgo: fix bug
- [`d40ac19`](https://github.com/twmb/franz-go/commit/d40ac19) kgo: un-deprecate SaramaHasher and add docs explaining why

v1.16.0
Expand Down
15 changes: 6 additions & 9 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2580,13 +2580,13 @@ func (g *groupConsumer) commitOffsetsSync(
onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {}
}

g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done.

if err := g.waitJoinSyncMu(ctx); err != nil {
onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err)
close(done)
return
}

g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done.
unblockCommits := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
g.noCommitDuringJoinAndSync.RUnlock()
defer close(done)
Expand Down Expand Up @@ -2663,19 +2663,16 @@ func (cl *Client) CommitOffsets(
return
}

g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us
unblockSyncCommit := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
defer g.syncCommitMu.RUnlock()
onDone(cl, req, resp, err)
}

if err := g.waitJoinSyncMu(ctx); err != nil {
onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err)
return
}

g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us
unblockJoinSync := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
g.noCommitDuringJoinAndSync.RUnlock()
unblockSyncCommit(cl, req, resp, err)
defer g.syncCommitMu.RUnlock()
onDone(cl, req, resp, err)
}

g.mu.Lock()
Expand Down

0 comments on commit c5207aa

Please sign in to comment.