Skip to content

Commit

Permalink
sticky
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Oct 31, 2023
1 parent 6ebcb43 commit 40aaa21
Showing 1 changed file with 30 additions and 13 deletions.
43 changes: 30 additions & 13 deletions pkg/kgo/internal/sticky/sticky.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,22 +490,39 @@ func (b *balancer) assignUnassignedAndInitGraph() {
})
}

for _, potentials := range topicPotentials {
if !b.isComplex && len(topicPotentials) > 0 {
potentials := topicPotentials[0]
(&membersByPartitions{potentials, b.plan}).init()
}

for partNum, owner := range partitionConsumers {
if owner.memberNum != unassignedPart {
continue
for partNum, owner := range partitionConsumers {
if owner.memberNum != unassignedPart {
continue
}
assigned := potentials[0]
b.plan[assigned].add(int32(partNum))
(&membersByPartitions{potentials, b.plan}).fix0()
partitionConsumers[partNum].memberNum = assigned
}
potentials := topicPotentials[b.partOwners[partNum]]
if len(potentials) == 0 {
continue
} else {
for partNum, owner := range partitionConsumers {
if owner.memberNum != unassignedPart {
continue
}
potentials := topicPotentials[b.partOwners[partNum]]
if len(potentials) == 0 {
continue
}
leastConsumingPotential := potentials[0]
leastConsuming := len(b.plan[leastConsumingPotential])
for _, potential := range potentials[1:] {
potentialConsuming := len(b.plan[potential])
if potentialConsuming < leastConsuming {
leastConsumingPotential = potential
leastConsuming = potentialConsuming
}
}
b.plan[leastConsumingPotential].add(int32(partNum))
partitionConsumers[partNum].memberNum = leastConsumingPotential
}
assigned := potentials[0]
b.plan[assigned].add(int32(partNum))
(&membersByPartitions{potentials, b.plan}).fix0()
partitionConsumers[partNum].memberNum = assigned
}

// Lastly, with everything assigned, we build our steal graph for
Expand Down

0 comments on commit 40aaa21

Please sign in to comment.