Skip to content

Commit

Permalink
CCIP-1143 LogPoller using blocks for database when doing cleanup (#11017
Browse files Browse the repository at this point in the history
)

* Fix in PruneOldBlocks

* Post review fixes
  • Loading branch information
mateusz-sekara authored Oct 23, 2023
1 parent a9ddfa1 commit b7a2c77
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 37 deletions.
4 changes: 2 additions & 2 deletions core/chains/evm/logpoller/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type TestHarness struct {
EthDB ethdb.Database
}

func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize, rpcBatchSize int64) TestHarness {
func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth int64) TestHarness {
lggr := logger.TestLogger(t)
chainID := testutils.NewRandomEVMChainID()
chainID2 := testutils.NewRandomEVMChainID()
Expand All @@ -66,7 +66,7 @@ func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize
// Mark genesis block as finalized to avoid any nulls in the tests
head := esc.Backend().Blockchain().CurrentHeader()
esc.Backend().Blockchain().SetFinalized(head)
lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, useFinalityTag, finalityDepth, backfillBatchSize, rpcBatchSize, 1000)
lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, useFinalityTag, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth)
emitterAddress1, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec)
require.NoError(t, err)
emitterAddress2, _, emitter2, err := log_emitter.DeployLogEmitter(owner, ec)
Expand Down
17 changes: 11 additions & 6 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type LogPollerTest interface {
BackupPollAndSaveLogs(ctx context.Context, backupPollerBlockDelay int64)
Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery
GetReplayFromBlock(ctx context.Context, requested int64) (int64, error)
PruneOldBlocks(ctx context.Context) error
}

type Client interface {
Expand Down Expand Up @@ -535,7 +536,7 @@ func (lp *logPoller) run() {
lp.BackupPollAndSaveLogs(lp.ctx, backupPollerBlockDelay)
case <-blockPruneTick:
blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 1000))
if err := lp.pruneOldBlocks(lp.ctx); err != nil {
if err := lp.PruneOldBlocks(lp.ctx); err != nil {
lp.lggr.Errorw("Unable to prune old blocks", "err", err)
}
case <-logPruneTick:
Expand Down Expand Up @@ -944,19 +945,23 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
return nil, rerr
}

