diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index c5b06701737..c3c1b99787b 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -1,6 +1,7 @@ package logprovider import ( + "bytes" "context" "crypto/rand" "errors" @@ -43,6 +44,9 @@ var ( recoveryLogsBurst = int64(500) // blockTimeUpdateCadence is the cadence at which the chain's blocktime is re-calculated blockTimeUpdateCadence = 10 * time.Minute + // maxPendingPayloadsPerUpkeep is the number of logs we can have pending for a single upkeep + // at any given time + maxPendingPayloadsPerUpkeep = 500 ) type LogRecoverer interface { @@ -405,10 +409,14 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB } filteredLogs := r.filterFinalizedStates(f, logs, states) - added, alreadyPending := r.populatePending(f, filteredLogs) + added, alreadyPending, ok := r.populatePending(f, filteredLogs) if added > 0 { r.lggr.Debugw("found missed logs", "added", added, "alreadyPending", alreadyPending, "upkeepID", f.upkeepID) } + if !ok { + r.lggr.Debugw("failed to add all logs to pending", "upkeepID", f.upkeepID) + return nil + } r.filterStore.UpdateFilters(func(uf1, uf2 upkeepFilter) upkeepFilter { uf1.lastRePollBlock = end r.lggr.Debugw("Updated lastRePollBlock", "lastRePollBlock", end, "upkeepID", uf1.upkeepID) @@ -419,13 +427,15 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB } // populatePending adds the logs to the pending list if they are not already pending. -// returns the number of logs added and the number of logs that were already pending. -func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.Log) (int, int) { +// returns the number of logs added, the number of logs that were already pending, +// and a flag that indicates whether some errors happened while we are trying to add to pending q. +func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.Log) (int, int, bool) { r.lock.Lock() defer r.lock.Unlock() pendingSizeBefore := len(r.pending) alreadyPending := 0 + errs := make([]error, 0) for _, log := range filteredLogs { trigger := logToTrigger(log) // Set the checkBlock and Hash to zero so that the checkPipeline uses the latest block @@ -453,13 +463,16 @@ func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller. continue } // r.lggr.Debugw("adding a payload to pending", "payload", payload) - r.visited[wid] = visitedRecord{ - visitedAt: time.Now(), - payload: payload, + if err := r.addPending(payload); err != nil { + errs = append(errs, err) + } else { + r.visited[wid] = visitedRecord{ + visitedAt: time.Now(), + payload: payload, + } } - r.addPending(payload) } - return len(r.pending) - pendingSizeBefore, alreadyPending + return len(r.pending) - pendingSizeBefore, alreadyPending, len(errs) == 0 } // filterFinalizedStates filters out the log upkeeps that have already been completed (performed or ineligible). @@ -619,9 +632,10 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { removed++ continue } - r.addPending(rec.payload) - rec.visitedAt = time.Now() - r.visited[ids[i]] = rec + if err := r.addPending(rec.payload); err == nil { + rec.visitedAt = time.Now() + r.visited[ids[i]] = rec + } default: delete(r.visited, ids[i]) removed++ @@ -637,17 +651,25 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { // addPending adds a payload to the pending list if it's not already there. // NOTE: the lock must be held before calling this function. -func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) { +func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) error { var exist bool pending := r.pending + upkeepPayloads := 0 for _, p := range pending { + if bytes.Equal(p.UpkeepID[:], payload.UpkeepID[:]) { + upkeepPayloads++ + } if p.WorkID == payload.WorkID { exist = true } } + if upkeepPayloads >= maxPendingPayloadsPerUpkeep { + return fmt.Errorf("upkeep %v has too many payloads in pending queue", payload.UpkeepID) + } if !exist { r.pending = append(pending, payload) } + return nil } // removePending removes a payload from the pending list. diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go index 59c4244304a..2fdf04f76c7 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go @@ -1093,40 +1093,75 @@ func TestLogRecoverer_GetProposalData(t *testing.T) { func TestLogRecoverer_pending(t *testing.T) { tests := []struct { - name string - exist []ocr2keepers.UpkeepPayload - new []ocr2keepers.UpkeepPayload - want []ocr2keepers.UpkeepPayload + name string + maxPerUpkeep int + exist []ocr2keepers.UpkeepPayload + new []ocr2keepers.UpkeepPayload + errored []bool + want []ocr2keepers.UpkeepPayload }{ { - "empty", - []ocr2keepers.UpkeepPayload{}, - []ocr2keepers.UpkeepPayload{}, - []ocr2keepers.UpkeepPayload{}, + name: "empty", + maxPerUpkeep: 10, + exist: []ocr2keepers.UpkeepPayload{}, + new: []ocr2keepers.UpkeepPayload{}, + errored: []bool{}, + want: []ocr2keepers.UpkeepPayload{}, }, { - "add new and existing", - []ocr2keepers.UpkeepPayload{ + name: "add new and existing", + maxPerUpkeep: 10, + exist: []ocr2keepers.UpkeepPayload{ {WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, }, - []ocr2keepers.UpkeepPayload{ + new: []ocr2keepers.UpkeepPayload{ {WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, {WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")}, }, - []ocr2keepers.UpkeepPayload{ + errored: []bool{false, false}, + want: []ocr2keepers.UpkeepPayload{ {WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, {WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")}, }, }, + { + name: "exceed limits for upkeep", + maxPerUpkeep: 3, + exist: []ocr2keepers.UpkeepPayload{ + {WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, + {WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, + {WorkID: "3", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, + }, + new: []ocr2keepers.UpkeepPayload{ + {WorkID: "4", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, + }, + errored: []bool{true}, + want: []ocr2keepers.UpkeepPayload{ + {WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, + {WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, + {WorkID: "3", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")}, + }, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + origMaxPendingPayloadsPerUpkeep := maxPendingPayloadsPerUpkeep + maxPendingPayloadsPerUpkeep = tc.maxPerUpkeep + defer func() { + maxPendingPayloadsPerUpkeep = origMaxPendingPayloadsPerUpkeep + }() + r := NewLogRecoverer(logger.TestLogger(t), nil, nil, nil, nil, nil, NewOptions(200)) r.lock.Lock() r.pending = tc.exist - for _, p := range tc.new { - r.addPending(p) + for i, p := range tc.new { + err := r.addPending(p) + if tc.errored[i] { + require.Error(t, err) + continue + } + require.NoError(t, err) } pending := r.pending require.GreaterOrEqual(t, len(pending), len(tc.new))