Skip to content

Commit

Permalink
Merge pull request #612 from twmb/sticky
Browse files Browse the repository at this point in the history
sticky: further improvements
  • Loading branch information
twmb authored Nov 1, 2023
2 parents d815037 + 36b4437 commit 3c628d5
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 32 deletions.
28 changes: 28 additions & 0 deletions pkg/kgo/internal/sticky/go121.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//go:build go1.21
// +build go1.21

package sticky

import "slices"

func sortPartNums(ps memberPartitions) {
slices.Sort(ps)
}

func (b *balancer) sortMemberByLiteralPartNum(memberNum int) {
partNums := b.plan[memberNum]
slices.SortFunc(partNums, func(lpNum, rpNum int32) int {
ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum]
li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum]
lt, rt := li.topic, ri.topic
lp, rp := lpNum-li.partNum, rpNum-ri.partNum
if lp < rp {
return -1
} else if lp > rp {
return 1
} else if lt < rt {
return -1
}
return 1
})
}
22 changes: 22 additions & 0 deletions pkg/kgo/internal/sticky/goold.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//go:build !go1.21
// +build !go1.21

package sticky

import "sort"

func sortPartNums(partNums memberPartitions) {
sort.Slice(partNums, func(i, j int) bool { return partNums[i] < partNums[j] })
}

func (b *balancer) sortMemberByLiteralPartNum(memberNum int) {
partNums := b.plan[memberNum]
sort.Slice(partNums, func(i, j int) bool {
lpNum, rpNum := partNums[i], partNums[j]
ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum]
li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum]
lt, rt := li.topic, ri.topic
lp, rp := lpNum-li.partNum, rpNum-ri.partNum
return lp < rp || (lp == rp && lt < rt)
})
}
68 changes: 36 additions & 32 deletions pkg/kgo/internal/sticky/sticky.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package sticky

import (
"math"
"sort"

"github.com/twmb/franz-go/pkg/kbin"
"github.com/twmb/franz-go/pkg/kmsg"
Expand Down Expand Up @@ -138,7 +137,7 @@ func (b *balancer) into() Plan {
// partOwners is created by topic, and partNums refers to
// indices in partOwners. If we sort by partNum, we have sorted
// topics and partitions.
sort.Sort(&partNums) //nolint:gosec // sorting the slice, not using the pointer across iter
sortPartNums(partNums)

// We can reuse partNums for our topic partitions.
topicParts := partNums[:0]
Expand Down Expand Up @@ -203,10 +202,6 @@ func (m *memberPartitions) add(partNum int32) {
*m = append(*m, partNum)
}

func (m *memberPartitions) Len() int { return len(*m) }
func (m *memberPartitions) Less(i, j int) bool { return (*m)[i] < (*m)[j] }
func (m *memberPartitions) Swap(i, j int) { (*m)[i], (*m)[j] = (*m)[j], (*m)[i] }

// membersPartitions maps members to their partitions.
type membersPartitions []memberPartitions

Expand Down Expand Up @@ -479,33 +474,42 @@ func (b *balancer) assignUnassignedAndInitGraph() {
// all topics. This benefits the standard case the most, where all
// members consume equally.
for memberNum := range b.plan {
partNums := b.plan[memberNum]
sort.Slice(partNums, func(i, j int) bool {
lpNum, rpNum := partNums[i], partNums[j]
ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum]
li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum]
lt, rt := li.topic, ri.topic
lp, rp := lpNum-li.partNum, rpNum-ri.partNum
return lp < rp || (lp == rp && lt < rt)
})
}

for _, potentials := range topicPotentials {
(&membersByPartitions{potentials, b.plan}).init()
b.sortMemberByLiteralPartNum(memberNum)
}

for partNum, owner := range partitionConsumers {
if owner.memberNum != unassignedPart {
continue
if !b.isComplex && len(topicPotentials) > 0 {
potentials := topicPotentials[0]
(&membersByPartitions{potentials, b.plan}).init()
for partNum, owner := range partitionConsumers {
if owner.memberNum != unassignedPart {
continue
}
assigned := potentials[0]
b.plan[assigned].add(int32(partNum))
(&membersByPartitions{potentials, b.plan}).fix0()
partitionConsumers[partNum].memberNum = assigned
}
potentials := topicPotentials[b.partOwners[partNum]]
if len(potentials) == 0 {
continue
} else {
for partNum, owner := range partitionConsumers {
if owner.memberNum != unassignedPart {
continue
}
potentials := topicPotentials[b.partOwners[partNum]]
if len(potentials) == 0 {
continue
}
leastConsumingPotential := potentials[0]
leastConsuming := len(b.plan[leastConsumingPotential])
for _, potential := range potentials[1:] {
potentialConsuming := len(b.plan[potential])
if potentialConsuming < leastConsuming {
leastConsumingPotential = potential
leastConsuming = potentialConsuming
}
}
b.plan[leastConsumingPotential].add(int32(partNum))
partitionConsumers[partNum].memberNum = leastConsumingPotential
}
assigned := potentials[0]
b.plan[assigned].add(int32(partNum))
(&membersByPartitions{potentials, b.plan}).fix0()
partitionConsumers[partNum].memberNum = assigned
}

// Lastly, with everything assigned, we build our steal graph for
Expand Down Expand Up @@ -553,7 +557,7 @@ func (b *balancer) tryRestickyStales(
currentOwner := partitionConsumers[staleNum].memberNum
lastOwnerPartitions := &b.plan[lastOwnerNum]
currentOwnerPartitions := &b.plan[currentOwner]
if lastOwnerPartitions.Len()+1 < currentOwnerPartitions.Len() {
if len(*lastOwnerPartitions)+1 < len(*currentOwnerPartitions) {
currentOwnerPartitions.remove(staleNum)
lastOwnerPartitions.add(staleNum)
}
Expand Down Expand Up @@ -704,8 +708,8 @@ func (b *balancer) reassignPartition(src, dst uint16, partNum int32) {
srcPartitions := &b.plan[src]
dstPartitions := &b.plan[dst]

oldSrcLevel := srcPartitions.Len()
oldDstLevel := dstPartitions.Len()
oldSrcLevel := len(*srcPartitions)
oldDstLevel := len(*dstPartitions)

srcPartitions.remove(partNum)
dstPartitions.add(partNum)
Expand Down
14 changes: 14 additions & 0 deletions pkg/kgo/internal/sticky/sticky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,20 @@ func Test_stickyAddEqualMove(t *testing.T) {
}
}

func Test_stickyTwoJoinEqualBalance(t *testing.T) {
t.Parallel()
topics := map[string]int32{"foo": 16, "bar": 16}
members := []GroupMember{
{ID: "1", Topics: []string{"foo", "bar"}},
{ID: "2", Topics: []string{"foo", "bar"}},
}
plan := Balance(members, topics)
if len(plan["1"]["foo"]) != 8 || len(plan["1"]["bar"]) != 8 ||
len(plan["2"]["foo"]) != 8 || len(plan["2"]["bar"]) != 8 {
t.Errorf("bad distribution: %v", plan)
}
}

func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 3c628d5

Please sign in to comment.