From a3bd1f7650103e859f0f1fff7e2a305e16f2d597 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Wed, 14 Feb 2024 09:30:34 +0100 Subject: [PATCH] Improving deletes performance by limiting number of records to scan (#12007) * Improving deletes performance by limiting number of records to scan * Post review fixes - replacing direct latestBlock call with nested query --- core/chains/evm/logpoller/orm.go | 18 ++++++++++++++---- core/chains/evm/logpoller/orm_test.go | 2 +- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 663c56d10ed..1db8271ccb6 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -12,6 +12,7 @@ import ( "github.com/pkg/errors" "github.com/smartcontractkit/chainlink-common/pkg/logger" + ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) @@ -196,8 +197,6 @@ func (o *DbORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error { } func (o *DbORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error { - // These deletes are bounded by reorg depth, so they are - // fast and should not slow down the log readers. return o.q.WithOpts(qopts...).Transaction(func(tx pg.Queryer) error { args, err := newQueryArgs(o.chainID). withStartBlock(start). @@ -207,13 +206,24 @@ func (o *DbORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error { return err } - _, err = tx.NamedExec(`DELETE FROM evm.log_poller_blocks WHERE block_number >= :start_block AND evm_chain_id = :evm_chain_id`, args) + // Applying upper bound filter is critical for Postgres performance (especially for evm.logs table) + // because it allows the planner to properly estimate the number of rows to be scanned. + // If not applied, these queries can become very slow. After some critical number + // of logs, Postgres will try to scan all the logs in the index by block_number. + // Latency without upper bound filter can be orders of magnitude higher for large number of logs. + _, err = tx.NamedExec(`DELETE FROM evm.log_poller_blocks + WHERE evm_chain_id = :evm_chain_id + AND block_number >= :start_block + AND block_number <= (SELECT MAX(block_number) FROM evm.log_poller_blocks WHERE evm_chain_id = :evm_chain_id)`, args) if err != nil { o.lggr.Warnw("Unable to clear reorged blocks, retrying", "err", err) return err } - _, err = tx.NamedExec(`DELETE FROM evm.logs WHERE block_number >= :start_block AND evm_chain_id = :evm_chain_id`, args) + _, err = tx.NamedExec(`DELETE FROM evm.logs + WHERE evm_chain_id = :evm_chain_id + AND block_number >= :start_block + AND block_number <= (SELECT MAX(block_number) FROM evm.logs WHERE evm_chain_id = :evm_chain_id)`, args) if err != nil { o.lggr.Warnw("Unable to clear reorged logs, retrying", "err", err) return err diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 0af62ebd547..bcaa6f72fa0 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -1435,7 +1435,7 @@ func TestInsertLogsInTx(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // clean all logs and blocks between test cases - defer func() { _ = o.DeleteLogsAndBlocksAfter(0) }() + defer func() { _, _ = db.Exec("truncate evm.logs") }() insertErr := o.InsertLogs(tt.logs) logsFromDb, err := o.SelectLogs(0, math.MaxInt, address, event)