From aa22ad5cf1a7165a07183691525da27e3696eaed Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Wed, 28 Feb 2024 09:05:10 +0100 Subject: [PATCH] CCIP-1496 Delete expired logs by the block_timestamp (#12040) * Removing created_at index * SonarQube fix Post rebase fix Added debug log --- core/chains/evm/logpoller/log_poller.go | 1 + core/chains/evm/logpoller/orm.go | 4 +- core/chains/evm/logpoller/orm_test.go | 152 +++++++++--------- .../migrations/0226_log_poller_index_drop.sql | 5 + 4 files changed, 88 insertions(+), 74 deletions(-) create mode 100644 core/store/migrate/migrations/0226_log_poller_index_drop.sql diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index a19bbfda7f1..b1d00206130 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -916,6 +916,7 @@ func (lp *logPoller) latestBlocks(ctx context.Context) (*evmtypes.Head, int64, e } latest := blocks[0] finalized := blocks[1] + lp.lggr.Debugw("Latest blocks read from chain", "latest", latest.Number, "finalized", finalized.Number) return latest, finalized.Number, nil } diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index ca14d29563f..777cf3546cb 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -309,7 +309,7 @@ func (o *DbORM) DeleteExpiredLogs(limit int64, qopts ...pg.QOpt) (int64, error) GROUP BY evm_chain_id,address, event HAVING NOT 0 = ANY(ARRAY_AGG(retention)) ) DELETE FROM evm.logs l USING r WHERE l.evm_chain_id = $1 AND l.address=r.address AND l.event_sig=r.event - AND l.created_at <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT) + AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT) ubig.New(o.chainID)) } @@ -397,7 +397,7 @@ func (o *DbORM) SelectLogsByBlockRange(start, end int64) ([]Log, error) { WHERE evm_chain_id = :evm_chain_id AND block_number >= :start_block AND block_number <= :end_block - ORDER BY (block_number, log_index, created_at)`, args) + ORDER BY (block_number, log_index)`, args) if err != nil { return nil, err } diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 655a7295dab..a9f39a7ac4a 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -222,92 +222,100 @@ func TestORM(t *testing.T) { topic2 := common.HexToHash("0x1600") require.NoError(t, o1.InsertLogs([]logpoller.Log{ { - EvmChainId: ubig.New(th.ChainID), - LogIndex: 1, - BlockHash: common.HexToHash("0x1234"), - BlockNumber: int64(10), - EventSig: topic, - Topics: [][]byte{topic[:]}, - Address: common.HexToAddress("0x1234"), - TxHash: common.HexToHash("0x1888"), - Data: []byte("hello"), + EvmChainId: ubig.New(th.ChainID), + LogIndex: 1, + BlockHash: common.HexToHash("0x1234"), + BlockNumber: int64(10), + EventSig: topic, + Topics: [][]byte{topic[:]}, + Address: common.HexToAddress("0x1234"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello"), + BlockTimestamp: time.Now(), }, { - EvmChainId: ubig.New(th.ChainID), - LogIndex: 2, - BlockHash: common.HexToHash("0x1234"), - BlockNumber: int64(11), - EventSig: topic, - Topics: [][]byte{topic[:]}, - Address: common.HexToAddress("0x1234"), - TxHash: common.HexToHash("0x1888"), - Data: []byte("hello"), + EvmChainId: ubig.New(th.ChainID), + LogIndex: 2, + BlockHash: common.HexToHash("0x1234"), + BlockNumber: int64(11), + EventSig: topic, + Topics: [][]byte{topic[:]}, + Address: common.HexToAddress("0x1234"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello"), + BlockTimestamp: time.Now(), }, { - EvmChainId: ubig.New(th.ChainID), - LogIndex: 3, - BlockHash: common.HexToHash("0x1234"), - BlockNumber: int64(12), - EventSig: topic, - Topics: [][]byte{topic[:]}, - Address: common.HexToAddress("0x1235"), - TxHash: common.HexToHash("0x1888"), - Data: []byte("hello"), + EvmChainId: ubig.New(th.ChainID), + LogIndex: 3, + BlockHash: common.HexToHash("0x1234"), + BlockNumber: int64(12), + EventSig: topic, + Topics: [][]byte{topic[:]}, + Address: common.HexToAddress("0x1235"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello"), + BlockTimestamp: time.Now(), }, { - EvmChainId: ubig.New(th.ChainID), - LogIndex: 4, - BlockHash: common.HexToHash("0x1234"), - BlockNumber: int64(13), - EventSig: topic, - Topics: [][]byte{topic[:]}, - Address: common.HexToAddress("0x1235"), - TxHash: common.HexToHash("0x1888"), - Data: []byte("hello"), + EvmChainId: ubig.New(th.ChainID), + LogIndex: 4, + BlockHash: common.HexToHash("0x1234"), + BlockNumber: int64(13), + EventSig: topic, + Topics: [][]byte{topic[:]}, + Address: common.HexToAddress("0x1235"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello"), + BlockTimestamp: time.Now(), }, { - EvmChainId: ubig.New(th.ChainID), - LogIndex: 5, - BlockHash: common.HexToHash("0x1234"), - BlockNumber: int64(14), - EventSig: topic2, - Topics: [][]byte{topic2[:]}, - Address: common.HexToAddress("0x1234"), - TxHash: common.HexToHash("0x1888"), - Data: []byte("hello2"), + EvmChainId: ubig.New(th.ChainID), + LogIndex: 5, + BlockHash: common.HexToHash("0x1234"), + BlockNumber: int64(14), + EventSig: topic2, + Topics: [][]byte{topic2[:]}, + Address: common.HexToAddress("0x1234"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello2"), + BlockTimestamp: time.Now(), }, { - EvmChainId: ubig.New(th.ChainID), - LogIndex: 6, - BlockHash: common.HexToHash("0x1234"), - BlockNumber: int64(15), - EventSig: topic2, - Topics: [][]byte{topic2[:]}, - Address: common.HexToAddress("0x1235"), - TxHash: common.HexToHash("0x1888"), - Data: []byte("hello2"), + EvmChainId: ubig.New(th.ChainID), + LogIndex: 6, + BlockHash: common.HexToHash("0x1234"), + BlockNumber: int64(15), + EventSig: topic2, + Topics: [][]byte{topic2[:]}, + Address: common.HexToAddress("0x1235"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello2"), + BlockTimestamp: time.Now(), }, { - EvmChainId: ubig.New(th.ChainID), - LogIndex: 7, - BlockHash: common.HexToHash("0x1237"), - BlockNumber: int64(16), - EventSig: topic, - Topics: [][]byte{topic[:]}, - Address: common.HexToAddress("0x1236"), - TxHash: common.HexToHash("0x1888"), - Data: []byte("hello short retention"), + EvmChainId: ubig.New(th.ChainID), + LogIndex: 7, + BlockHash: common.HexToHash("0x1237"), + BlockNumber: int64(16), + EventSig: topic, + Topics: [][]byte{topic[:]}, + Address: common.HexToAddress("0x1236"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello short retention"), + BlockTimestamp: time.Now(), }, { - EvmChainId: ubig.New(th.ChainID), - LogIndex: 8, - BlockHash: common.HexToHash("0x1238"), - BlockNumber: int64(17), - EventSig: topic2, - Topics: [][]byte{topic2[:]}, - Address: common.HexToAddress("0x1236"), - TxHash: common.HexToHash("0x1888"), - Data: []byte("hello2 long retention"), + EvmChainId: ubig.New(th.ChainID), + LogIndex: 8, + BlockHash: common.HexToHash("0x1238"), + BlockNumber: int64(17), + EventSig: topic2, + Topics: [][]byte{topic2[:]}, + Address: common.HexToAddress("0x1236"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello2 long retention"), + BlockTimestamp: time.Now(), }, })) diff --git a/core/store/migrate/migrations/0226_log_poller_index_drop.sql b/core/store/migrate/migrations/0226_log_poller_index_drop.sql new file mode 100644 index 00000000000..f20fd00aaa2 --- /dev/null +++ b/core/store/migrate/migrations/0226_log_poller_index_drop.sql @@ -0,0 +1,5 @@ +-- +goose Up +DROP INDEX evm.evm_logs_idx_created_at; + +-- +goose Down +CREATE INDEX evm_logs_idx_created_at ON evm.logs (created_at);