Skip to content

Commit

Permalink
Revert countBasedPruningActive flag
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Oct 9, 2024
1 parent d3f6648 commit 65e8876
Showing 1 changed file with 22 additions and 50 deletions.
72 changes: 22 additions & 50 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ type logPoller struct {
// Usually the only way to recover is to manually remove the offending logs and block from the database.
// LogPoller keeps running in infinite loop, so whenever the invalid state is removed from the database it should
// recover automatically without needing to restart the LogPoller.
finalityViolated *atomic.Bool
countBasedLogPruningActive *atomic.Bool
finalityViolated *atomic.Bool
}

type Opts struct {
Expand All @@ -159,25 +158,24 @@ type Opts struct {
// support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracker, opts Opts) *logPoller {
return &logPoller{
stopCh: make(chan struct{}),
ec: ec,
orm: orm,
headTracker: headTracker,
lggr: logger.Sugared(logger.Named(lggr, "LogPoller")),
replayStart: make(chan int64),
replayComplete: make(chan error),
pollPeriod: opts.PollPeriod,
backupPollerBlockDelay: opts.BackupPollerBlockDelay,
finalityDepth: opts.FinalityDepth,
useFinalityTag: opts.UseFinalityTag,
backfillBatchSize: opts.BackfillBatchSize,
rpcBatchSize: opts.RpcBatchSize,
keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth,
logPrunePageSize: opts.LogPrunePageSize,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
finalityViolated: new(atomic.Bool),
countBasedLogPruningActive: new(atomic.Bool),
stopCh: make(chan struct{}),
ec: ec,
orm: orm,
headTracker: headTracker,
lggr: logger.Sugared(logger.Named(lggr, "LogPoller")),
replayStart: make(chan int64),
replayComplete: make(chan error),
pollPeriod: opts.PollPeriod,
backupPollerBlockDelay: opts.BackupPollerBlockDelay,
finalityDepth: opts.FinalityDepth,
useFinalityTag: opts.UseFinalityTag,
backfillBatchSize: opts.BackfillBatchSize,
rpcBatchSize: opts.RpcBatchSize,
keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth,
logPrunePageSize: opts.LogPrunePageSize,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
finalityViolated: new(atomic.Bool),
}
}

Expand Down Expand Up @@ -294,9 +292,6 @@ func (lp *logPoller) RegisterFilter(ctx context.Context, filter Filter) error {
}
lp.filters[filter.Name] = filter
lp.filterDirty = true
if filter.MaxLogsKept > 0 {
lp.countBasedLogPruningActive.Store(true)
}
return nil
}

Expand Down Expand Up @@ -552,37 +547,18 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i
return mathutil.Min(requested, lastProcessed.BlockNumber), nil
}

// loadFilters loads the filters from db, and activates count-based Log Pruning
// if required by any of the filters
func (lp *logPoller) loadFilters(ctx context.Context) error {
filters, err := lp._loadFilters(ctx)
if err != nil {
return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
}
if lp.countBasedLogPruningActive.Load() {
return nil
}
for _, filter := range filters {
if filter.MaxLogsKept != 0 {
lp.countBasedLogPruningActive.Store(true)
}
}
return nil
}

// _loadFilters is the part of loadFilters() requiring a filterMu lock
func (lp *logPoller) _loadFilters(ctx context.Context) (filters map[string]Filter, err error) {
lp.filterMu.Lock()
defer lp.filterMu.Unlock()
filters, err := lp.orm.LoadFilters(ctx)

filters, err = lp.orm.LoadFilters(ctx)
if err != nil {
return filters, err
return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
}

lp.filters = filters
lp.filterDirty = true
return filters, nil
return nil
}

// tickStaggeredDelay chooses a uniformly random amount of time to delay between minDelay and minDelay + period
Expand Down Expand Up @@ -1174,10 +1150,6 @@ func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
done = false
}

if !lp.countBasedLogPruningActive.Load() {
return done, err
}

rowIDs, err := lp.orm.SelectExcessLogIDs(ctx, lp.logPrunePageSize)
if err != nil {
lp.lggr.Errorw("Unable to find excess logs for pruning", "err", err)
Expand Down

0 comments on commit 65e8876

Please sign in to comment.