Skip to content

Commit

Permalink
Merge branch 'ccip-develop' into update-codeowners
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonmci authored Oct 13, 2023
2 parents 228d654 + 9c68d3e commit 20ddd8f
Show file tree
Hide file tree
Showing 35 changed files with 4,549 additions and 504 deletions.
60 changes: 60 additions & 0 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1152,3 +1152,63 @@ func TestTooManyLogResults(t *testing.T) {
require.Len(t, crit, 1)
assert.Contains(t, crit[0].Message, "Too many log results in a single block")
}

func Test_CreatedAfterQueriesWithBackfill(t *testing.T) {
emittedLogs := 60
finalityDepth := 10
ctx := testutils.Context(t)
th := SetupTH(t, int64(finalityDepth), 3, 2)

header, err := th.Client.HeaderByNumber(ctx, nil)
require.NoError(t, err)

genesisBlockTime := time.UnixMilli(int64(header.Time))

// Emit some logs in blocks
for i := 0; i < emittedLogs; i++ {
_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(i))})
require.NoError(t, err)
th.Client.Commit()
}

// First PollAndSave, no filters are registered
currentBlock := th.PollAndSaveLogs(ctx, 1)

err = th.LogPoller.RegisterFilter(logpoller.Filter{
Name: "Test Emitter",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{th.EmitterAddress1},
})
require.NoError(t, err)

// Emit blocks to cover finality depth, because backup always backfill up to the one block before last finalized
for i := 0; i < finalityDepth+1; i++ {
th.Client.Commit()
}

// LogPoller should backfill entire history
th.LogPoller.BackupPollAndSaveLogs(ctx, 100)
require.NoError(t, err)

// Make sure that all logs are backfilled
logs, err := th.LogPoller.Logs(
0,
currentBlock,
EmitterABI.Events["Log1"].ID,
th.EmitterAddress1,
pg.WithParentCtx(testutils.Context(t)),
)
require.NoError(t, err)
require.Len(t, logs, emittedLogs)

// We should get all the logs by the block_timestamp
logs, err = th.LogPoller.LogsCreatedAfter(
EmitterABI.Events["Log1"].ID,
th.EmitterAddress1,
genesisBlockTime,
0,
pg.WithParentCtx(testutils.Context(t)),
)
require.NoError(t, err)
require.Len(t, logs, emittedLogs)
}
46 changes: 8 additions & 38 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,21 +276,16 @@ func (o *DbORM) SelectLogs(start, end int64, address common.Address, eventSig co

// SelectLogsCreatedAfter finds logs created after some timestamp.
func (o *DbORM) SelectLogsCreatedAfter(address common.Address, eventSig common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
minBlock, maxBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...)
if err != nil {
return nil, err
}

var logs []Log
q := o.q.WithOpts(qopts...)
err = q.Select(&logs, `
err := q.Select(&logs, `
SELECT * FROM evm.logs
WHERE evm_chain_id = $1
AND address = $2
AND event_sig = $3
AND block_number > $4
AND block_number <= $5
ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig, minBlock, maxBlock)
AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $4
AND block_timestamp > $5
ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig, confs, after)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -547,23 +542,19 @@ func validateTopicIndex(index int) error {
}

func (o *DbORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
minBlock, maxBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...)
if err != nil {
return nil, err
}
var logs []Log
q := o.q.WithOpts(qopts...)
topicValuesBytes := concatBytes(topicValues)
// Add 1 since postgresql arrays are 1-indexed.
err = q.Select(&logs, `
err := q.Select(&logs, `
SELECT * FROM evm.logs
WHERE evm.logs.evm_chain_id = $1
AND address = $2
AND event_sig = $3
AND topics[$4] = ANY($5)
AND block_number > $6
AND block_number <= $7
ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, minBlock, maxBlock)
AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $6
AND block_timestamp > $7
ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, confs, after)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -625,27 +616,6 @@ func (o *DbORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topic
return logs, nil
}

