Skip to content

Commit

Permalink
CCIP-1053 Replace filtering by created_at with filtering by block_tim…
Browse files Browse the repository at this point in the history
…estamp (#10743)

* Replace filtering by created_at with filtering by block_timestamp

* Adding index to speeds up filtering by block_timestamp

* Minor fix

* Minor changes

* Benchmarks for new query

* Post merge fixes

---------

Co-authored-by: Domino Valdano <[email protected]>
  • Loading branch information
mateusz-sekara and reductionista authored Sep 27, 2023
1 parent f67d7e3 commit c19d648
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 43 deletions.
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ func (lp *logPoller) LogsWithSigs(start, end int64, eventSigs []common.Hash, add
}

func (lp *logPoller) LogsCreatedAfter(eventSig common.Hash, address common.Address, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectLogsCreatedAfter(eventSig[:], address, after, confs, qopts...)
return lp.orm.SelectLogsCreatedAfter(address, eventSig, after, confs, qopts...)
}

// IndexedLogs finds all the logs that have a topic value in topicValues at index topicIndex.
Expand Down
69 changes: 50 additions & 19 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,44 +39,75 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

func logRuntime(t *testing.T, start time.Time) {
func logRuntime(t testing.TB, start time.Time) {
t.Log("runtime", time.Since(start))
}

func TestPopulateLoadedDB(t *testing.T) {
t.Skip("Only for local load testing and query analysis")
lggr := logger.TestLogger(t)
_, db := heavyweight.FullTestDBV2(t, "logs_scale", nil)
chainID := big.NewInt(137)

o := logpoller.NewORM(big.NewInt(137), db, lggr, pgtest.NewQConfig(true))
func populateDatabase(t testing.TB, o *logpoller.ORM, chainID *big.Int) (common.Hash, common.Address, common.Address) {
event1 := EmitterABI.Events["Log1"].ID
address1 := common.HexToAddress("0x2ab9a2Dc53736b361b72d900CdF9F78F9406fbbb")
address2 := common.HexToAddress("0x6E225058950f237371261C985Db6bDe26df2200E")
startDate := time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC)

// We start at 1 just so block number > 0
for j := 1; j < 1000; j++ {
for j := 1; j < 100; j++ {
var logs []logpoller.Log
// Max we can insert per batch
for i := 0; i < 1000; i++ {
addr := address1
if (i+(1000*j))%2 == 0 {
addr = address2
}
blockNumber := int64(i + (1000 * j))
blockTimestamp := startDate.Add(time.Duration(j*1000) * time.Hour)

logs = append(logs, logpoller.Log{
EvmChainId: utils.NewBig(chainID),
LogIndex: 1,
BlockHash: common.HexToHash(fmt.Sprintf("0x%d", i+(1000*j))),
BlockNumber: int64(i + (1000 * j)),
EventSig: event1,
Topics: [][]byte{event1[:], logpoller.EvmWord(uint64(i + 1000*j)).Bytes()},
Address: addr,
TxHash: common.HexToHash("0x1234"),
Data: logpoller.EvmWord(uint64(i + 1000*j)).Bytes(),
EvmChainId: utils.NewBig(chainID),
LogIndex: 1,
BlockHash: common.HexToHash(fmt.Sprintf("0x%d", i+(1000*j))),
BlockNumber: blockNumber,
BlockTimestamp: blockTimestamp,
EventSig: event1,
Topics: [][]byte{event1[:], logpoller.EvmWord(uint64(i + 1000*j)).Bytes()},
Address: addr,
TxHash: utils.RandomAddress().Hash(),
Data: logpoller.EvmWord(uint64(i + 1000*j)).Bytes(),
CreatedAt: blockTimestamp,
})

}
require.NoError(t, o.InsertLogs(logs))
require.NoError(t, o.InsertBlock(utils.RandomAddress().Hash(), int64((j+1)*1000-1), startDate.Add(time.Duration(j*1000)*time.Hour)))
}

return event1, address1, address2
}

func BenchmarkSelectLogsCreatedAfter(b *testing.B) {
chainId := big.NewInt(137)
_, db := heavyweight.FullTestDBV2(b, "logs_scale", nil)
o := logpoller.NewORM(chainId, db, logger.TestLogger(b), pgtest.NewQConfig(false))
event, address, _ := populateDatabase(b, o, chainId)

// Setting searchDate to pick around 5k logs
searchDate := time.Date(2020, 1, 1, 12, 12, 12, 0, time.UTC)

b.ResetTimer()

for i := 0; i < b.N; i++ {
logs, err := o.SelectLogsCreatedAfter(address, event, searchDate, 500)
require.NotZero(b, len(logs))
require.NoError(b, err)
}
}

func TestPopulateLoadedDB(t *testing.T) {
t.Skip("Only for local load testing and query analysis")
_, db := heavyweight.FullTestDBV2(t, "logs_scale", nil)
chainID := big.NewInt(137)

o := logpoller.NewORM(big.NewInt(137), db, logger.TestLogger(t), pgtest.NewQConfig(true))
event1, address1, address2 := populateDatabase(t, o, chainID)

func() {
defer logRuntime(t, time.Now())
_, err1 := o.SelectLogsByBlockRangeFilter(750000, 800000, address1, event1)
Expand Down
55 changes: 43 additions & 12 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (o *ORM) SelectLatestLogEventSigWithConfs(eventSig common.Hash, address com
WHERE evm_chain_id = $1
AND event_sig = $2
AND address = $3
AND (block_number + $4) <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1)
AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $4
ORDER BY (block_number, log_index) DESC LIMIT 1`, utils.NewBig(o.chainID), eventSig, address, confs); err != nil {
return nil, err
}
Expand Down Expand Up @@ -231,17 +231,22 @@ func (o *ORM) SelectLogsByBlockRangeFilter(start, end int64, address common.Addr
}

// SelectLogsCreatedAfter finds logs created after some timestamp.
func (o *ORM) SelectLogsCreatedAfter(eventSig []byte, address common.Address, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
func (o *ORM) SelectLogsCreatedAfter(address common.Address, eventSig common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
minBlock, maxBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...)
if err != nil {
return nil, err
}

var logs []Log
q := o.q.WithOpts(qopts...)
err := q.Select(&logs, `
err = q.Select(&logs, `
SELECT * FROM evm.logs
WHERE evm_chain_id = $1
AND address = $2
AND event_sig = $3
AND created_at > $4
AND (block_number + $5) <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1)
ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig, after, confs)
AND block_number > $4
AND block_number <= $5
ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig, minBlock, maxBlock)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -498,18 +503,24 @@ func validateTopicIndex(index int) error {
}

func (o *ORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
q := o.q.WithOpts(qopts...)
minBlock, maxBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...)
if err != nil {
return nil, err
}

var logs []Log
q := o.q.WithOpts(qopts...)
topicValuesBytes := concatBytes(topicValues)
// Add 1 since postgresql arrays are 1-indexed.
err := q.Select(&logs, `
err = q.Select(&logs, `
SELECT * FROM evm.logs
WHERE evm.logs.evm_chain_id = $1
AND address = $2 AND event_sig = $3
AND address = $2
AND event_sig = $3
AND topics[$4] = ANY($5)
AND created_at > $6
AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $7
ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, after, confs)
AND block_number > $6
AND block_number <= $7
ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, minBlock, maxBlock)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -569,7 +580,27 @@ func (o *ORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topicIn
return nil, err
}
return logs, nil
}

func (o *ORM) blocksRangeAfterTimestamp(after time.Time, confs int, qopts ...pg.QOpt) (int64, int64, error) {
type blockRange struct {
MinBlockNumber int64 `db:"min_block"`
MaxBlockNumber int64 `db:"max_block"`
}

var br blockRange
q := o.q.WithOpts(qopts...)
err := q.Get(&br, `
SELECT
coalesce(min(block_number), 0) as min_block,
coalesce(max(block_number), 0) as max_block
FROM evm.log_poller_blocks
WHERE evm_chain_id = $1
AND block_timestamp > $2`, utils.NewBig(o.chainID), after)
if err != nil {
return 0, 0, err
}
return br.MinBlockNumber, br.MaxBlockNumber - int64(confs), nil
}

type bytesProducer interface {
Expand Down
109 changes: 102 additions & 7 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,16 +1087,16 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) {
th := SetupTH(t, 2, 3, 2)
event1 := EmitterABI.Events["Log1"].ID
event2 := EmitterABI.Events["Log2"].ID
address1 := common.HexToAddress("0xA")
address2 := common.HexToAddress("0xB")
address1 := utils.RandomAddress()
address2 := utils.RandomAddress()

require.NoError(t, th.ORM.InsertLogs([]logpoller.Log{
GenLog(th.ChainID, 1, 1, "0x1", event1[:], address1),
GenLog(th.ChainID, 2, 1, "0x2", event2[:], address2),
GenLog(th.ChainID, 2, 2, "0x4", event2[:], address2),
GenLog(th.ChainID, 2, 3, "0x6", event2[:], address2),
GenLog(th.ChainID, 1, 1, utils.RandomAddress().String(), event1[:], address1),
GenLog(th.ChainID, 2, 1, utils.RandomAddress().String(), event2[:], address2),
GenLog(th.ChainID, 2, 2, utils.RandomAddress().String(), event2[:], address2),
GenLog(th.ChainID, 2, 3, utils.RandomAddress().String(), event2[:], address2),
}))
require.NoError(t, th.ORM.InsertBlock(common.HexToHash("0x1"), 3, time.Now()))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 3, time.Now()))

tests := []struct {
name string
Expand Down Expand Up @@ -1171,3 +1171,98 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) {
})
}
}

func TestSelectLogsCreatedAfter(t *testing.T) {
th := SetupTH(t, 2, 3, 2)
event := EmitterABI.Events["Log1"].ID
address := utils.RandomAddress()

past := time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC)
now := time.Date(2020, 1, 1, 12, 12, 12, 0, time.UTC)
future := time.Date(2030, 1, 1, 12, 12, 12, 0, time.UTC)

require.NoError(t, th.ORM.InsertLogs([]logpoller.Log{
GenLog(th.ChainID, 1, 1, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 1, 2, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 2, 2, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 1, 3, utils.RandomAddress().String(), event[:], address),
}))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 1, past))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 2, now))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 3, future))

type expectedLog struct {
block int64
log int64
}

tests := []struct {
name string
confs int
after time.Time
expectedLogs []expectedLog
}{
{
name: "picks logs after block 1",
confs: 0,
after: past.Add(-time.Hour),
expectedLogs: []expectedLog{
{block: 2, log: 1},
{block: 2, log: 2},
{block: 3, log: 1},
},
},
{
name: "skips blocks with not enough confirmations",
confs: 1,
after: past.Add(-time.Hour),
expectedLogs: []expectedLog{
{block: 2, log: 1},
{block: 2, log: 2},
},
},
{
name: "limits number of blocks by block_timestamp",
confs: 0,
after: now.Add(-time.Hour),
expectedLogs: []expectedLog{
{block: 3, log: 1},
},
},
{
name: "returns empty dataset for future timestamp",
confs: 0,
after: future,
expectedLogs: []expectedLog{},
},
{
name: "returns empty dataset when too many confirmations are required",
confs: 3,
after: past.Add(-time.Hour),
expectedLogs: []expectedLog{},
},
}
for _, tt := range tests {
t.Run("SelectLogsCreatedAfter"+tt.name, func(t *testing.T) {
logs, err := th.ORM.SelectLogsCreatedAfter(address, event, tt.after, tt.confs)
require.NoError(t, err)
assert.Len(t, logs, len(tt.expectedLogs))

for i, log := range logs {
assert.Equal(t, tt.expectedLogs[i].block, log.BlockNumber)
assert.Equal(t, tt.expectedLogs[i].log, log.LogIndex)
}
})

t.Run("SelectIndexedLogsCreatedAfter"+tt.name, func(t *testing.T) {
logs, err := th.ORM.SelectIndexedLogsCreatedAfter(address, event, 0, []common.Hash{event}, tt.after, tt.confs)
require.NoError(t, err)
assert.Len(t, logs, len(tt.expectedLogs))

for i, log := range logs {
assert.Equal(t, tt.expectedLogs[i].block, log.BlockNumber)
assert.Equal(t, tt.expectedLogs[i].log, log.LogIndex)
}
})
}
}
8 changes: 4 additions & 4 deletions core/internal/cltest/heavyweight/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@ import (

// FullTestDBV2 creates a pristine DB which runs in a separate database than the normal
// unit tests, so you can do things like use other Postgres connection types with it.
func FullTestDBV2(t *testing.T, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
func FullTestDBV2(t testing.TB, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
return prepareFullTestDBV2(t, name, false, true, overrideFn)
}

// FullTestDBNoFixturesV2 is the same as FullTestDB, but it does not load fixtures.
func FullTestDBNoFixturesV2(t *testing.T, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
func FullTestDBNoFixturesV2(t testing.TB, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
return prepareFullTestDBV2(t, name, false, false, overrideFn)
}

// FullTestDBEmptyV2 creates an empty DB (without migrations).
func FullTestDBEmptyV2(t *testing.T, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
func FullTestDBEmptyV2(t testing.TB, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
return prepareFullTestDBV2(t, name, true, false, overrideFn)
}

func prepareFullTestDBV2(t *testing.T, name string, empty bool, loadFixtures bool, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
func prepareFullTestDBV2(t testing.TB, name string, empty bool, loadFixtures bool, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
testutils.SkipShort(t, "FullTestDB")

if empty && loadFixtures {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- +goose Up
create index log_poller_blocks_by_timestamp on evm.log_poller_blocks (evm_chain_id, block_timestamp);

-- +goose Down
DROP INDEX IF EXISTS evm.log_poller_blocks_by_timestamp;

0 comments on commit c19d648

Please sign in to comment.