// pruneOldBlocks removes blocks that are > lp.ancientBlockDepth behind the latest finalized block.
func (lp *logPoller) pruneOldBlocks(ctx context.Context) error {
_, latestFinalizedBlock, err := lp.latestBlocks(ctx)
// PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.
func (lp *logPoller) PruneOldBlocks(ctx context.Context) error {
latestBlock, err := lp.orm.SelectLatestBlock(pg.WithParentCtx(ctx))
if err != nil {
return err
}
if latestFinalizedBlock <= lp.keepFinalizedBlocksDepth {
if latestBlock == nil {
// No blocks saved yet.
return nil
}
if latestBlock.FinalizedBlockNumber <= lp.keepFinalizedBlocksDepth {
// No-op, keep all blocks
return nil
}
// 1-2-3-4-5(finalized)-6-7(latest), keepFinalizedBlocksDepth=3
// Remove <= 2
return lp.orm.DeleteBlocksBefore(latestFinalizedBlock-lp.keepFinalizedBlocksDepth, pg.WithParentCtx(ctx))
return lp.orm.DeleteBlocksBefore(latestBlock.FinalizedBlockNumber-lp.keepFinalizedBlocksDepth, pg.WithParentCtx(ctx))
}

// Logs returns logs matching topics and address (exactly) in the given block range,
Expand Down
81 changes: 67 additions & 14 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logpoller_test
import (
"context"
"fmt"
"math"
"math/big"
"testing"
"time"
Expand Down Expand Up @@ -145,7 +146,7 @@ func TestPopulateLoadedDB(t *testing.T) {
}

func TestLogPoller_Integration(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
th.Client.Commit() // Block 2. Ensure we have finality number of blocks

require.NoError(t, th.LogPoller.RegisterFilter(logpoller.Filter{"Integration test", []common.Hash{EmitterABI.Events["Log1"].ID}, []common.Address{th.EmitterAddress1}, 0}))
Expand Down Expand Up @@ -239,7 +240,7 @@ func Test_BackupLogPoller(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2)
th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2, 1000)
// later, we will need at least 32 blocks filled with logs for cache invalidation
for i := int64(0); i < 32; i++ {
// to invalidate geth's internal read-cache, a matching log must be found in the bloom Filter
Expand Down Expand Up @@ -386,7 +387,7 @@ func TestLogPoller_BackupPollAndSaveLogsWithPollerNotWorking(t *testing.T) {
// Intentionally use very low backupLogPollerDelay to verify if finality is used properly
backupLogPollerDelay := int64(0)
ctx := testutils.Context(t)
th := SetupTH(t, true, 0, 3, 2)
th := SetupTH(t, true, 0, 3, 2, 1000)

header, err := th.Client.HeaderByNumber(ctx, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -450,7 +451,7 @@ func TestLogPoller_BackupPollAndSaveLogsWithPollerNotWorking(t *testing.T) {
func TestLogPoller_BackupPollAndSaveLogsWithDeepBlockDelay(t *testing.T) {
emittedLogs := 30
ctx := testutils.Context(t)
th := SetupTH(t, true, 0, 3, 2)
th := SetupTH(t, true, 0, 3, 2, 1000)

// Emit some logs in blocks
for i := 0; i < emittedLogs; i++ {
Expand Down Expand Up @@ -498,7 +499,7 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T)
logsBatch := 10
// Intentionally use very low backupLogPollerDelay to verify if finality is used properly
ctx := testutils.Context(t)
th := SetupTH(t, true, 0, 3, 2)
th := SetupTH(t, true, 0, 3, 2, 1000)

//header, err := th.Client.HeaderByNumber(ctx, nil)
//require.NoError(t, err)
Expand Down Expand Up @@ -557,7 +558,7 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T)
func TestLogPoller_BlockTimestamps(t *testing.T) {
t.Parallel()
ctx := testutils.Context(t)
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)

addresses := []common.Address{th.EmitterAddress1, th.EmitterAddress2}
topics := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}
Expand Down Expand Up @@ -749,7 +750,7 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2)
th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2, 1000)

// Set up a log poller listening for log emitter logs.
err := th.LogPoller.RegisterFilter(logpoller.Filter{
Expand Down Expand Up @@ -997,7 +998,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2)
th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2, 1000)

// Set up a log poller listening for log emitter logs.
err := th.LogPoller.RegisterFilter(logpoller.Filter{
Expand Down Expand Up @@ -1057,7 +1058,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) {

func TestLogPoller_LoadFilters(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)

filter1 := logpoller.Filter{"first Filter", []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0}
Expand Down Expand Up @@ -1108,7 +1109,7 @@ func TestLogPoller_LoadFilters(t *testing.T) {

func TestLogPoller_GetBlocks_Range(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)

err := th.LogPoller.RegisterFilter(logpoller.Filter{"GetBlocks Test", []common.Hash{
EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID}, []common.Address{th.EmitterAddress1, th.EmitterAddress2}, 0},
Expand Down Expand Up @@ -1218,7 +1219,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {

func TestGetReplayFromBlock(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
// Commit a few blocks
for i := 0; i < 10; i++ {
th.Client.Commit()
Expand Down Expand Up @@ -1465,7 +1466,7 @@ func Test_PollAndQueryFinalizedBlocks(t *testing.T) {
firstBatchLen := 3
secondBatchLen := 5

th := SetupTH(t, true, 2, 3, 2)
th := SetupTH(t, true, 2, 3, 2, 1000)

eventSig := EmitterABI.Events["Log1"].ID
err := th.LogPoller.RegisterFilter(logpoller.Filter{
Expand Down Expand Up @@ -1549,7 +1550,7 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
th := SetupTH(t, tt.useFinalityTag, tt.finalityDepth, 3, 2)
th := SetupTH(t, tt.useFinalityTag, tt.finalityDepth, 3, 2, 1000)
// Mark first block as finalized
h := th.Client.Blockchain().CurrentHeader()
th.Client.Blockchain().SetFinalized(h)
Expand Down Expand Up @@ -1592,7 +1593,7 @@ func Test_CreatedAfterQueriesWithBackfill(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2)
th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2, 1000)

header, err := th.Client.HeaderByNumber(ctx, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1651,6 +1652,58 @@ func Test_CreatedAfterQueriesWithBackfill(t *testing.T) {
}
}

func Test_PruneOldBlocks(t *testing.T) {
ctx := testutils.Context(t)

tests := []struct {
name string
keepFinalizedBlocksDepth int64
blockToCreate int
blocksLeft int
wantErr bool
}{
{
name: "returns error if no blocks yet",
keepFinalizedBlocksDepth: 10,
blockToCreate: 0,
wantErr: true,
},
{
name: "returns if there is not enough blocks in the db",
keepFinalizedBlocksDepth: 11,
blockToCreate: 10,
blocksLeft: 10,
},
{
name: "prunes matching blocks",
keepFinalizedBlocksDepth: 1000,
blockToCreate: 2000,
blocksLeft: 1010, // last finalized block is 10 block behind
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
th := SetupTH(t, true, 0, 3, 2, tt.keepFinalizedBlocksDepth)

for i := 1; i <= tt.blockToCreate; i++ {
err := th.ORM.InsertBlock(utils.RandomBytes32(), int64(i+10), time.Now(), int64(i))
require.NoError(t, err)
}

if tt.wantErr {
require.Error(t, th.LogPoller.PruneOldBlocks(ctx))
return
}

require.NoError(t, th.LogPoller.PruneOldBlocks(ctx))
blocks, err := th.ORM.GetBlocksRange(0, math.MaxInt64, pg.WithParentCtx(ctx))
require.NoError(t, err)
assert.Len(t, blocks, tt.blocksLeft)
})
}
}

func markBlockAsFinalized(t *testing.T, th TestHarness, blockNumber int64) {
b, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(blockNumber))
require.NoError(t, err)
Expand Down
30 changes: 15 additions & 15 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func GenLogWithTimestamp(chainID *big.Int, logIndex int64, blockNum int64, block

func TestLogPoller_Batching(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
var logs []logpoller.Log
// Inserts are limited to 65535 parameters. A log being 10 parameters this results in
// a maximum of 6553 log inserts per tx. As inserting more than 6553 would result in
Expand All @@ -63,7 +63,7 @@ func TestLogPoller_Batching(t *testing.T) {
}

func TestORM_GetBlocks_From_Range(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
o1 := th.ORM
// Insert many blocks and read them back together
blocks := []block{
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestORM_GetBlocks_From_Range(t *testing.T) {
}

func TestORM_GetBlocks_From_Range_Recent_Blocks(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
o1 := th.ORM
// Insert many blocks and read them back together
var recentBlocks []block
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestORM_GetBlocks_From_Range_Recent_Blocks(t *testing.T) {
}

func TestORM(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
o1 := th.ORM
o2 := th.ORM2
// Insert and read back a block.
Expand Down Expand Up @@ -447,7 +447,7 @@ func insertLogsTopicValueRange(t *testing.T, chainID *big.Int, o *logpoller.DbOR
}

func TestORM_IndexedLogs(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
o1 := th.ORM
eventSig := common.HexToHash("0x1599")
addr := common.HexToAddress("0x1234")
Expand Down Expand Up @@ -508,7 +508,7 @@ func TestORM_IndexedLogs(t *testing.T) {
}

func TestORM_SelectIndexedLogsByTxHash(t *testing.T) {
th := SetupTH(t, false, 0, 3, 2)
th := SetupTH(t, false, 0, 3, 2, 1000)
o1 := th.ORM
eventSig := common.HexToHash("0x1599")
txHash := common.HexToHash("0x1888")
Expand Down Expand Up @@ -574,7 +574,7 @@ func TestORM_SelectIndexedLogsByTxHash(t *testing.T) {
}

func TestORM_DataWords(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
o1 := th.ORM
eventSig := common.HexToHash("0x1599")
addr := common.HexToAddress("0x1234")
Expand Down Expand Up @@ -637,7 +637,7 @@ func TestORM_DataWords(t *testing.T) {
}

func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
o1 := th.ORM

// Insert logs on different topics, should be able to read them
Expand Down Expand Up @@ -731,7 +731,7 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) {
}

func TestORM_DeleteBlocksBefore(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
o1 := th.ORM
require.NoError(t, o1.InsertBlock(common.HexToHash("0x1234"), 1, time.Now(), 0))
require.NoError(t, o1.InsertBlock(common.HexToHash("0x1235"), 2, time.Now(), 0))
Expand All @@ -754,7 +754,7 @@ func TestORM_DeleteBlocksBefore(t *testing.T) {

func TestLogPoller_Logs(t *testing.T) {
t.Parallel()
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
event1 := EmitterABI.Events["Log1"].ID
event2 := EmitterABI.Events["Log2"].ID
address1 := common.HexToAddress("0x2ab9a2Dc53736b361b72d900CdF9F78F9406fbbb")
Expand Down Expand Up @@ -802,7 +802,7 @@ func TestLogPoller_Logs(t *testing.T) {
}

func BenchmarkLogs(b *testing.B) {
th := SetupTH(b, false, 2, 3, 2)
th := SetupTH(b, false, 2, 3, 2, 1000)
o := th.ORM
var lgs []logpoller.Log
addr := common.HexToAddress("0x1234")
Expand All @@ -828,7 +828,7 @@ func BenchmarkLogs(b *testing.B) {
}

func TestSelectLogsWithSigsExcluding(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
orm := th.ORM
addressA := common.HexToAddress("0x11111")
addressB := common.HexToAddress("0x22222")
Expand Down Expand Up @@ -1074,7 +1074,7 @@ func TestSelectLogsWithSigsExcluding(t *testing.T) {
}

func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
event1 := EmitterABI.Events["Log1"].ID
event2 := EmitterABI.Events["Log2"].ID
address1 := utils.RandomAddress()
Expand Down Expand Up @@ -1171,7 +1171,7 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) {
}

func TestSelectLogsCreatedAfter(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
event := EmitterABI.Events["Log1"].ID
address := utils.RandomAddress()

Expand Down Expand Up @@ -1275,7 +1275,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) {
}

func TestNestedLogPollerBlocksQuery(t *testing.T) {
th := SetupTH(t, false, 2, 3, 2)
th := SetupTH(t, false, 2, 3, 2, 1000)
event := EmitterABI.Events["Log1"].ID
address := utils.RandomAddress()

Expand Down

0 comments on commit b7a2c77

Please sign in to comment.