Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP-1053 Replace filtering by created_at with filtering by block_timestamp #10743

Merged
merged 8 commits into from
Sep 27, 2023
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ func (lp *logPoller) LogsWithSigs(start, end int64, eventSigs []common.Hash, add
}

func (lp *logPoller) LogsCreatedAfter(eventSig common.Hash, address common.Address, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectLogsCreatedAfter(eventSig[:], address, after, confs, qopts...)
return lp.orm.SelectLogsCreatedAfter(address, eventSig, after, confs, qopts...)
}

// IndexedLogs finds all the logs that have a topic value in topicValues at index topicIndex.
Expand Down
56 changes: 44 additions & 12 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (o *ORM) SelectLatestLogEventSigWithConfs(eventSig common.Hash, address com
WHERE evm_chain_id = $1
AND event_sig = $2
AND address = $3
AND (block_number + $4) <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1)
AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $4
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Careful! This was set up deliberately for optimization reasons IIRC

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was the one who did these optimizations :P For some reason, I missed this query, therefore fixing it right now

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ORDER BY (block_number, log_index) DESC LIMIT 1`, utils.NewBig(o.chainID), eventSig, address, confs); err != nil {
return nil, err
}
Expand Down Expand Up @@ -231,21 +231,27 @@ func (o *ORM) SelectLogsByBlockRangeFilter(start, end int64, address common.Addr
}

// SelectLogsCreatedAfter finds logs created after some timestamp.
func (o *ORM) SelectLogsCreatedAfter(eventSig []byte, address common.Address, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
func (o *ORM) SelectLogsCreatedAfter(address common.Address, eventSig common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
minBlock, maxBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...)
if err != nil {
return nil, err
}

var logs []Log
q := o.q.WithOpts(qopts...)
err := q.Select(&logs, `
err = q.Select(&logs, `
SELECT * FROM evm.logs
WHERE evm_chain_id = $1
AND address = $2
AND event_sig = $3
AND created_at > $4
AND (block_number + $5) <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1)
ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig, after, confs)
AND block_number > $4
AND block_number <= $5
ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig, minBlock, maxBlock)
if err != nil {
return nil, err
}
return logs, nil

}

// SelectLogsWithSigsByBlockRangeFilter finds the logs in the given block range with the given event signatures
Expand Down Expand Up @@ -498,18 +504,24 @@ func validateTopicIndex(index int) error {
}

func (o *ORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
q := o.q.WithOpts(qopts...)
minBlock, maxBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...)
if err != nil {
return nil, err
}

var logs []Log
q := o.q.WithOpts(qopts...)
topicValuesBytes := concatBytes(topicValues)
// Add 1 since postgresql arrays are 1-indexed.
err := q.Select(&logs, `
err = q.Select(&logs, `
SELECT * FROM evm.logs
WHERE evm.logs.evm_chain_id = $1
AND address = $2 AND event_sig = $3
AND address = $2
AND event_sig = $3
AND topics[$4] = ANY($5)
AND created_at > $6
AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $7
ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, after, confs)
AND block_number > $6
AND block_number <= $7
ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, minBlock, maxBlock)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -569,7 +581,27 @@ func (o *ORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topicIn
return nil, err
}
return logs, nil
}

func (o *ORM) blocksRangeAfterTimestamp(after time.Time, confs int, qopts ...pg.QOpt) (int64, int64, error) {
type blockRange struct {
MinBlockNumber int64 `db:"min_block"`
MaxBlockNumber int64 `db:"max_block"`
}

var br blockRange
q := o.q.WithOpts(qopts...)
err := q.Get(&br, `
SELECT
coalesce(min(block_number), 0) as min_block,
coalesce(max(block_number), 0) as max_block
FROM evm.log_poller_blocks
WHERE evm_chain_id = $1
AND block_timestamp > $2`, utils.NewBig(o.chainID), after)
if err != nil {
return 0, 0, err
}
return br.MinBlockNumber, br.MaxBlockNumber - int64(confs), nil
}

type bytesProducer interface {
Expand Down
109 changes: 102 additions & 7 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,16 +1087,16 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) {
th := SetupTH(t, 2, 3, 2)
event1 := EmitterABI.Events["Log1"].ID
event2 := EmitterABI.Events["Log2"].ID
address1 := common.HexToAddress("0xA")
address2 := common.HexToAddress("0xB")
address1 := utils.RandomAddress()
address2 := utils.RandomAddress()

require.NoError(t, th.ORM.InsertLogs([]logpoller.Log{
GenLog(th.ChainID, 1, 1, "0x1", event1[:], address1),
GenLog(th.ChainID, 2, 1, "0x2", event2[:], address2),
GenLog(th.ChainID, 2, 2, "0x4", event2[:], address2),
GenLog(th.ChainID, 2, 3, "0x6", event2[:], address2),
GenLog(th.ChainID, 1, 1, utils.RandomAddress().String(), event1[:], address1),
GenLog(th.ChainID, 2, 1, utils.RandomAddress().String(), event2[:], address2),
GenLog(th.ChainID, 2, 2, utils.RandomAddress().String(), event2[:], address2),
GenLog(th.ChainID, 2, 3, utils.RandomAddress().String(), event2[:], address2),
}))
require.NoError(t, th.ORM.InsertBlock(common.HexToHash("0x1"), 3, time.Now()))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 3, time.Now()))

tests := []struct {
name string
Expand Down Expand Up @@ -1171,3 +1171,98 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) {
})
}
}

func TestSelectLogsCreatedAfter(t *testing.T) {
th := SetupTH(t, 2, 3, 2)
event := EmitterABI.Events["Log1"].ID
address := utils.RandomAddress()

past := time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC)
now := time.Date(2020, 1, 1, 12, 12, 12, 0, time.UTC)
future := time.Date(2030, 1, 1, 12, 12, 12, 0, time.UTC)

require.NoError(t, th.ORM.InsertLogs([]logpoller.Log{
GenLog(th.ChainID, 1, 1, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 1, 2, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 2, 2, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 1, 3, utils.RandomAddress().String(), event[:], address),
}))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 1, past))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 2, now))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 3, future))

type expectedLog struct {
block int64
log int64
}

tests := []struct {
name string
confs int
after time.Time
expectedLogs []expectedLog
}{
{
name: "picks logs after block 1",
confs: 0,
after: past.Add(-time.Hour),
expectedLogs: []expectedLog{
{block: 2, log: 1},
{block: 2, log: 2},
{block: 3, log: 1},
},
},
{
name: "skips blocks with not enough confirmations",
confs: 1,
after: past.Add(-time.Hour),
expectedLogs: []expectedLog{
{block: 2, log: 1},
{block: 2, log: 2},
},
},
{
name: "limits number of blocks by block_timestamp",
confs: 0,
after: now.Add(-time.Hour),
expectedLogs: []expectedLog{
{block: 3, log: 1},
},
},
{
name: "returns empty dataset for future timestamp",
confs: 0,
after: future,
expectedLogs: []expectedLog{},
},
{
name: "returns empty dataset when too many confirmations are required",
confs: 3,
after: past.Add(-time.Hour),
expectedLogs: []expectedLog{},
},
}
for _, tt := range tests {
t.Run("SelectLogsCreatedAfter"+tt.name, func(t *testing.T) {
logs, err := th.ORM.SelectLogsCreatedAfter(address, event, tt.after, tt.confs)
require.NoError(t, err)
assert.Len(t, logs, len(tt.expectedLogs))

for i, log := range logs {
assert.Equal(t, tt.expectedLogs[i].block, log.BlockNumber)
assert.Equal(t, tt.expectedLogs[i].log, log.LogIndex)
}
})

t.Run("SelectIndexedLogsCreatedAfter"+tt.name, func(t *testing.T) {
logs, err := th.ORM.SelectIndexedLogsCreatedAfter(address, event, 0, []common.Hash{event}, tt.after, tt.confs)
require.NoError(t, err)
assert.Len(t, logs, len(tt.expectedLogs))

for i, log := range logs {
assert.Equal(t, tt.expectedLogs[i].block, log.BlockNumber)
assert.Equal(t, tt.expectedLogs[i].log, log.LogIndex)
}
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- +goose Up
create index log_poller_blocks_by_timestamp on evm.log_poller_blocks (evm_chain_id, block_timestamp);

-- +goose Down
DROP INDEX IF EXISTS evm.log_poller_blocks_by_timestamp;
Loading