diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index 31de875d021..b1be71aad19 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "math" "math/big" "sort" "sync" @@ -38,6 +39,8 @@ var ( // recoveryLogsBuffer is the number of blocks to be used as a safety buffer when reading logs recoveryLogsBuffer = int64(200) recoveryLogsBurst = int64(500) + // blockTimeUpdateCadence is the cadence at which the chain's blocktime is re-calculated + blockTimeUpdateCadence = 10 * time.Minute ) type LogRecoverer interface { @@ -67,11 +70,12 @@ type logRecoverer struct { pending []ocr2keepers.UpkeepPayload visited map[string]visitedRecord - filterStore UpkeepFilterStore - states core.UpkeepStateReader - packer LogDataPacker - poller logpoller.LogPoller - client client.Client + filterStore UpkeepFilterStore + states core.UpkeepStateReader + packer LogDataPacker + poller logpoller.LogPoller + client client.Client + blockTimeResolver *blockTimeResolver } var _ LogRecoverer = &logRecoverer{} @@ -84,13 +88,14 @@ func NewLogRecoverer(lggr logger.Logger, poller logpoller.LogPoller, client clie lookbackBlocks: &atomic.Int64{}, interval: opts.ReadInterval * 5, - pending: make([]ocr2keepers.UpkeepPayload, 0), - visited: make(map[string]visitedRecord), - poller: poller, - filterStore: filterStore, - states: stateStore, - packer: packer, - client: client, + pending: make([]ocr2keepers.UpkeepPayload, 0), + visited: make(map[string]visitedRecord), + poller: poller, + filterStore: filterStore, + states: stateStore, + packer: packer, + client: client, + blockTimeResolver: newBlockTimeResolver(poller), } rec.lookbackBlocks.Store(opts.LookbackBlocks) @@ -111,35 +116,30 @@ func (r *logRecoverer) Start(pctx context.Context) error { r.cancel = cancel r.lock.Unlock() - blockTimeResolver := newBlockTimeResolver(r.poller) - blockTime, err := blockTimeResolver.BlockTime(ctx, defaultSampleSize) - if err != nil { - // TODO: TBD exit or just log a warning - // return fmt.Errorf("failed to compute block time: %w", err) - r.lggr.Warnw("failed to compute block time", "err", err) - } - if blockTime > 0 { - r.blockTime.Store(int64(blockTime)) - } + r.updateBlockTime(ctx) r.lggr.Infow("starting log recoverer", "blockTime", r.blockTime.Load(), "lookbackBlocks", r.lookbackBlocks.Load(), "interval", r.interval) { go func(ctx context.Context, interval time.Duration) { - ticker := time.NewTicker(interval) - defer ticker.Stop() + recoverTicker := time.NewTicker(interval) + defer recoverTicker.Stop() gcTicker := time.NewTicker(utils.WithJitter(GCInterval)) defer gcTicker.Stop() + blockTimeUpdateTicker := time.NewTicker(blockTimeUpdateCadence) + defer blockTimeUpdateTicker.Stop() for { select { - case <-ticker.C: + case <-recoverTicker.C: if err := r.recover(ctx); err != nil { r.lggr.Warnw("failed to recover logs", "err", err) } case <-gcTicker.C: r.clean(ctx) gcTicker.Reset(utils.WithJitter(GCInterval)) + case <-blockTimeUpdateTicker.C: + r.updateBlockTime(ctx) case <-ctx.Done(): return } @@ -650,3 +650,21 @@ func (r *logRecoverer) sortPending(latestBlock uint64) { return shuffledIDs[r.pending[i].WorkID] < shuffledIDs[r.pending[j].WorkID] }) } + +func (r *logRecoverer) updateBlockTime(ctx context.Context) { + blockTime, err := r.blockTimeResolver.BlockTime(ctx, defaultSampleSize) + if err != nil { + r.lggr.Warnw("failed to compute block time", "err", err) + return + } + if blockTime > 0 { + currentBlockTime := r.blockTime.Load() + newBlockTime := int64(blockTime) + if currentBlockTime > 0 && (int64(math.Abs(float64(currentBlockTime-newBlockTime)))*100/currentBlockTime) > 20 { + r.lggr.Warnf("updating blocktime from %d to %d, this change is larger than 20%", currentBlockTime, newBlockTime) + } else { + r.lggr.Debugf("updating blocktime from %d to %d", currentBlockTime, newBlockTime) + } + r.blockTime.Store(newBlockTime) + } +}