
CCIP-1053 Replace filtering by created_at with filtering by block_timestamp #10743

Merged · 8 commits · Sep 27, 2023
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/log_poller.go
@@ -945,7 +945,7 @@ func (lp *logPoller) LogsWithSigs(start, end int64, eventSigs []common.Hash, add
}

func (lp *logPoller) LogsCreatedAfter(eventSig common.Hash, address common.Address, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectLogsCreatedAfter(eventSig[:], address, after, confs, qopts...)
return lp.orm.SelectLogsCreatedAfter(address, eventSig, after, confs, qopts...)
}

// IndexedLogs finds all the logs that have a topic value in topicValues at index topicIndex.
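The exported LogsCreatedAfter signature is unchanged; only the argument order of the underlying ORM call changes. A minimal caller sketch, assuming the package's exported LogPoller interface exposes LogsCreatedAfter with this signature (the lookback and confirmation depth below are illustrative placeholders):

package example

import (
	"time"

	"github.com/ethereum/go-ethereum/common"

	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
)

// fetchRecentLogs sketches a consumer of LogsCreatedAfter after this PR: it asks
// for events with the given signature emitted by `emitter` in blocks whose
// timestamp is newer than one hour ago, ignoring the newest `confs` blocks.
func fetchRecentLogs(lp logpoller.LogPoller, eventSig common.Hash, emitter common.Address) ([]logpoller.Log, error) {
	after := time.Now().Add(-time.Hour) // cutoff is now compared against block_timestamp, not created_at
	confs := 3                          // confirmation depth (illustrative)
	return lp.LogsCreatedAfter(eventSig, emitter, after, confs)
}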
69 changes: 50 additions & 19 deletions core/chains/evm/logpoller/log_poller_test.go
@@ -39,44 +39,75 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

func logRuntime(t *testing.T, start time.Time) {
func logRuntime(t testing.TB, start time.Time) {
t.Log("runtime", time.Since(start))
}

func TestPopulateLoadedDB(t *testing.T) {
t.Skip("Only for local load testing and query analysis")
lggr := logger.TestLogger(t)
_, db := heavyweight.FullTestDBV2(t, "logs_scale", nil)
chainID := big.NewInt(137)

o := logpoller.NewORM(big.NewInt(137), db, lggr, pgtest.NewQConfig(true))
func populateDatabase(t testing.TB, o *logpoller.ORM, chainID *big.Int) (common.Hash, common.Address, common.Address) {
event1 := EmitterABI.Events["Log1"].ID
address1 := common.HexToAddress("0x2ab9a2Dc53736b361b72d900CdF9F78F9406fbbb")
address2 := common.HexToAddress("0x6E225058950f237371261C985Db6bDe26df2200E")
startDate := time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC)

// We start at 1 just so block number > 0
for j := 1; j < 1000; j++ {
for j := 1; j < 100; j++ {
var logs []logpoller.Log
// Max we can insert per batch
for i := 0; i < 1000; i++ {
addr := address1
if (i+(1000*j))%2 == 0 {
addr = address2
}
blockNumber := int64(i + (1000 * j))
blockTimestamp := startDate.Add(time.Duration(j*1000) * time.Hour)

logs = append(logs, logpoller.Log{
EvmChainId: utils.NewBig(chainID),
LogIndex: 1,
BlockHash: common.HexToHash(fmt.Sprintf("0x%d", i+(1000*j))),
BlockNumber: int64(i + (1000 * j)),
EventSig: event1,
Topics: [][]byte{event1[:], logpoller.EvmWord(uint64(i + 1000*j)).Bytes()},
Address: addr,
TxHash: common.HexToHash("0x1234"),
Data: logpoller.EvmWord(uint64(i + 1000*j)).Bytes(),
EvmChainId: utils.NewBig(chainID),
LogIndex: 1,
BlockHash: common.HexToHash(fmt.Sprintf("0x%d", i+(1000*j))),
BlockNumber: blockNumber,
BlockTimestamp: blockTimestamp,
EventSig: event1,
Topics: [][]byte{event1[:], logpoller.EvmWord(uint64(i + 1000*j)).Bytes()},
Address: addr,
TxHash: utils.RandomAddress().Hash(),
Data: logpoller.EvmWord(uint64(i + 1000*j)).Bytes(),
CreatedAt: blockTimestamp,
})

}
require.NoError(t, o.InsertLogs(logs))
require.NoError(t, o.InsertBlock(utils.RandomAddress().Hash(), int64((j+1)*1000-1), startDate.Add(time.Duration(j*1000)*time.Hour)))
}

return event1, address1, address2
}

func BenchmarkSelectLogsCreatedAfter(b *testing.B) {
chainId := big.NewInt(137)
_, db := heavyweight.FullTestDBV2(b, "logs_scale", nil)
o := logpoller.NewORM(chainId, db, logger.TestLogger(b), pgtest.NewQConfig(false))
event, address, _ := populateDatabase(b, o, chainId)

// Setting searchDate to pick around 5k logs
searchDate := time.Date(2020, 1, 1, 12, 12, 12, 0, time.UTC)

b.ResetTimer()

for i := 0; i < b.N; i++ {
logs, err := o.SelectLogsCreatedAfter(address, event, searchDate, 500)
require.NotZero(b, len(logs))
require.NoError(b, err)
}
}

func TestPopulateLoadedDB(t *testing.T) {
t.Skip("Only for local load testing and query analysis")
_, db := heavyweight.FullTestDBV2(t, "logs_scale", nil)
chainID := big.NewInt(137)

o := logpoller.NewORM(big.NewInt(137), db, logger.TestLogger(t), pgtest.NewQConfig(true))
event1, address1, address2 := populateDatabase(t, o, chainID)

func() {
defer logRuntime(t, time.Now())
_, err1 := o.SelectLogsByBlockRangeFilter(750000, 800000, address1, event1)
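With logRuntime and the heavyweight DB helpers now accepting testing.TB (see core/internal/cltest/heavyweight/orm.go further down), the new benchmark runs like any Go benchmark, assuming a local test Postgres is configured the way the repository's test setup expects (connection details are not part of this diff):

	go test -run='^$' -bench=BenchmarkSelectLogsCreatedAfter ./core/chains/evm/logpoller/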
55 changes: 43 additions & 12 deletions core/chains/evm/logpoller/orm.go
@@ -122,7 +122,7 @@ func (o *ORM) SelectLatestLogEventSigWithConfs(eventSig common.Hash, address com
WHERE evm_chain_id = $1
AND event_sig = $2
AND address = $3
AND (block_number + $4) <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1)
AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $4
Review comment (Collaborator): Careful! This was set up deliberately for optimization reasons IIRC

Reply (Collaborator, PR author): I was the one who did these optimizations :P For some reason, I missed this query, therefore fixing it right now
ORDER BY (block_number, log_index) DESC LIMIT 1`, utils.NewBig(o.chainID), eventSig, address, confs); err != nil {
return nil, err
}
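Both forms of the confirmation filter select the same rows: a log qualifies only when it sits at least $4 blocks below the latest block in evm.log_poller_blocks. The rewritten form keeps the bare block_number column on the left-hand side and moves the offset onto the scalar subquery result, which is generally the more index-friendly shape; per the review thread above, this query had simply been missed by the earlier round of such optimizations.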
@@ -231,17 +231,22 @@ func (o *ORM) SelectLogsByBlockRangeFilter(start, end int64, address common.Addr
}

// SelectLogsCreatedAfter finds logs created after some timestamp.
func (o *ORM) SelectLogsCreatedAfter(eventSig []byte, address common.Address, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
func (o *ORM) SelectLogsCreatedAfter(address common.Address, eventSig common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
minBlock, maxBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...)
if err != nil {
return nil, err
}

var logs []Log
q := o.q.WithOpts(qopts...)
err := q.Select(&logs, `
err = q.Select(&logs, `
SELECT * FROM evm.logs
WHERE evm_chain_id = $1
AND address = $2
AND event_sig = $3
AND created_at > $4
AND (block_number + $5) <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1)
ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig, after, confs)
AND block_number > $4
AND block_number <= $5
ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig, minBlock, maxBlock)
if err != nil {
return nil, err
}
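The behavioral change here: created_at records when the row was written to the node's database, while block_timestamp comes from the chain, so the two can diverge (backfilled logs, for example, carry a recent created_at but an old block timestamp). The query now resolves the after cutoff to a block-number window via blocksRangeAfterTimestamp (added further down in this file) and filters on block_number alone, ordering by (block_number, log_index) rather than created_at.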
@@ -498,18 +503,24 @@ func validateTopicIndex(index int) error {
}

func (o *ORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
q := o.q.WithOpts(qopts...)
minBlock, maxBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...)
if err != nil {
return nil, err
}

var logs []Log
q := o.q.WithOpts(qopts...)
topicValuesBytes := concatBytes(topicValues)
// Add 1 since postgresql arrays are 1-indexed.
err := q.Select(&logs, `
err = q.Select(&logs, `
SELECT * FROM evm.logs
WHERE evm.logs.evm_chain_id = $1
AND address = $2 AND event_sig = $3
AND address = $2
AND event_sig = $3
AND topics[$4] = ANY($5)
AND created_at > $6
AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $7
ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, after, confs)
AND block_number > $6
AND block_number <= $7
ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, minBlock, maxBlock)
if err != nil {
return nil, err
}
@@ -569,7 +580,27 @@ func (o *ORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topicIn
return nil, err
}
return logs, nil
}

func (o *ORM) blocksRangeAfterTimestamp(after time.Time, confs int, qopts ...pg.QOpt) (int64, int64, error) {
type blockRange struct {
MinBlockNumber int64 `db:"min_block"`
MaxBlockNumber int64 `db:"max_block"`
}

var br blockRange
q := o.q.WithOpts(qopts...)
err := q.Get(&br, `
SELECT
coalesce(min(block_number), 0) as min_block,
coalesce(max(block_number), 0) as max_block
FROM evm.log_poller_blocks
WHERE evm_chain_id = $1
AND block_timestamp > $2`, utils.NewBig(o.chainID), after)
if err != nil {
return 0, 0, err
}
return br.MinBlockNumber, br.MaxBlockNumber - int64(confs), nil
}
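A minimal sketch of the combined filter that this helper and its callers implement, with a worked example mirroring the fixtures in the tests added below; the function name and values are illustrative only:

package example

// qualifies reproduces, in plain Go, the combined predicate used by
// SelectLogsCreatedAfter: blocksRangeAfterTimestamp returns the smallest and
// largest block numbers whose block_timestamp is after the cutoff, the largest
// is reduced by confs, and a log is kept when its block number is strictly
// above the smallest and at most the reduced largest.
func qualifies(logBlock, minBlock, maxBlock, confs int64) bool {
	return logBlock > minBlock && logBlock <= maxBlock-confs
}

// Worked example (blocks 1, 2, 3 with past/now/future timestamps and a cutoff
// before all of them, so minBlock=1 and maxBlock=3):
//   confs=0: qualifies(2,1,3,0) and qualifies(3,1,3,0) are true, so blocks 2 and 3 are returned
//   confs=1: qualifies(3,1,3,1) is false, so only block 2 is returned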

type bytesProducer interface {
109 changes: 102 additions & 7 deletions core/chains/evm/logpoller/orm_test.go
@@ -1087,16 +1087,16 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) {
th := SetupTH(t, 2, 3, 2)
event1 := EmitterABI.Events["Log1"].ID
event2 := EmitterABI.Events["Log2"].ID
address1 := common.HexToAddress("0xA")
address2 := common.HexToAddress("0xB")
address1 := utils.RandomAddress()
address2 := utils.RandomAddress()

require.NoError(t, th.ORM.InsertLogs([]logpoller.Log{
GenLog(th.ChainID, 1, 1, "0x1", event1[:], address1),
GenLog(th.ChainID, 2, 1, "0x2", event2[:], address2),
GenLog(th.ChainID, 2, 2, "0x4", event2[:], address2),
GenLog(th.ChainID, 2, 3, "0x6", event2[:], address2),
GenLog(th.ChainID, 1, 1, utils.RandomAddress().String(), event1[:], address1),
GenLog(th.ChainID, 2, 1, utils.RandomAddress().String(), event2[:], address2),
GenLog(th.ChainID, 2, 2, utils.RandomAddress().String(), event2[:], address2),
GenLog(th.ChainID, 2, 3, utils.RandomAddress().String(), event2[:], address2),
}))
require.NoError(t, th.ORM.InsertBlock(common.HexToHash("0x1"), 3, time.Now()))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 3, time.Now()))

tests := []struct {
name string
@@ -1171,3 +1171,98 @@ })
})
}
}

func TestSelectLogsCreatedAfter(t *testing.T) {
th := SetupTH(t, 2, 3, 2)
event := EmitterABI.Events["Log1"].ID
address := utils.RandomAddress()

past := time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC)
now := time.Date(2020, 1, 1, 12, 12, 12, 0, time.UTC)
future := time.Date(2030, 1, 1, 12, 12, 12, 0, time.UTC)

require.NoError(t, th.ORM.InsertLogs([]logpoller.Log{
GenLog(th.ChainID, 1, 1, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 1, 2, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 2, 2, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 1, 3, utils.RandomAddress().String(), event[:], address),
}))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 1, past))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 2, now))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 3, future))

type expectedLog struct {
block int64
log int64
}

tests := []struct {
name string
confs int
after time.Time
expectedLogs []expectedLog
}{
{
name: "picks logs after block 1",
confs: 0,
after: past.Add(-time.Hour),
expectedLogs: []expectedLog{
{block: 2, log: 1},
{block: 2, log: 2},
{block: 3, log: 1},
},
},
{
name: "skips blocks with not enough confirmations",
confs: 1,
after: past.Add(-time.Hour),
expectedLogs: []expectedLog{
{block: 2, log: 1},
{block: 2, log: 2},
},
},
{
name: "limits number of blocks by block_timestamp",
confs: 0,
after: now.Add(-time.Hour),
expectedLogs: []expectedLog{
{block: 3, log: 1},
},
},
{
name: "returns empty dataset for future timestamp",
confs: 0,
after: future,
expectedLogs: []expectedLog{},
},
{
name: "returns empty dataset when too many confirmations are required",
confs: 3,
after: past.Add(-time.Hour),
expectedLogs: []expectedLog{},
},
}
for _, tt := range tests {
t.Run("SelectLogsCreatedAfter"+tt.name, func(t *testing.T) {
logs, err := th.ORM.SelectLogsCreatedAfter(address, event, tt.after, tt.confs)
require.NoError(t, err)
assert.Len(t, logs, len(tt.expectedLogs))

for i, log := range logs {
assert.Equal(t, tt.expectedLogs[i].block, log.BlockNumber)
assert.Equal(t, tt.expectedLogs[i].log, log.LogIndex)
}
})

t.Run("SelectIndexedLogsCreatedAfter"+tt.name, func(t *testing.T) {
logs, err := th.ORM.SelectIndexedLogsCreatedAfter(address, event, 0, []common.Hash{event}, tt.after, tt.confs)
require.NoError(t, err)
assert.Len(t, logs, len(tt.expectedLogs))

for i, log := range logs {
assert.Equal(t, tt.expectedLogs[i].block, log.BlockNumber)
assert.Equal(t, tt.expectedLogs[i].log, log.LogIndex)
}
})
}
}
8 changes: 4 additions & 4 deletions core/internal/cltest/heavyweight/orm.go
@@ -29,21 +29,21 @@ import (

// FullTestDBV2 creates a pristine DB which runs in a separate database than the normal
// unit tests, so you can do things like use other Postgres connection types with it.
func FullTestDBV2(t *testing.T, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
func FullTestDBV2(t testing.TB, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
return prepareFullTestDBV2(t, name, false, true, overrideFn)
}

// FullTestDBNoFixturesV2 is the same as FullTestDB, but it does not load fixtures.
func FullTestDBNoFixturesV2(t *testing.T, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
func FullTestDBNoFixturesV2(t testing.TB, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
return prepareFullTestDBV2(t, name, false, false, overrideFn)
}

// FullTestDBEmptyV2 creates an empty DB (without migrations).
func FullTestDBEmptyV2(t *testing.T, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
func FullTestDBEmptyV2(t testing.TB, name string, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
return prepareFullTestDBV2(t, name, true, false, overrideFn)
}

func prepareFullTestDBV2(t *testing.T, name string, empty bool, loadFixtures bool, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
func prepareFullTestDBV2(t testing.TB, name string, empty bool, loadFixtures bool, overrideFn func(c *chainlink.Config, s *chainlink.Secrets)) (chainlink.GeneralConfig, *sqlx.DB) {
testutils.SkipShort(t, "FullTestDB")

if empty && loadFixtures {
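The *testing.T to testing.TB change is what allows BenchmarkSelectLogsCreatedAfter to reuse these heavyweight DB helpers: testing.TB is the interface implemented by both *testing.T and *testing.B, so the same setup code now serves tests and benchmarks.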
@@ -0,0 +1,5 @@
-- +goose Up
create index log_poller_blocks_by_timestamp on evm.log_poller_blocks (evm_chain_id, block_timestamp);

-- +goose Down
DROP INDEX IF EXISTS evm.log_poller_blocks_by_timestamp;
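The new composite index on (evm_chain_id, block_timestamp) backs the blocksRangeAfterTimestamp query added in orm.go, which filters evm.log_poller_blocks on exactly those two columns; without it, resolving a timestamp cutoff to a block-number range would likely require scanning the table.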