From a7b9328364c9ca712bdaf51a12e07d9695e071b2 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Thu, 10 Oct 2024 20:48:53 -0700 Subject: [PATCH] Address remaining PR comments - Remove topics from SelectExcessLogs query - Early exit from loadFilters - upper >= end --- core/chains/evm/logpoller/log_poller.go | 7 ++++--- core/chains/evm/logpoller/orm.go | 11 ++--------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index e592060030..44de497562 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -555,7 +555,7 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i // loadFilters loads the filters from db, and activates count-based Log Pruning // if required by any of the filters func (lp *logPoller) loadFilters(ctx context.Context) error { - filters, err := lp._loadFilters(ctx) + filters, err := lp.lockAndLoadFilters(ctx) if err != nil { return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying") } @@ -565,13 +565,14 @@ func (lp *logPoller) loadFilters(ctx context.Context) error { for _, filter := range filters { if filter.MaxLogsKept != 0 { lp.countBasedLogPruningActive.Store(true) + return nil } } return nil } -// _loadFilters is the part of loadFilters() requiring a filterMu lock -func (lp *logPoller) _loadFilters(ctx context.Context) (filters map[string]Filter, err error) { +// lockAndLoadFilters is the part of loadFilters() requiring a filterMu lock +func (lp *logPoller) lockAndLoadFilters(ctx context.Context) (filters map[string]Filter, err error) { lp.filterMu.Lock() defer lp.filterMu.Unlock() diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 246e09eb0d..4d7cf33ebe 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -346,7 +346,7 @@ func (r *RangeQueryer[T]) ExecPagedQuery(ctx context.Context, limit, end int64) } rowsAffected += rows - if upper == end { + if upper >= end { break } } @@ -457,9 +457,6 @@ func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results [] withSubQuery := ` SELECT name, ARRAY_AGG(address) AS addresses, ARRAY_AGG(event) AS events, - (ARRAY_AGG(topic2) FILTER(WHERE topic2 IS NOT NULL)) AS topic2, - (ARRAY_AGG(topic3) FILTER(WHERE topic3 IS NOT NULL)) AS topic3, - (ARRAY_AGG(topic4) FILTER(WHERE topic4 IS NOT NULL)) AS topic4, MAX(max_logs_kept) AS max_logs_kept -- Should all be the same, just need MAX for GROUP BY FROM evm.log_poller_filters WHERE evm_chain_id=$1 GROUP BY name` @@ -469,11 +466,7 @@ func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results [] SELECT l.id, block_number, log_index, max_logs_kept != 0 AND ROW_NUMBER() OVER(PARTITION BY f.name ORDER BY block_number, log_index DESC) > max_logs_kept AS old FROM filters f JOIN evm.logs l ON - l.address = ANY(f.addresses) AND - l.event_sig = ANY(f.events) AND - (f.topic2 IS NULL OR l.topics[1] = ANY(f.topic2)) AND - (f.topic3 IS NULL OR l.topics[2] = ANY(f.topic3)) AND - (f.topic4 IS NULL OR l.topics[3] = ANY(f.topic4)) + l.address = ANY(f.addresses) AND l.event_sig = ANY(f.events) WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3 `