Skip to content

Commit

Permalink
Recoverer: limit the number of pending payloads for an upkeep (#10712)
Browse files Browse the repository at this point in the history
* recoverer: limit the number of allowed payloads for an upkeep

* modify maxPendingPayloadsPerUpkeep to 500

* avoid promoting lastRePollBlock if limit was exceeded

* fix wrong if
  • Loading branch information
amirylm authored Sep 20, 2023
1 parent 334587c commit 51b040f
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package logprovider

import (
"bytes"
"context"
"crypto/rand"
"errors"
Expand Down Expand Up @@ -43,6 +44,9 @@ var (
recoveryLogsBurst = int64(500)
// blockTimeUpdateCadence is the cadence at which the chain's blocktime is re-calculated
blockTimeUpdateCadence = 10 * time.Minute
// maxPendingPayloadsPerUpkeep is the number of logs we can have pending for a single upkeep
// at any given time
maxPendingPayloadsPerUpkeep = 500
)

type LogRecoverer interface {
Expand Down Expand Up @@ -405,10 +409,14 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB
}
filteredLogs := r.filterFinalizedStates(f, logs, states)

added, alreadyPending := r.populatePending(f, filteredLogs)
added, alreadyPending, ok := r.populatePending(f, filteredLogs)
if added > 0 {
r.lggr.Debugw("found missed logs", "added", added, "alreadyPending", alreadyPending, "upkeepID", f.upkeepID)
}
if !ok {
r.lggr.Debugw("failed to add all logs to pending", "upkeepID", f.upkeepID)
return nil
}
r.filterStore.UpdateFilters(func(uf1, uf2 upkeepFilter) upkeepFilter {
uf1.lastRePollBlock = end
r.lggr.Debugw("Updated lastRePollBlock", "lastRePollBlock", end, "upkeepID", uf1.upkeepID)
Expand All @@ -419,13 +427,15 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB
}

// populatePending adds the logs to the pending list if they are not already pending.
// returns the number of logs added and the number of logs that were already pending.
func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.Log) (int, int) {
// returns the number of logs added, the number of logs that were already pending,
// and a flag that indicates whether some errors happened while we are trying to add to pending q.
func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.Log) (int, int, bool) {
r.lock.Lock()
defer r.lock.Unlock()

pendingSizeBefore := len(r.pending)
alreadyPending := 0
errs := make([]error, 0)
for _, log := range filteredLogs {
trigger := logToTrigger(log)
// Set the checkBlock and Hash to zero so that the checkPipeline uses the latest block
Expand Down Expand Up @@ -453,13 +463,16 @@ func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.
continue
}
// r.lggr.Debugw("adding a payload to pending", "payload", payload)
r.visited[wid] = visitedRecord{
visitedAt: time.Now(),
payload: payload,
if err := r.addPending(payload); err != nil {
errs = append(errs, err)
} else {
r.visited[wid] = visitedRecord{
visitedAt: time.Now(),
payload: payload,
}
}
r.addPending(payload)
}
return len(r.pending) - pendingSizeBefore, alreadyPending
return len(r.pending) - pendingSizeBefore, alreadyPending, len(errs) == 0
}

// filterFinalizedStates filters out the log upkeeps that have already been completed (performed or ineligible).
Expand Down Expand Up @@ -619,9 +632,10 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {
removed++
continue
}
r.addPending(rec.payload)
rec.visitedAt = time.Now()
r.visited[ids[i]] = rec
if err := r.addPending(rec.payload); err == nil {
rec.visitedAt = time.Now()
r.visited[ids[i]] = rec
}
default:
delete(r.visited, ids[i])
removed++
Expand All @@ -637,17 +651,25 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {

// addPending adds a payload to the pending list if it's not already there.
// NOTE: the lock must be held before calling this function.
func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) {
func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) error {
var exist bool
pending := r.pending
upkeepPayloads := 0
for _, p := range pending {
if bytes.Equal(p.UpkeepID[:], payload.UpkeepID[:]) {
upkeepPayloads++
}
if p.WorkID == payload.WorkID {
exist = true
}
}
if upkeepPayloads >= maxPendingPayloadsPerUpkeep {
return fmt.Errorf("upkeep %v has too many payloads in pending queue", payload.UpkeepID)
}
if !exist {
r.pending = append(pending, payload)
}
return nil
}

// removePending removes a payload from the pending list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,40 +1093,75 @@ func TestLogRecoverer_GetProposalData(t *testing.T) {

func TestLogRecoverer_pending(t *testing.T) {
tests := []struct {
name string
exist []ocr2keepers.UpkeepPayload
new []ocr2keepers.UpkeepPayload
want []ocr2keepers.UpkeepPayload
name string
maxPerUpkeep int
exist []ocr2keepers.UpkeepPayload
new []ocr2keepers.UpkeepPayload
errored []bool
want []ocr2keepers.UpkeepPayload
}{
{
"empty",
[]ocr2keepers.UpkeepPayload{},
[]ocr2keepers.UpkeepPayload{},
[]ocr2keepers.UpkeepPayload{},
name: "empty",
maxPerUpkeep: 10,
exist: []ocr2keepers.UpkeepPayload{},
new: []ocr2keepers.UpkeepPayload{},
errored: []bool{},
want: []ocr2keepers.UpkeepPayload{},
},
{
"add new and existing",
[]ocr2keepers.UpkeepPayload{
name: "add new and existing",
maxPerUpkeep: 10,
exist: []ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
},
[]ocr2keepers.UpkeepPayload{
new: []ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")},
},
[]ocr2keepers.UpkeepPayload{
errored: []bool{false, false},
want: []ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")},
},
},
{
name: "exceed limits for upkeep",
maxPerUpkeep: 3,
exist: []ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "3", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
},
new: []ocr2keepers.UpkeepPayload{
{WorkID: "4", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
},
errored: []bool{true},
want: []ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "3", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
origMaxPendingPayloadsPerUpkeep := maxPendingPayloadsPerUpkeep
maxPendingPayloadsPerUpkeep = tc.maxPerUpkeep
defer func() {
maxPendingPayloadsPerUpkeep = origMaxPendingPayloadsPerUpkeep
}()

r := NewLogRecoverer(logger.TestLogger(t), nil, nil, nil, nil, nil, NewOptions(200))
r.lock.Lock()
r.pending = tc.exist
for _, p := range tc.new {
r.addPending(p)
for i, p := range tc.new {
err := r.addPending(p)
if tc.errored[i] {
require.Error(t, err)
continue
}
require.NoError(t, err)
}
pending := r.pending
require.GreaterOrEqual(t, len(pending), len(tc.new))
Expand Down

0 comments on commit 51b040f

Please sign in to comment.