diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index c7f6884426f..d1dd7252117 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -50,6 +50,10 @@ var ( // maxPendingPayloadsPerUpkeep is the number of logs we can have pending for a single upkeep // at any given time maxPendingPayloadsPerUpkeep = 500 + // recoveryRateLimit is the rate limit for the number of times that an upkeep can be scanned for recovery per recoveryRatePeriod + recoveryRateLimit = uint32(3) + // recoveryRatePeriod is the period for the recovery rate limit + recoveryRatePeriod = time.Minute ) type LogRecoverer interface { @@ -87,6 +91,8 @@ type logRecoverer struct { client client.Client blockTimeResolver *blockTimeResolver + buckets *tokenBuckets + finalityDepth int64 } @@ -111,6 +117,8 @@ func NewLogRecoverer(lggr logger.Logger, poller logpoller.LogPoller, client clie client: client, blockTimeResolver: newBlockTimeResolver(poller), + buckets: newTokenBuckets(recoveryRateLimit, recoveryRatePeriod), + finalityDepth: opts.FinalityDepth, } @@ -130,6 +138,8 @@ func (r *logRecoverer) Start(ctx context.Context) error { r.lggr.Infow("starting log recoverer", "blockTime", r.blockTime.Load(), "lookbackBlocks", r.lookbackBlocks.Load(), "interval", r.interval) + r.threadCtrl.Go(r.buckets.Start) + r.threadCtrl.Go(func(ctx context.Context) { recoveryTicker := time.NewTicker(r.interval) defer recoveryTicker.Stop() @@ -349,6 +359,8 @@ func (r *logRecoverer) recover(ctx context.Context) error { var wg sync.WaitGroup for _, f := range filters { + // apply rate limit, we ignore the result as upkeepID is already rate limited in getFilterBatch + _ = r.buckets.Accept(f.upkeepID.String(), 1) wg.Add(1) go func(f upkeepFilter) { defer wg.Done() @@ -513,6 +525,10 @@ func (r *logRecoverer) getRecoveryWindow(latest int64) (int64, int64) { // getFilterBatch returns a batch of filters that are ready to be recovered. func (r *logRecoverer) getFilterBatch(offsetBlock int64) []upkeepFilter { filters := r.filterStore.GetFilters(func(f upkeepFilter) bool { + if r.buckets.Touch(f.upkeepID.String(), 1) { + // upkeepID is rate limited + return false + } // ensure we work only on filters that are ready to be recovered // no need to recover in case f.configUpdateBlock is after offsetBlock return f.lastRePollBlock < offsetBlock && int64(f.configUpdateBlock) <= offsetBlock