From e0850a6a31843606015d1c49d52b5a6ad8727378 Mon Sep 17 00:00:00 2001 From: Domino Valdano Date: Thu, 8 Aug 2024 10:53:43 -0700 Subject: [PATCH] BCI-3492 [LogPoller]: Allow withObservedExecAndRowsAffected to report non-zero rows affected (#14057) * Fix withObservedExecAndRowsAffected Also: - Change behavior of DeleteExpiredLogs to delete logs which don't match any filter - Add a test case to ensure the dataset size is published properly during pruning * pnpm changeset * changeset #fix -> #bugfix --- .changeset/sweet-pumas-refuse.md | 5 ++++ core/chains/evm/logpoller/observability.go | 2 +- .../evm/logpoller/observability_test.go | 11 +++++++ core/chains/evm/logpoller/orm.go | 30 ++++++++----------- core/chains/evm/logpoller/orm_test.go | 13 ++++---- 5 files changed, 37 insertions(+), 24 deletions(-) create mode 100644 .changeset/sweet-pumas-refuse.md diff --git a/.changeset/sweet-pumas-refuse.md b/.changeset/sweet-pumas-refuse.md new file mode 100644 index 00000000000..fd642a9c94c --- /dev/null +++ b/.changeset/sweet-pumas-refuse.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#bugfix Addresses 2 minor issues with the pruning of LogPoller's db tables: logs not matching any filter will now be pruned, and rows deleted are now properly reported for observability diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index 7842a060eca..782307e7d06 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -285,7 +285,7 @@ func withObservedExecAndRowsAffected(o *ObservedORM, queryName string, queryType WithLabelValues(o.chainId, queryName, string(queryType)). Observe(float64(time.Since(queryStarted))) - if err != nil { + if err == nil { o.datasetSize. WithLabelValues(o.chainId, queryName, string(queryType)). Set(float64(rowsAffected)) diff --git a/core/chains/evm/logpoller/observability_test.go b/core/chains/evm/logpoller/observability_test.go index 78c27b4b8f7..4ea7adceab0 100644 --- a/core/chains/evm/logpoller/observability_test.go +++ b/core/chains/evm/logpoller/observability_test.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/client_golang/prometheus/testutil" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" @@ -117,6 +118,16 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) { assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420"))) assert.Equal(t, float64(2), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420"))) + rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3) + require.NoError(t, err) + require.Equal(t, int64(3), rowsAffected) + assert.Equal(t, 3, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete")) + + rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0) + require.NoError(t, err) + require.Equal(t, int64(2), rowsAffected) + assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteBlocksBefore", "delete")) + // Don't update counters in case of an error require.Error(t, orm.InsertLogsWithBlock(ctx, logs, NewLogPollerBlock(utils.RandomBytes32(), 0, time.Now(), 0))) assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420"))) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 1d249760736..22870efccf3 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -15,6 +15,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" @@ -313,34 +314,29 @@ type Exp struct { ShouldDelete bool } +// DeleteExpiredLogs removes any logs which either: +// - don't match any currently registered filters, or +// - have a timestamp older than any matching filter's retention, UNLESS there is at +// least one matching filter with retention=0 func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) { var err error var result sql.Result - if limit > 0 { - result, err = o.ds.ExecContext(ctx, ` - DELETE FROM evm.logs + query := `DELETE FROM evm.logs WHERE (evm_chain_id, address, event_sig, block_number) IN ( SELECT l.evm_chain_id, l.address, l.event_sig, l.block_number FROM evm.logs l - INNER JOIN ( - SELECT address, event, MAX(retention) AS retention + LEFT JOIN ( + SELECT address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention FROM evm.log_poller_filters WHERE evm_chain_id = $1 GROUP BY evm_chain_id, address, event - HAVING NOT 0 = ANY(ARRAY_AGG(retention)) ) r ON l.evm_chain_id = $1 AND l.address = r.address AND l.event_sig = r.event - AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second') - LIMIT $2 - )`, ubig.New(o.chainID), limit) + WHERE r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s)` + + if limit > 0 { + result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, "LIMIT $2"), ubig.New(o.chainID), limit) } else { - result, err = o.ds.ExecContext(ctx, `WITH r AS - ( SELECT address, event, MAX(retention) AS retention - FROM evm.log_poller_filters WHERE evm_chain_id=$1 - GROUP BY evm_chain_id,address, event HAVING NOT 0 = ANY(ARRAY_AGG(retention)) - ) DELETE FROM evm.logs l USING r - WHERE l.evm_chain_id = $1 AND l.address=r.address AND l.event_sig=r.event - AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT) - ubig.New(o.chainID)) + result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, ""), ubig.New(o.chainID)) } if err != nil { diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 6f431b6db92..0df34196ff9 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -458,20 +458,21 @@ func TestORM(t *testing.T) { time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period deleted, err := o1.DeleteExpiredLogs(ctx, 0) require.NoError(t, err) - assert.Equal(t, int64(1), deleted) + assert.Equal(t, int64(4), deleted) + logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - // The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour) - // Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything - // matching filter12 should be kept regardless of what other filters it matches. - assert.Len(t, logs, 7) + // It should have retained the log matching filter0 (due to ret=0 meaning permanent retention) as well as all + // 3 logs matching filter12 (ret=1 hour). It should have deleted 3 logs not matching any filter, as well as 1 + // of the 2 logs matching filter1 (ret=1ms)--the one that doesn't also match filter12. + assert.Len(t, logs, 4) // Delete logs after should delete all logs. err = o1.DeleteLogsAndBlocksAfter(ctx, 1) require.NoError(t, err) logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber) require.NoError(t, err) - require.Zero(t, len(logs)) + assert.Zero(t, len(logs)) } type PgxLogger struct {