Skip to content

Commit

Permalink
Merge pull request #341 from karalabe/fix-rdy-distribution
Browse files Browse the repository at this point in the history
consumer: lower old RDYs first, then assign to new connection
  • Loading branch information
mreiferson authored Nov 28, 2021
2 parents 259dc59 + 5f35ce8 commit 827b836
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,8 +609,11 @@ func (r *Consumer) ConnectToNSQD(addr string) error {

// pre-emptive signal to existing connections to lower their RDY count
for _, c := range r.conns() {
r.maybeUpdateRDY(c)
if c != conn {
r.maybeUpdateRDY(c)
}
}
r.maybeUpdateRDY(conn)

return nil
}
Expand Down Expand Up @@ -912,7 +915,9 @@ func (r *Consumer) maybeUpdateRDY(conn *Conn) {

count := r.perConnMaxInFlight()
r.log(LogLevelDebug, "(%s) sending RDY %d", conn, count)
r.updateRDY(conn, count)
if err := r.updateRDY(conn, count); err != nil {
r.log(LogLevelWarning, "(%s) error sending RDY %d: %v", conn, count, err)
}
}

func (r *Consumer) rdyLoop() {
Expand Down

0 comments on commit 827b836

Please sign in to comment.