diff --git a/pkg/kgo/internal/sticky/go121.go b/pkg/kgo/internal/sticky/go121.go new file mode 100644 index 00000000..3cf972b6 --- /dev/null +++ b/pkg/kgo/internal/sticky/go121.go @@ -0,0 +1,28 @@ +//go:build go1.21 +// +build go1.21 + +package sticky + +import "slices" + +func sortPartNums(ps memberPartitions) { + slices.Sort(ps) +} + +func (b *balancer) sortMemberByLiteralPartNum(memberNum int) { + partNums := b.plan[memberNum] + slices.SortFunc(partNums, func(lpNum, rpNum int32) int { + ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum] + li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum] + lt, rt := li.topic, ri.topic + lp, rp := lpNum-li.partNum, rpNum-ri.partNum + if lp < rp { + return -1 + } else if lp > rp { + return 1 + } else if lt < rt { + return -1 + } + return 1 + }) +} diff --git a/pkg/kgo/internal/sticky/goold.go b/pkg/kgo/internal/sticky/goold.go new file mode 100644 index 00000000..addd2bbc --- /dev/null +++ b/pkg/kgo/internal/sticky/goold.go @@ -0,0 +1,22 @@ +//go:build !go1.21 +// +build !go1.21 + +package sticky + +import "sort" + +func sortPartNums(partNums memberPartitions) { + sort.Slice(partNums, func(i, j int) bool { return partNums[i] < partNums[j] }) +} + +func (b *balancer) sortMemberByLiteralPartNum(memberNum int) { + partNums := b.plan[memberNum] + sort.Slice(partNums, func(i, j int) bool { + lpNum, rpNum := partNums[i], partNums[j] + ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum] + li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum] + lt, rt := li.topic, ri.topic + lp, rp := lpNum-li.partNum, rpNum-ri.partNum + return lp < rp || (lp == rp && lt < rt) + }) +} diff --git a/pkg/kgo/internal/sticky/sticky.go b/pkg/kgo/internal/sticky/sticky.go index a653b67d..9f48d7dc 100644 --- a/pkg/kgo/internal/sticky/sticky.go +++ b/pkg/kgo/internal/sticky/sticky.go @@ -7,7 +7,6 @@ package sticky import ( "math" - "sort" "github.com/twmb/franz-go/pkg/kbin" "github.com/twmb/franz-go/pkg/kmsg" @@ -138,7 +137,7 @@ func (b *balancer) into() Plan { // partOwners is created by topic, and partNums refers to // indices in partOwners. If we sort by partNum, we have sorted // topics and partitions. - sort.Sort(&partNums) //nolint:gosec // sorting the slice, not using the pointer across iter + sortPartNums(partNums) // We can reuse partNums for our topic partitions. topicParts := partNums[:0] @@ -203,10 +202,6 @@ func (m *memberPartitions) add(partNum int32) { *m = append(*m, partNum) } -func (m *memberPartitions) Len() int { return len(*m) } -func (m *memberPartitions) Less(i, j int) bool { return (*m)[i] < (*m)[j] } -func (m *memberPartitions) Swap(i, j int) { (*m)[i], (*m)[j] = (*m)[j], (*m)[i] } - // membersPartitions maps members to their partitions. type membersPartitions []memberPartitions @@ -479,33 +474,42 @@ func (b *balancer) assignUnassignedAndInitGraph() { // all topics. This benefits the standard case the most, where all // members consume equally. for memberNum := range b.plan { - partNums := b.plan[memberNum] - sort.Slice(partNums, func(i, j int) bool { - lpNum, rpNum := partNums[i], partNums[j] - ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum] - li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum] - lt, rt := li.topic, ri.topic - lp, rp := lpNum-li.partNum, rpNum-ri.partNum - return lp < rp || (lp == rp && lt < rt) - }) - } - - for _, potentials := range topicPotentials { - (&membersByPartitions{potentials, b.plan}).init() + b.sortMemberByLiteralPartNum(memberNum) } - for partNum, owner := range partitionConsumers { - if owner.memberNum != unassignedPart { - continue + if !b.isComplex && len(topicPotentials) > 0 { + potentials := topicPotentials[0] + (&membersByPartitions{potentials, b.plan}).init() + for partNum, owner := range partitionConsumers { + if owner.memberNum != unassignedPart { + continue + } + assigned := potentials[0] + b.plan[assigned].add(int32(partNum)) + (&membersByPartitions{potentials, b.plan}).fix0() + partitionConsumers[partNum].memberNum = assigned } - potentials := topicPotentials[b.partOwners[partNum]] - if len(potentials) == 0 { - continue + } else { + for partNum, owner := range partitionConsumers { + if owner.memberNum != unassignedPart { + continue + } + potentials := topicPotentials[b.partOwners[partNum]] + if len(potentials) == 0 { + continue + } + leastConsumingPotential := potentials[0] + leastConsuming := len(b.plan[leastConsumingPotential]) + for _, potential := range potentials[1:] { + potentialConsuming := len(b.plan[potential]) + if potentialConsuming < leastConsuming { + leastConsumingPotential = potential + leastConsuming = potentialConsuming + } + } + b.plan[leastConsumingPotential].add(int32(partNum)) + partitionConsumers[partNum].memberNum = leastConsumingPotential } - assigned := potentials[0] - b.plan[assigned].add(int32(partNum)) - (&membersByPartitions{potentials, b.plan}).fix0() - partitionConsumers[partNum].memberNum = assigned } // Lastly, with everything assigned, we build our steal graph for @@ -553,7 +557,7 @@ func (b *balancer) tryRestickyStales( currentOwner := partitionConsumers[staleNum].memberNum lastOwnerPartitions := &b.plan[lastOwnerNum] currentOwnerPartitions := &b.plan[currentOwner] - if lastOwnerPartitions.Len()+1 < currentOwnerPartitions.Len() { + if len(*lastOwnerPartitions)+1 < len(*currentOwnerPartitions) { currentOwnerPartitions.remove(staleNum) lastOwnerPartitions.add(staleNum) } @@ -704,8 +708,8 @@ func (b *balancer) reassignPartition(src, dst uint16, partNum int32) { srcPartitions := &b.plan[src] dstPartitions := &b.plan[dst] - oldSrcLevel := srcPartitions.Len() - oldDstLevel := dstPartitions.Len() + oldSrcLevel := len(*srcPartitions) + oldDstLevel := len(*dstPartitions) srcPartitions.remove(partNum) dstPartitions.add(partNum) diff --git a/pkg/kgo/internal/sticky/sticky_test.go b/pkg/kgo/internal/sticky/sticky_test.go index 264b41e2..b5e0eef5 100644 --- a/pkg/kgo/internal/sticky/sticky_test.go +++ b/pkg/kgo/internal/sticky/sticky_test.go @@ -1541,6 +1541,20 @@ func Test_stickyAddEqualMove(t *testing.T) { } } +func Test_stickyTwoJoinEqualBalance(t *testing.T) { + t.Parallel() + topics := map[string]int32{"foo": 16, "bar": 16} + members := []GroupMember{ + {ID: "1", Topics: []string{"foo", "bar"}}, + {ID: "2", Topics: []string{"foo", "bar"}}, + } + plan := Balance(members, topics) + if len(plan["1"]["foo"]) != 8 || len(plan["1"]["bar"]) != 8 || + len(plan["2"]["foo"]) != 8 || len(plan["2"]["bar"]) != 8 { + t.Errorf("bad distribution: %v", plan) + } +} + func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) { t.Parallel()