Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2.5.1-auto] Recoverer: limit the number of pending payloads for an upkeep #10714

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package logprovider

import (
"bytes"
"context"
"crypto/rand"
"errors"
Expand Down Expand Up @@ -43,6 +44,9 @@ var (
recoveryLogsBurst = int64(500)
// blockTimeUpdateCadence is the cadence at which the chain's blocktime is re-calculated
blockTimeUpdateCadence = 10 * time.Minute
// maxPendingPayloadsPerUpkeep is the maximum number of payloads that can be pending
// for a single upkeep at any given time
maxPendingPayloadsPerUpkeep = 500
)

type LogRecoverer interface {
Expand Down Expand Up @@ -405,10 +409,14 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB
}
filteredLogs := r.filterFinalizedStates(f, logs, states)

added, alreadyPending := r.populatePending(f, filteredLogs)
added, alreadyPending, ok := r.populatePending(f, filteredLogs)
if added > 0 {
r.lggr.Debugw("found missed logs", "added", added, "alreadyPending", alreadyPending, "upkeepID", f.upkeepID)
}
if !ok {
r.lggr.Debugw("failed to add all logs to pending", "upkeepID", f.upkeepID)
return nil
}
r.filterStore.UpdateFilters(func(uf1, uf2 upkeepFilter) upkeepFilter {
uf1.lastRePollBlock = end
r.lggr.Debugw("Updated lastRePollBlock", "lastRePollBlock", end, "upkeepID", uf1.upkeepID)
Expand All @@ -419,13 +427,15 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB
}

// populatePending adds the logs to the pending list if they are not already pending.
// returns the number of logs added and the number of logs that were already pending.
func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.Log) (int, int) {
// returns the number of logs added, the number of logs that were already pending,
// and a flag that is false if adding any log to the pending queue returned an error.
func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.Log) (int, int, bool) {
r.lock.Lock()
defer r.lock.Unlock()

pendingSizeBefore := len(r.pending)
alreadyPending := 0
errs := make([]error, 0)
for _, log := range filteredLogs {
trigger := logToTrigger(log)
// Set the checkBlock and Hash to zero so that the checkPipeline uses the latest block
Expand Down Expand Up @@ -453,13 +463,16 @@ func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.
continue
}
// r.lggr.Debugw("adding a payload to pending", "payload", payload)
r.visited[wid] = visitedRecord{
visitedAt: time.Now(),
payload: payload,
if err := r.addPending(payload); err != nil {
errs = append(errs, err)
} else {
r.visited[wid] = visitedRecord{
visitedAt: time.Now(),
payload: payload,
}
}
r.addPending(payload)
}
return len(r.pending) - pendingSizeBefore, alreadyPending
return len(r.pending) - pendingSizeBefore, alreadyPending, len(errs) == 0
}

// filterFinalizedStates filters out the log upkeeps that have already been completed (performed or ineligible).
Expand Down Expand Up @@ -619,9 +632,10 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {
removed++
continue
}
r.addPending(rec.payload)
rec.visitedAt = time.Now()
r.visited[ids[i]] = rec
if err := r.addPending(rec.payload); err == nil {
rec.visitedAt = time.Now()
r.visited[ids[i]] = rec
}
default:
delete(r.visited, ids[i])
removed++
Expand All @@ -637,17 +651,25 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {

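// addPending adds a payload to the pending list if it's not already there.
// It returns an error, without adding the payload, when the payload's upkeep already has
// maxPendingPayloadsPerUpkeep payloads in the pending list.
// NOTE: the lock must be held before calling this function.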
func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) {
func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) error {
	// Single pass over the queue: tally the entries that belong to the same
	// upkeep and note whether this exact work ID is already queued.
	sameUpkeep, queued := 0, false
	for _, entry := range r.pending {
		if entry.WorkID == payload.WorkID {
			queued = true
		}
		if bytes.Equal(entry.UpkeepID[:], payload.UpkeepID[:]) {
			sameUpkeep++
		}
	}

	// The per-upkeep cap is evaluated before the duplicate check, so a payload
	// that is already queued still yields an error once its upkeep is at the cap.
	if sameUpkeep >= maxPendingPayloadsPerUpkeep {
		return fmt.Errorf("upkeep %v has too many payloads in pending queue", payload.UpkeepID)
	}

	if queued {
		// Nothing to append; the payload is already in the pending list.
		return nil
	}
	r.pending = append(r.pending, payload)
	return nil
}

// removePending removes a payload from the pending list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,40 +1093,75 @@ func TestLogRecoverer_GetProposalData(t *testing.T) {

func TestLogRecoverer_pending(t *testing.T) {
tests := []struct {
name string
exist []ocr2keepers.UpkeepPayload
new []ocr2keepers.UpkeepPayload
want []ocr2keepers.UpkeepPayload
name string
maxPerUpkeep int
exist []ocr2keepers.UpkeepPayload
new []ocr2keepers.UpkeepPayload
errored []bool
want []ocr2keepers.UpkeepPayload
}{
{
"empty",
[]ocr2keepers.UpkeepPayload{},
[]ocr2keepers.UpkeepPayload{},
[]ocr2keepers.UpkeepPayload{},
name: "empty",
maxPerUpkeep: 10,
exist: []ocr2keepers.UpkeepPayload{},
new: []ocr2keepers.UpkeepPayload{},
errored: []bool{},
want: []ocr2keepers.UpkeepPayload{},
},
{
"add new and existing",
[]ocr2keepers.UpkeepPayload{
name: "add new and existing",
maxPerUpkeep: 10,
exist: []ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
},
[]ocr2keepers.UpkeepPayload{
new: []ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")},
},
[]ocr2keepers.UpkeepPayload{
errored: []bool{false, false},
want: []ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")},
},
},
{
name: "exceed limits for upkeep",
maxPerUpkeep: 3,
exist: []ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "3", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
},
new: []ocr2keepers.UpkeepPayload{
{WorkID: "4", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
},
errored: []bool{true},
want: []ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "3", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
origMaxPendingPayloadsPerUpkeep := maxPendingPayloadsPerUpkeep
maxPendingPayloadsPerUpkeep = tc.maxPerUpkeep
defer func() {
maxPendingPayloadsPerUpkeep = origMaxPendingPayloadsPerUpkeep
}()

r := NewLogRecoverer(logger.TestLogger(t), nil, nil, nil, nil, nil, NewOptions(200))
r.lock.Lock()
r.pending = tc.exist
for _, p := range tc.new {
r.addPending(p)
for i, p := range tc.new {
err := r.addPending(p)
if tc.errored[i] {
require.Error(t, err)
continue
}
require.NoError(t, err)
}
pending := r.pending
require.GreaterOrEqual(t, len(pending), len(tc.new))
Expand Down