func (o *DbORM) blocksRangeAfterTimestamp(after time.Time, confs int, qopts ...pg.QOpt) (int64, int64, error) {
type blockRange struct {
MinBlockNumber int64 `db:"min_block"`
MaxBlockNumber int64 `db:"max_block"`
}

var br blockRange
q := o.q.WithOpts(qopts...)
err := q.Get(&br, `
SELECT
coalesce(min(block_number), 0) as min_block,
coalesce(max(block_number), 0) as max_block
FROM evm.log_poller_blocks
WHERE evm_chain_id = $1
AND block_timestamp > $2`, utils.NewBig(o.chainID), after)
if err != nil {
return 0, 0, err
}
return br.MinBlockNumber, br.MaxBlockNumber - int64(confs), nil
}

type bytesProducer interface {
Bytes() []byte
}
Expand Down
36 changes: 21 additions & 15 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ func GenLog(chainID *big.Int, logIndex int64, blockNum int64, blockHash string,
}
}

func GenLogWithTimestamp(chainID *big.Int, logIndex int64, blockNum int64, blockHash string, topic1 []byte, address common.Address, ts time.Time) logpoller.Log {
lg := GenLog(chainID, logIndex, blockNum, blockHash, topic1, address)
lg.BlockTimestamp = ts
return lg
}

func TestLogPoller_Batching(t *testing.T) {
t.Parallel()
th := SetupTH(t, 2, 3, 2)
Expand Down Expand Up @@ -1177,19 +1183,19 @@ func TestSelectLogsCreatedAfter(t *testing.T) {
event := EmitterABI.Events["Log1"].ID
address := utils.RandomAddress()

past := time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC)
now := time.Date(2020, 1, 1, 12, 12, 12, 0, time.UTC)
future := time.Date(2030, 1, 1, 12, 12, 12, 0, time.UTC)
block1 := time.Date(2010, 1, 1, 12, 12, 12, 0, time.UTC)
block2 := time.Date(2020, 1, 1, 12, 12, 12, 0, time.UTC)
block3 := time.Date(2030, 1, 1, 12, 12, 12, 0, time.UTC)

require.NoError(t, th.ORM.InsertLogs([]logpoller.Log{
GenLog(th.ChainID, 1, 1, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 1, 2, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 2, 2, utils.RandomAddress().String(), event[:], address),
GenLog(th.ChainID, 1, 3, utils.RandomAddress().String(), event[:], address),
GenLogWithTimestamp(th.ChainID, 1, 1, utils.RandomAddress().String(), event[:], address, block1),
GenLogWithTimestamp(th.ChainID, 1, 2, utils.RandomAddress().String(), event[:], address, block2),
GenLogWithTimestamp(th.ChainID, 2, 2, utils.RandomAddress().String(), event[:], address, block2),
GenLogWithTimestamp(th.ChainID, 1, 3, utils.RandomAddress().String(), event[:], address, block3),
}))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 1, past))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 2, now))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 3, future))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 1, block1))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 2, block2))
require.NoError(t, th.ORM.InsertBlock(utils.RandomAddress().Hash(), 3, block3))

type expectedLog struct {
block int64
Expand All @@ -1205,7 +1211,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {
{
name: "picks logs after block 1",
confs: 0,
after: past.Add(-time.Hour),
after: block1.Add(time.Hour),
expectedLogs: []expectedLog{
{block: 2, log: 1},
{block: 2, log: 2},
Expand All @@ -1215,7 +1221,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {
{
name: "skips blocks with not enough confirmations",
confs: 1,
after: past.Add(-time.Hour),
after: block1.Add(time.Hour),
expectedLogs: []expectedLog{
{block: 2, log: 1},
{block: 2, log: 2},
Expand All @@ -1224,21 +1230,21 @@ func TestSelectLogsCreatedAfter(t *testing.T) {
{
name: "limits number of blocks by block_timestamp",
confs: 0,
after: now.Add(-time.Hour),
after: block2.Add(time.Hour),
expectedLogs: []expectedLog{
{block: 3, log: 1},
},
},
{
name: "returns empty dataset for future timestamp",
confs: 0,
after: future,
after: block3,
expectedLogs: []expectedLog{},
},
{
name: "returns empty dataset when too many confirmations are required",
confs: 3,
after: past.Add(-time.Hour),
after: block1.Add(-time.Hour),
expectedLogs: []expectedLog{},
},
}
Expand Down
Loading

0 comments on commit 20ddd8f

Please sign in to comment.