Skip to content

Commit

Permalink
Merge pull request #673 from twmb/patch
Browse files Browse the repository at this point in the history
v1.16.1 patches
  • Loading branch information
twmb authored Feb 7, 2024
2 parents a2d69ce + 20867cd commit 41f0269
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 3 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
v1.16.1
===

This patch release fixes one bug and un-deprecates SaramaHasher.

SaramaHasher, while not identical to Sarama's partitioner, actually _is_
identical to some other partitioners in the Kafka client ecosystem. So, the old
function is now un-deprecated, but the documentation correctly points you to
SaramaCompatHasher and mentions why you may still want to use SaramaHasher.

For the bug: if you tried using CommitOffsetsSync during a group rebalance, and
you canceled your context while the group was still rebalancing, then
CommitOffsetsSync would enter a deadlock and never return. That has been fixed.

- [`cd65d77`](https://github.com/twmb/franz-go/commit/cd65d77) kgo: fix bug
- [`d40ac19`](https://github.com/twmb/franz-go/commit/d40ac19) kgo: un-deprecate SaramaHasher and add docs explaining why

v1.16.0
===

Expand Down
1 change: 1 addition & 0 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2584,6 +2584,7 @@ func (g *groupConsumer) commitOffsetsSync(

if err := g.waitJoinSyncMu(ctx); err != nil {
onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err)
close(done)
return
}
unblockCommits := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
Expand Down
13 changes: 10 additions & 3 deletions pkg/kgo/partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,9 +487,16 @@ func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher {
}
}

// Deprecated: SaramaHasher is not compatible with Sarama's default partitioner
// and only remains to avoid re-keying records for existing users of this API. See
// [SaramaCompatHasher] for a correct partitioner.
// SaramaHasher is a historical misnamed partitioner. This library's original
// implementation of the SaramaHasher was incorrect, if you want an exact
// match for the Sarama partitioner, use the [SaramaCompatHasher].
//
// This partitioner remains because as it turns out, other ecosystems provide
// a similar partitioner and this partitioner is useful for compatibility.
//
// In particular, using this function with a crc32.ChecksumIEEE hasher makes
// this partitioner match librdkafka's consistent partitioner, or the
// zendesk/ruby-kafka partitioner.
func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher {
return func(key []byte, n int) int {
p := int(hashFn(key)) % n
Expand Down

0 comments on commit 41f0269

Please sign in to comment.