Skip to content

Commit

Permalink
reduce duplicate rdy updation requests
Browse files Browse the repository at this point in the history
  • Loading branch information
andyxning committed Jan 15, 2020
1 parent d9600b3 commit 0f433a4
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
6 changes: 6 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,13 @@ func (r *Consumer) sendRDY(c *Conn, count int64) error {
}

atomic.AddInt64(&r.totalRdyCount, count-c.RDY())

lastRDY := c.LastRDY()
c.SetRDY(count)
if count == lastRDY {
return nil
}

err := c.WriteCommand(Ready(int(count)))
if err != nil {
r.log(LogLevelError, "(%s) error sending RDY %d - %s", c.String(), count, err)
Expand Down
12 changes: 8 additions & 4 deletions mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,15 +343,19 @@ func TestConsumerRequeueNoBackoff(t *testing.T) {
"RDY 0",
fmt.Sprintf("REQ %s 0", msgIDRequeueNoBackoff),
"RDY 1",
"RDY 1",
"RDY 1", // Duplicate rdy update will be ignored.
fmt.Sprintf("FIN %s", msgIDGood),
}
if len(n.got) != len(expected) {
if len(n.got) != len(expected) - 1 {
t.Fatalf("we got %d commands != %d expected", len(n.got), len(expected))
}
for i, r := range n.got {
if string(r) != expected[i] {
t.Fatalf("cmd %d bad %s != %s", i, r, expected[i])
j := i
if i == len(n.got) - 1 {
j = i + 1
}
if string(r) != expected[j] {
t.Fatalf("cmd %d bad %s != %s", i, r, expected[j])
}
}
}
Expand Down

0 comments on commit 0f433a4

Please sign in to comment.