Skip to content

Commit

Permalink
avoid promoting lastRePollBlock if limit was exceeded
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Sep 20, 2023
1 parent d75a700 commit 5976a03
Showing 1 changed file with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,14 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB
}
filteredLogs := r.filterFinalizedStates(f, logs, states)

added, alreadyPending := r.populatePending(f, filteredLogs)
added, alreadyPending, ok := r.populatePending(f, filteredLogs)
if added > 0 {
r.lggr.Debugw("found missed logs", "added", added, "alreadyPending", alreadyPending, "upkeepID", f.upkeepID)
}
if !ok {
r.lggr.Debugw("failed to add all logs to pending", "upkeepID", f.upkeepID)
return nil
}
r.filterStore.UpdateFilters(func(uf1, uf2 upkeepFilter) upkeepFilter {
uf1.lastRePollBlock = end
r.lggr.Debugw("Updated lastRePollBlock", "lastRePollBlock", end, "upkeepID", uf1.upkeepID)
Expand All @@ -423,13 +427,15 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB
}

// populatePending adds the logs to the pending list if they are not already pending.
// returns the number of logs added and the number of logs that were already pending.
func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.Log) (int, int) {
// returns the number of logs added, the number of logs that were already pending,
// and a flag that indicates whether some errors happened while we are trying to add to pending q.
func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.Log) (int, int, bool) {
r.lock.Lock()
defer r.lock.Unlock()

pendingSizeBefore := len(r.pending)
alreadyPending := 0
errs := make([]error, 0)
for _, log := range filteredLogs {
trigger := logToTrigger(log)
// Set the checkBlock and Hash to zero so that the checkPipeline uses the latest block
Expand Down Expand Up @@ -462,9 +468,11 @@ func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.
visitedAt: time.Now(),
payload: payload,
}
} else {
errs = append(errs, err)
}
}
return len(r.pending) - pendingSizeBefore, alreadyPending
return len(r.pending) - pendingSizeBefore, alreadyPending, len(errs) == 0
}

// filterFinalizedStates filters out the log upkeeps that have already been completed (performed or ineligible).
Expand Down

0 comments on commit 5976a03

Please sign in to comment.