diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index d6e55286029..b9d1ba2e98a 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -945,7 +945,7 @@ func (lp *logPoller) LogsWithSigs(start, end int64, eventSigs []common.Hash, add } func (lp *logPoller) LogsCreatedAfter(eventSig common.Hash, address common.Address, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { - return lp.orm.SelectLogsCreatedAfter(eventSig[:], address, after, confs, qopts...) + return lp.orm.SelectLogsCreatedAfter(address, eventSig, after, confs, qopts...) } // IndexedLogs finds all the logs that have a topic value in topicValues at index topicIndex. diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index c434d18c999..3d4218d6ffd 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -39,23 +39,17 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils" ) -func logRuntime(t *testing.T, start time.Time) { +func logRuntime(t testing.TB, start time.Time) { t.Log("runtime", time.Since(start)) } -func TestPopulateLoadedDB(t *testing.T) { - t.Skip("Only for local load testing and query analysis") - lggr := logger.TestLogger(t) - _, db := heavyweight.FullTestDBV2(t, "logs_scale", nil) - chainID := big.NewInt(137) - - o := logpoller.NewORM(big.NewInt(137), db, lggr, pgtest.NewQConfig(true)) +func populateDatabase(t testing.TB, o *logpoller.ORM, chainID *big.Int) (common.Hash, common.Address, common.Address) { event1 := EmitterABI.Events["Log1"].ID address1 := common.HexToAddress("0x2ab9a2Dc53736b361b72d900CdF9F78F9406fbbb") address2 := common.HexToAddress("0x6E225058950f237371261C985Db6bDe26df2200E") + startDate := time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC) - // We start at 1 just so block number > 0 - for j := 1; j < 1000; j++ { + for j := 1; j < 100; j++ { var logs []logpoller.Log // Max we can insert per batch for i := 0; i < 1000; i++ { @@ -63,20 +57,57 @@ func TestPopulateLoadedDB(t *testing.T) { if (i+(1000*j))%2 == 0 { addr = address2 } + blockNumber := int64(i + (1000 * j)) + blockTimestamp := startDate.Add(time.Duration(j*1000) * time.Hour) + logs = append(logs, logpoller.Log{ - EvmChainId: utils.NewBig(chainID), - LogIndex: 1, - BlockHash: common.HexToHash(fmt.Sprintf("0x%d", i+(1000*j))), - BlockNumber: int64(i + (1000 * j)), - EventSig: event1, - Topics: [][]byte{event1[:], logpoller.EvmWord(uint64(i + 1000*j)).Bytes()}, - Address: addr, - TxHash: common.HexToHash("0x1234"), - Data: logpoller.EvmWord(uint64(i + 1000*j)).Bytes(), + EvmChainId: utils.NewBig(chainID), + LogIndex: 1, + BlockHash: common.HexToHash(fmt.Sprintf("0x%d", i+(1000*j))), + BlockNumber: blockNumber, + BlockTimestamp: blockTimestamp, + EventSig: event1, + Topics: [][]byte{event1[:], logpoller.EvmWord(uint64(i + 1000*j)).Bytes()}, + Address: addr, + TxHash: utils.RandomAddress().Hash(), + Data: logpoller.EvmWord(uint64(i + 1000*j)).Bytes(), + CreatedAt: blockTimestamp, }) + } require.NoError(t, o.InsertLogs(logs)) + require.NoError(t, o.InsertBlock(utils.RandomAddress().Hash(), int64((j+1)*1000-1), startDate.Add(time.Duration(j*1000)*time.Hour))) } + + return event1, address1, address2 +} + +func BenchmarkSelectLogsCreatedAfter(b *testing.B) { + chainId := big.NewInt(137) + _, db := heavyweight.FullTestDBV2(b, "logs_scale", nil) + o := logpoller.NewORM(chainId, db, logger.TestLogger(b), pgtest.NewQConfig(false)) + event, address, _ := populateDatabase(b, o, chainId) + + // Setting searchDate to pick around 5k logs + searchDate := time.Date(2020, 1, 1, 12, 12, 12, 0, time.UTC) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + logs, err := o.SelectLogsCreatedAfter(address, event, searchDate, 500) + require.NotZero(b, len(logs)) + require.NoError(b, err) + } +} + +func TestPopulateLoadedDB(t *testing.T) { + t.Skip("Only for local load testing and query analysis") + _, db := heavyweight.FullTestDBV2(t, "logs_scale", nil) + chainID := big.NewInt(137) + + o := logpoller.NewORM(big.NewInt(137), db, logger.TestLogger(t), pgtest.NewQConfig(true)) + event1, address1, address2 := populateDatabase(t, o, chainID) + func() { defer logRuntime(t, time.Now()) _, err1 := o.SelectLogsByBlockRangeFilter(750000, 800000, address1, event1) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index c062ef3e080..b26c4ac7df6 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -122,7 +122,7 @@ func (o *ORM) SelectLatestLogEventSigWithConfs(eventSig common.Hash, address com WHERE evm_chain_id = $1 AND event_sig = $2 AND address = $3 - AND (block_number + $4) <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) + AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $4 ORDER BY (block_number, log_index) DESC LIMIT 1`, utils.NewBig(o.chainID), eventSig, address, confs); err != nil { return nil, err } @@ -231,17 +231,22 @@ func (o *ORM) SelectLogsByBlockRangeFilter(start, end int64, address common.Addr } // SelectLogsCreatedAfter finds logs created after some timestamp. -func (o *ORM) SelectLogsCreatedAfter(eventSig []byte, address common.Address, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { +func (o *ORM) SelectLogsCreatedAfter(address common.Address, eventSig common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { + minBlock, maxBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...) + if err != nil { + return nil, err + } + var logs []Log q := o.q.WithOpts(qopts...) - err := q.Select(&logs, ` + err = q.Select(&logs, ` SELECT * FROM evm.logs WHERE evm_chain_id = $1 AND address = $2 AND event_sig = $3 - AND created_at > $4 - AND (block_number + $5) <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig, after, confs) + AND block_number > $4 + AND block_number <= $5 + ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig, minBlock, maxBlock) if err != nil { return nil, err } @@ -498,18 +503,24 @@ func validateTopicIndex(index int) error { } func (o *ORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { - q := o.q.WithOpts(qopts...) + minBlock, maxBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...) + if err != nil { + return nil, err + } + var logs []Log + q := o.q.WithOpts(qopts...) topicValuesBytes := concatBytes(topicValues) // Add 1 since postgresql arrays are 1-indexed. - err := q.Select(&logs, ` + err = q.Select(&logs, ` SELECT * FROM evm.logs WHERE evm.logs.evm_chain_id = $1 - AND address = $2 AND event_sig = $3 + AND address = $2 + AND event_sig = $3 AND topics[$4] = ANY($5) - AND created_at > $6 - AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $7 - ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, after, confs) + AND block_number > $6 + AND block_number <= $7 + ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, minBlock, maxBlock) if err != nil { return nil, err } @@ -569,7 +580,27 @@ func (o *ORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topicIn return nil, err } return logs, nil +} +func (o *ORM) blocksRangeAfterTimestamp(after time.Time, confs int, qopts ...pg.QOpt) (int64, int64, error) { + type blockRange struct { + MinBlockNumber int64 `db:"min_block"` + MaxBlockNumber int64 `db:"max_block"` + } + + var br blockRange + q := o.q.WithOpts(qopts...) + err := q.Get(&br, ` + SELECT + coalesce(min(block_number), 0) as min_block, + coalesce(max(block_number), 0) as max_block + FROM evm.log_poller_blocks + WHERE evm_chain_id = $1 + AND block_timestamp > $2`, utils.NewBig(o.chainID), after) + if err != nil { + return 0, 0, err + } + return br.MinBlockNumber, br.MaxBlockNumber - int64(confs), nil } type bytesProducer interface { diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 28f3e8da8e6..f9b51000bb1 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -1087,16 +1087,16 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) { th := SetupTH(t, 2, 3, 2) event1 := EmitterABI.Events["Log1"].ID event2 := EmitterABI.Events["Log2"].ID - address1 := common.HexToAddress("0xA") - address2 := common.HexToAddress("0xB") + address1 := utils.RandomAddress() + address2 := utils.RandomAddress() require.NoError(t, th.ORM.InsertLogs([]logpoller.Log{ - GenLog(th.ChainID, 1, 1, "0x1", event1[:], address1), - GenLog(th.ChainID, 2, 1, "0x2", event2[:], address2), - GenLog(th.ChainID, 2, 2, "0x4", event2[:], address2), - GenLog(th.ChainID, 2, 3, "0x6", event2[:], address2), + GenLog(th.ChainID, 1, 1, utils.RandomAddress().String(), event1[:], address1), + GenLog(th.ChainID, 2, 1, utils.RandomAddress().String(), event2[:], address2), + GenLog(th.ChainID, 2, 2, utils.RandomAddress().String(), event2[:], address2), + GenLog(th.ChainID, 2, 3, utils.RandomAddress().String(), event2[:], address2), })) - require.NoError(t, th.ORM.InsertBlock(common.HexToHash("0x1"), 3, time.Now())) + require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 3, time.Now())) tests := []struct { name string @@ -1171,3 +1171,98 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) { }) } } + +func TestSelectLogsCreatedAfter(t *testing.T) { + th := SetupTH(t, 2, 3, 2) + event := EmitterABI.Events["Log1"].ID + address := utils.RandomAddress() + + past := time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC) + now := time.Date(2020, 1, 1, 12, 12, 12, 0, time.UTC) + future := time.Date(2030, 1, 1, 12, 12, 12, 0, time.UTC) + + require.NoError(t, th.ORM.InsertLogs([]logpoller.Log{ + GenLog(th.ChainID, 1, 1, utils.RandomAddress().String(), event[:], address), + GenLog(th.ChainID, 1, 2, utils.RandomAddress().String(), event[:], address), + GenLog(th.ChainID, 2, 2, utils.RandomAddress().String(), event[:], address), + GenLog(th.ChainID, 1, 3, utils.RandomAddress().String(), event[:], address), + })) + require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 1, past)) + require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 2, now)) + require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 3, future)) + + type expectedLog struct { + block int64 + log int64 + } + + tests := []struct { + name string + confs int + after time.Time + expectedLogs []expectedLog + }{ + { + name: "picks logs after block 1", + confs: 0, + after: past.Add(-time.Hour), + expectedLogs: []expectedLog{ + {block: 2, log: 1}, + {block: 2, log: 2}, + {block: 3, log: 1}, + }, + }, + { + name: "skips blocks with not enough confirmations", + confs: 1, + after: past.Add(-time.Hour), + expectedLogs: []expectedLog{ + {block: 2, log: 1}, + {block: 2, log: 2}, + }, + }, + { + name: "limits number of blocks by block_timestamp", + confs: 0, + after: now.Add(-time.Hour), + expectedLogs: []expectedLog{ + {block: 3, log: 1}, + }, + }, + { + name: "returns empty dataset for future timestamp", + confs: 0, + after: future, + expectedLogs: []expectedLog{}, + }, + { + name: "returns empty dataset when too many confirmations are required", + confs: 3, + after: past.Add(-time.Hour), + expectedLogs: []expectedLog{}, + }, + } + for _, tt := range tests { + t.Run("SelectLogsCreatedAfter"+tt.name, func(t *testing.T) { + logs, err := th.ORM.SelectLogsCreatedAfter(address, event, tt.after, tt.confs) + require.NoError(t, err) + assert.Len(t, logs, len(tt.expectedLogs)) + + for i, log := range logs { + assert.Equal(t, tt.expectedLogs[i].block, log.BlockNumber) + assert.Equal(t, tt.expectedLogs[i].log, log.LogIndex) + } + }) + + t.Run("SelectIndexedLogsCreatedAfter"+tt.name, func(t *testing.T) { + logs, err := th.ORM.SelectIndexedLogsCreatedAfter(address, event, 0, []common.Hash{event}, tt.after, tt.confs) + require.NoError(t, err) + assert.Len(t, logs, len(tt.expectedLogs)) + + for i, log := range logs { + assert.Equal(t, tt.expectedLogs[i].block, log.BlockNumber) + assert.Equal(t, tt.expectedLogs[i].log, log.LogIndex) + } + }) + } +} diff --git a/core/internal/cltest/heavyweight/orm.go b/core/internal/cltest/heavyweight/orm.go index 3690a986e2e..841901c25aa 100644 --- a/core/internal/cltest/heavyweight/orm.go +++ b/core/internal/cltest/heavyweight/orm.go @@ -29,21 +29,21 @@ import ( // FullTestDBV2 creates a pristine DB which runs in a separate database than the normal // unit tests, so you can do things like use other Postgres connection types with it. -func FullTestDBV2(t *testing.T, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { +func FullTestDBV2(t testing.TB, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { return prepareFullTestDBV2(t, name, false, true, overrideFn) } // FullTestDBNoFixturesV2 is the same as FullTestDB, but it does not load fixtures. -func FullTestDBNoFixturesV2(t *testing.T, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { +func FullTestDBNoFixturesV2(t testing.TB, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { return prepareFullTestDBV2(t, name, false, false, overrideFn) } // FullTestDBEmptyV2 creates an empty DB (without migrations). -func FullTestDBEmptyV2(t *testing.T, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { +func FullTestDBEmptyV2(t testing.TB, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { return prepareFullTestDBV2(t, name, true, false, overrideFn) } -func prepareFullTestDBV2(t *testing.T, name string, empty bool, loadFixtures bool, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { +func prepareFullTestDBV2(t testing.TB, name string, empty bool, loadFixtures bool, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) { testutils.SkipShort(t, "FullTestDB") if empty && loadFixtures { diff --git a/core/store/migrate/migrations/0198_add_block_timestamp_index.sql b/core/store/migrate/migrations/0198_add_block_timestamp_index.sql new file mode 100644 index 00000000000..8f20f4d8491 --- /dev/null +++ b/core/store/migrate/migrations/0198_add_block_timestamp_index.sql @@ -0,0 +1,5 @@ +-- +goose Up +create index log_poller_blocks_by_timestamp on evm.log_poller_blocks (evm_chain_id, block_timestamp); + +-- +goose Down +DROP INDEX IF EXISTS evm.log_poller_blocks_by_timestamp; \ No newline at end of file