Skip to content

Commit

Permalink
update log recoverer to update the block time periodically (#10544)
Browse files Browse the repository at this point in the history
* update log recoverer to update the block time periodically

* add check for currentBlockTime > 0

---------

Co-authored-by: Akshay Aggarwal <[email protected]>
  • Loading branch information
RyanRHall and infiloop2 authored Sep 8, 2023
1 parent 1347424 commit b338b71
Showing 1 changed file with 43 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"math"
"math/big"
"sort"
"sync"
Expand Down Expand Up @@ -38,6 +39,8 @@ var (
// recoveryLogsBuffer is the number of blocks to be used as a safety buffer when reading logs
recoveryLogsBuffer = int64(200)
recoveryLogsBurst = int64(500)
// blockTimeUpdateCadence is the cadence at which the chain's blocktime is re-calculated
blockTimeUpdateCadence = 10 * time.Minute
)

type LogRecoverer interface {
Expand Down Expand Up @@ -67,11 +70,12 @@ type logRecoverer struct {
pending []ocr2keepers.UpkeepPayload
visited map[string]visitedRecord

filterStore UpkeepFilterStore
states core.UpkeepStateReader
packer LogDataPacker
poller logpoller.LogPoller
client client.Client
filterStore UpkeepFilterStore
states core.UpkeepStateReader
packer LogDataPacker
poller logpoller.LogPoller
client client.Client
blockTimeResolver *blockTimeResolver
}

var _ LogRecoverer = &logRecoverer{}
Expand All @@ -84,13 +88,14 @@ func NewLogRecoverer(lggr logger.Logger, poller logpoller.LogPoller, client clie
lookbackBlocks: &atomic.Int64{},
interval: opts.ReadInterval * 5,

pending: make([]ocr2keepers.UpkeepPayload, 0),
visited: make(map[string]visitedRecord),
poller: poller,
filterStore: filterStore,
states: stateStore,
packer: packer,
client: client,
pending: make([]ocr2keepers.UpkeepPayload, 0),
visited: make(map[string]visitedRecord),
poller: poller,
filterStore: filterStore,
states: stateStore,
packer: packer,
client: client,
blockTimeResolver: newBlockTimeResolver(poller),
}

rec.lookbackBlocks.Store(opts.LookbackBlocks)
Expand All @@ -111,35 +116,30 @@ func (r *logRecoverer) Start(pctx context.Context) error {
r.cancel = cancel
r.lock.Unlock()

blockTimeResolver := newBlockTimeResolver(r.poller)
blockTime, err := blockTimeResolver.BlockTime(ctx, defaultSampleSize)
if err != nil {
// TODO: TBD exit or just log a warning
// return fmt.Errorf("failed to compute block time: %w", err)
r.lggr.Warnw("failed to compute block time", "err", err)
}
if blockTime > 0 {
r.blockTime.Store(int64(blockTime))
}
r.updateBlockTime(ctx)

r.lggr.Infow("starting log recoverer", "blockTime", r.blockTime.Load(), "lookbackBlocks", r.lookbackBlocks.Load(), "interval", r.interval)

{
go func(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
recoverTicker := time.NewTicker(interval)
defer recoverTicker.Stop()
gcTicker := time.NewTicker(utils.WithJitter(GCInterval))
defer gcTicker.Stop()
blockTimeUpdateTicker := time.NewTicker(blockTimeUpdateCadence)
defer blockTimeUpdateTicker.Stop()

for {
select {
case <-ticker.C:
case <-recoverTicker.C:
if err := r.recover(ctx); err != nil {
r.lggr.Warnw("failed to recover logs", "err", err)
}
case <-gcTicker.C:
r.clean(ctx)
gcTicker.Reset(utils.WithJitter(GCInterval))
case <-blockTimeUpdateTicker.C:
r.updateBlockTime(ctx)
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -650,3 +650,21 @@ func (r *logRecoverer) sortPending(latestBlock uint64) {
return shuffledIDs[r.pending[i].WorkID] < shuffledIDs[r.pending[j].WorkID]
})
}

func (r *logRecoverer) updateBlockTime(ctx context.Context) {
blockTime, err := r.blockTimeResolver.BlockTime(ctx, defaultSampleSize)
if err != nil {
r.lggr.Warnw("failed to compute block time", "err", err)
return
}
if blockTime > 0 {
currentBlockTime := r.blockTime.Load()
newBlockTime := int64(blockTime)
if currentBlockTime > 0 && (int64(math.Abs(float64(currentBlockTime-newBlockTime)))*100/currentBlockTime) > 20 {
r.lggr.Warnf("updating blocktime from %d to %d, this change is larger than 20%", currentBlockTime, newBlockTime)
} else {
r.lggr.Debugf("updating blocktime from %d to %d", currentBlockTime, newBlockTime)
}
r.blockTime.Store(newBlockTime)
}
}

0 comments on commit b338b71

Please sign in to comment.