Skip to content

Commit

Permalink
reduce duplicate rdy updation requests
Browse files Browse the repository at this point in the history
  • Loading branch information
andyxning committed Jan 15, 2020
1 parent d9600b3 commit fa27f35
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
6 changes: 6 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,13 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error {
}

atomic.AddInt64(&r.totalRdyCount, count-c.RDY())

lastRDY := c.LastRDY()
c.SetRDY(count)
if count == lastRDY {
return nil
}

err := c.WriteCommand(Ready(int(count)))
if err != nil {
r.log(LogLevelError, "(%s) error sending RDY %d - %s", c.String(), count, err)
Expand Down
4 changes: 2 additions & 2 deletions mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,10 @@ func TestConsumerRequeueNoBackoff(t *testing.T) {
"RDY 0",
fmt.Sprintf("REQ %s 0", msgIDRequeueNoBackoff),
"RDY 1",
"RDY 1",
"RDY 1", // Duplicate rdy update will be ignored.
fmt.Sprintf("FIN %s", msgIDGood),
}
if len(n.got) != len(expected) {
if len(n.got) != len(expected) - 1 {
t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected))
}
for i, r := range n.got {
Expand Down

0 comments on commit fa27f35

Please sign in to comment.