From 1429d472dbac13ad16732ffa37748054187ddaf5 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 21 Oct 2023 00:10:50 -0600 Subject: [PATCH] sticky balancer: try for better topic distribution among members The sticky balancer currently strives for ultimate stickiness, with no regard to trying to balance topic partitions among members equally. When adding a member, it is often the case that an entire topic's partitions shifts to the other member, while the first member has the other topic. By sorting by partition number before balancing, when the algorithm steals partitions from the end of an existing member to give to the new member, we ensure that we divvy up the topics equally to both members while still ensuring stickiness. This is likely not perfect but it goes a long way. --- pkg/kgo/internal/sticky/sticky.go | 18 ++++++++++++++++++ pkg/kgo/internal/sticky/sticky_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/pkg/kgo/internal/sticky/sticky.go b/pkg/kgo/internal/sticky/sticky.go index c8a1e710..a653b67d 100644 --- a/pkg/kgo/internal/sticky/sticky.go +++ b/pkg/kgo/internal/sticky/sticky.go @@ -472,6 +472,24 @@ func (b *balancer) assignUnassignedAndInitGraph() { } b.tryRestickyStales(topicPotentials, partitionConsumers) + + // For each member, we now sort their current partitions by partition, + // then topic. Sorting the lowest numbers first means that once we + // steal from the end (when adding a member), we steal equally across + // all topics. This benefits the standard case the most, where all + // members consume equally. + for memberNum := range b.plan { + partNums := b.plan[memberNum] + sort.Slice(partNums, func(i, j int) bool { + lpNum, rpNum := partNums[i], partNums[j] + ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum] + li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum] + lt, rt := li.topic, ri.topic + lp, rp := lpNum-li.partNum, rpNum-ri.partNum + return lp < rp || (lp == rp && lt < rt) + }) + } + for _, potentials := range topicPotentials { (&membersByPartitions{potentials, b.plan}).init() } diff --git a/pkg/kgo/internal/sticky/sticky_test.go b/pkg/kgo/internal/sticky/sticky_test.go index c0033b8e..264b41e2 100644 --- a/pkg/kgo/internal/sticky/sticky_test.go +++ b/pkg/kgo/internal/sticky/sticky_test.go @@ -1517,6 +1517,30 @@ func Test_stickyBalanceStrategy_Plan_AssignmentWithMultipleGenerations2(t *testi testPlanUsage(t, plan3, topics, nil) } +func Test_stickyAddEqualMove(t *testing.T) { + t.Parallel() + topics := map[string]int32{"foo": 16, "bar": 16} + members := []GroupMember{ + {ID: "1", Topics: []string{"foo", "bar"}}, + } + plan1 := Balance(members, topics) + + // PLAN 2 + members[0].UserData = udEncode(1, 1, plan1["1"]) + members = append(members, GroupMember{ + ID: "2", Topics: []string{"foo", "bar"}, + }) + + plan2 := Balance(members, topics) + testEqualDivvy(t, plan2, 16, members) + testPlanUsage(t, plan2, topics, nil) + + if len(plan2["1"]["foo"]) != 8 || len(plan2["1"]["bar"]) != 8 || + len(plan2["2"]["foo"]) != 8 || len(plan2["2"]["bar"]) != 8 { + t.Errorf("bad distribution: %v", plan2) + } +} + func Test_stickyBalanceStrategy_Plan_AssignmentWithConflictingPreviousGenerations(t *testing.T) { t.Parallel()