diff --git a/pkg/kgo/internal/sticky/sticky.go b/pkg/kgo/internal/sticky/sticky.go index c8a1e710..a653b67d 100644 --- a/pkg/kgo/internal/sticky/sticky.go +++ b/pkg/kgo/internal/sticky/sticky.go @@ -472,6 +472,24 @@ func (b *balancer) assignUnassignedAndInitGraph() { } b.tryRestickyStales(topicPotentials, partitionConsumers) + + // For each member, we now sort their current partitions by partition, + // then topic. Sorting the lowest numbers first means that once we + // steal from the end (when adding a member), we steal equally across + // all topics. This benefits the standard case the most, where all + // members consume equally. + for memberNum := range b.plan { + partNums := b.plan[memberNum] + sort.Slice(partNums, func(i, j int) bool { + lpNum, rpNum := partNums[i], partNums[j] + ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum] + li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum] + lt, rt := li.topic, ri.topic + lp, rp := lpNum-li.partNum, rpNum-ri.partNum + return lp < rp || (lp == rp && lt < rt) + }) + } + for _, potentials := range topicPotentials { (&membersByPartitions{potentials, b.plan}).init() } diff --git a/pkg/kgo/internal/sticky/sticky_test.go b/pkg/kgo/internal/sticky/sticky_test.go index c0033b8e..264b41e2 100644 --- a/pkg/kgo/internal/sticky/sticky_test.go +++ b/pkg/kgo/internal/sticky/sticky_test.go @@ -1517,6 +1517,30 @@ func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testi testPlanUsage(t, plan3, topics, nil) } +func Test_stickyAddEqualMove(t *testing.T) { + t.Parallel() + topics := map[string]int32{"foo": 16, "bar": 16} + members := []GroupMember{ + {ID: "1", Topics: []string{"foo", "bar"}}, + } + plan1 := Balance(members, topics) + + // PLAN 2 + members[0].UserData = udEncode(1, 1, plan1["1"]) + members = append(members, GroupMember{ + ID: "2", Topics: []string{"foo", "bar"}, + }) + + plan2 := Balance(members, topics) + testEqualDivvy(t, plan2, 16, members) + testPlanUsage(t, plan2, topics, nil) + + if len(plan2["1"]["foo"]) != 8 || len(plan2["1"]["bar"]) != 8 || + len(plan2["2"]["foo"]) != 8 || len(plan2["2"]["bar"]) != 8 { + t.Errorf("bad distribution: %v", plan2) + } +} + func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) { t.Parallel()