diff --git a/pkg/kgo/internal/sticky/sticky.go b/pkg/kgo/internal/sticky/sticky.go index a653b67d..a59ae857 100644 --- a/pkg/kgo/internal/sticky/sticky.go +++ b/pkg/kgo/internal/sticky/sticky.go @@ -490,22 +490,39 @@ func (b *balancer) assignUnassignedAndInitGraph() { }) } - for _, potentials := range topicPotentials { + if !b.isComplex && len(topicPotentials) > 0 { + potentials := topicPotentials[0] (&membersByPartitions{potentials, b.plan}).init() - } - - for partNum, owner := range partitionConsumers { - if owner.memberNum != unassignedPart { - continue + for partNum, owner := range partitionConsumers { + if owner.memberNum != unassignedPart { + continue + } + assigned := potentials[0] + b.plan[assigned].add(int32(partNum)) + (&membersByPartitions{potentials, b.plan}).fix0() + partitionConsumers[partNum].memberNum = assigned } - potentials := topicPotentials[b.partOwners[partNum]] - if len(potentials) == 0 { - continue + } else { + for partNum, owner := range partitionConsumers { + if owner.memberNum != unassignedPart { + continue + } + potentials := topicPotentials[b.partOwners[partNum]] + if len(potentials) == 0 { + continue + } + leastConsumingPotential := potentials[0] + leastConsuming := len(b.plan[leastConsumingPotential]) + for _, potential := range potentials[1:] { + potentialConsuming := len(b.plan[potential]) + if potentialConsuming < leastConsuming { + leastConsumingPotential = potential + leastConsuming = potentialConsuming + } + } + b.plan[leastConsumingPotential].add(int32(partNum)) + partitionConsumers[partNum].memberNum = leastConsumingPotential } - assigned := potentials[0] - b.plan[assigned].add(int32(partNum)) - (&membersByPartitions{potentials, b.plan}).fix0() - partitionConsumers[partNum].memberNum = assigned } // Lastly, with everything assigned, we build our steal graph for