CCIP-1496 Moving pruning to a separate loop and respecting paging there (#12060)

* Moving pruning to a separate loop and respecting paging there

* Separate parameter to configure paging during logs pruning

* Fixing specs

* Config fixes

* Post review fixes

* Post review fixes

* Tests

* Switching to index instead of primary key, because of the performance difference

* Minor performance improvement to blocks deletion

* Minor fix

* Minor fix
mateusz-sekara authored Feb 23, 2024
1 parent 16f1b78 commit 5f212bb
Showing 39 changed files with 345 additions and 60 deletions.
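
The core of the change, per the commit message, is that block and log pruning now delete in bounded pages instead of one unbounded statement. Below is a minimal sketch of that idea, assuming Postgres via database/sql; the table and column names are placeholders for illustration, not the repository's actual schema or ORM code.

```go
package logprune

import (
	"context"
	"database/sql"
)

// DeleteBlocksBeforePaged sketches the paged-delete idea: remove blocks numbered at or
// below `end`, but at most `limit` rows per call, and report how many rows were removed
// so the caller can tell whether a backlog remains. limit <= 0 mirrors LogPrunePageSize = 0,
// i.e. "no paging, delete everything eligible at once".
func DeleteBlocksBeforePaged(ctx context.Context, db *sql.DB, chainID, end, limit int64) (int64, error) {
	var (
		res sql.Result
		err error
	)
	if limit <= 0 {
		res, err = db.ExecContext(ctx,
			`DELETE FROM log_poller_blocks WHERE evm_chain_id = $1 AND block_number <= $2`,
			chainID, end)
	} else {
		// Deleting through an indexed subquery with a LIMIT keeps each statement
		// short-lived, which is the motivation behind the paging described above.
		res, err = db.ExecContext(ctx,
			`DELETE FROM log_poller_blocks
			 WHERE evm_chain_id = $1
			   AND block_number IN (
			     SELECT block_number FROM log_poller_blocks
			     WHERE evm_chain_id = $1 AND block_number <= $2
			     ORDER BY block_number
			     LIMIT $3)`,
			chainID, end, limit)
	}
	if err != nil {
		return 0, err
	}
	return res.RowsAffected()
}
```

A caller that gets back exactly `limit` rows knows the prune did not finish and can schedule the next pass sooner, which is what the new background worker in log_poller.go does with its tick intervals.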
4 changes: 4 additions & 0 deletions core/chains/evm/config/chain_scoped.go
@@ -193,3 +193,7 @@ func (e *evmConfig) OperatorFactoryAddress() string {
}
return e.c.OperatorFactoryAddress.String()
}

func (e *evmConfig) LogPrunePageSize() uint32 {
return *e.c.LogPrunePageSize
}
2 changes: 2 additions & 0 deletions core/chains/evm/config/config.go
@@ -7,6 +7,7 @@ import (
gethcommon "github.com/ethereum/go-ethereum/common"

commonassets "github.com/smartcontractkit/chainlink-common/pkg/assets"

commonconfig "github.com/smartcontractkit/chainlink/v2/common/config"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
"github.com/smartcontractkit/chainlink/v2/core/config"
@@ -36,6 +37,7 @@ type EVM interface {
LogBackfillBatchSize() uint32
LogKeepBlocksDepth() uint32
LogPollInterval() time.Duration
LogPrunePageSize() uint32
MinContractPayment() *commonassets.Link
MinIncomingConfirmations() uint32
NonceAutoSync() bool
1 change: 1 addition & 0 deletions core/chains/evm/config/toml/config.go
@@ -352,6 +352,7 @@ type Chain struct {
LogBackfillBatchSize *uint32
LogPollInterval *commonconfig.Duration
LogKeepBlocksDepth *uint32
LogPrunePageSize *uint32
MinIncomingConfirmations *uint32
MinContractPayment *commonassets.Link
NonceAutoSync *bool
4 changes: 4 additions & 0 deletions core/chains/evm/config/toml/defaults.go
@@ -9,6 +9,7 @@ import (
"strings"

cconfig "github.com/smartcontractkit/chainlink-common/pkg/config"

"github.com/smartcontractkit/chainlink/v2/common/config"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
)
@@ -136,6 +137,9 @@ func (c *Chain) SetFrom(f *Chain) {
if v := f.LogKeepBlocksDepth; v != nil {
c.LogKeepBlocksDepth = v
}
if v := f.LogPrunePageSize; v != nil {
c.LogPrunePageSize = v
}
if v := f.MinIncomingConfirmations; v != nil {
c.MinIncomingConfirmations = v
}
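
For context, the defaults.go change follows that file's existing override pattern: every field is a pointer, nil means "unset", and SetFrom copies only the non-nil overrides onto the defaults. A stripped-down sketch of the pattern, reduced to the one field added here:

```go
package config

// chain mirrors the pointer-per-field shape of toml.Chain; only the new field is shown.
type chain struct {
	LogPrunePageSize *uint32
}

// setFrom copies any value the override actually sets, leaving defaults untouched otherwise.
func (c *chain) setFrom(f *chain) {
	if v := f.LogPrunePageSize; v != nil {
		c.LogPrunePageSize = v
	}
}
```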
1 change: 1 addition & 0 deletions core/chains/evm/config/toml/defaults/fallback.toml
@@ -6,6 +6,7 @@ FinalityTagEnabled = false
LogBackfillBatchSize = 1000
LogPollInterval = '15s'
LogKeepBlocksDepth = 100000
LogPrunePageSize = 0
MinContractPayment = '.00001 link'
MinIncomingConfirmations = 3
NonceAutoSync = true
4 changes: 2 additions & 2 deletions core/chains/evm/forwarders/forwarder_manager_test.go
@@ -60,7 +60,7 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) {
t.Log(authorized)

evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0)
fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database())
fwdMgr.ORM = forwarders.NewORM(db, logger.Test(t), cfg.Database())

@@ -113,7 +113,7 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) {
ec.Commit()

evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0)
fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database())
fwdMgr.ORM = forwarders.NewORM(db, logger.Test(t), cfg.Database())

3 changes: 2 additions & 1 deletion core/chains/evm/logpoller/helper_test.go
@@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter"
@@ -66,7 +67,7 @@ func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize
// Mark genesis block as finalized to avoid any nulls in the tests
head := esc.Backend().Blockchain().CurrentHeader()
esc.Backend().Blockchain().SetFinalized(head)
lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, useFinalityTag, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth)
lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, useFinalityTag, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth, 0)
emitterAddress1, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec)
require.NoError(t, err)
emitterAddress2, _, emitter2, err := log_emitter.DeployLogEmitter(owner, ec)
64 changes: 50 additions & 14 deletions core/chains/evm/logpoller/log_poller.go
@@ -76,7 +76,7 @@ type LogPollerTest interface {
BackupPollAndSaveLogs(ctx context.Context, backupPollerBlockDelay int64)
Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery
GetReplayFromBlock(ctx context.Context, requested int64) (int64, error)
PruneOldBlocks(ctx context.Context) error
PruneOldBlocks(ctx context.Context) (bool, error)
}

type Client interface {
@@ -106,6 +106,7 @@ type logPoller struct {
backfillBatchSize int64 // batch size to use when backfilling finalized logs
rpcBatchSize int64 // batch size to use for fallback RPC calls made in GetBlocks
backupPollerNextBlock int64
logPrunePageSize int64

filterMu sync.RWMutex
filters map[string]Filter
@@ -130,8 +131,7 @@ type logPoller struct {
//
// How fast that can be done depends largely on network speed and DB, but even for the fastest
// supported chain, Polygon, which has 2s block times, we need RPCs with roughly <= 500ms latency
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration,
useFinalityTag bool, finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepFinalizedBlocksDepth int64) *logPoller {
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration, useFinalityTag bool, finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepFinalizedBlocksDepth int64, logsPrunePageSize int64) *logPoller {
ctx, cancel := context.WithCancel(context.Background())
return &logPoller{
ctx: ctx,
Expand All @@ -147,6 +147,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati
backfillBatchSize: backfillBatchSize,
rpcBatchSize: rpcBatchSize,
keepFinalizedBlocksDepth: keepFinalizedBlocksDepth,
logPrunePageSize: logsPrunePageSize,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
}
@@ -392,8 +393,9 @@ func (lp *logPoller) ReplayAsync(fromBlock int64) {

func (lp *logPoller) Start(context.Context) error {
return lp.StartOnce("LogPoller", func() error {
lp.wg.Add(1)
lp.wg.Add(2)
go lp.run()
go lp.backgroundWorkerRun()
return nil
})
}
@@ -439,8 +441,6 @@ func (lp *logPoller) run() {
logPollTick := time.After(0)
// stagger these somewhat, so they don't all run back-to-back
backupLogPollTick := time.After(100 * time.Millisecond)
blockPruneTick := time.After(3 * time.Second)
logPruneTick := time.After(5 * time.Second)
filtersLoaded := false

loadFilters := func() error {
@@ -545,15 +545,38 @@
continue
}
lp.BackupPollAndSaveLogs(lp.ctx, backupPollerBlockDelay)
}
}
}

func (lp *logPoller) backgroundWorkerRun() {
defer lp.wg.Done()

// Stagger the pruning of old blocks and logs to avoid putting too much pressure on the database.
// A node usually has extra work to do right after a restart, booting plugins and other services,
// so the first prune is deferred by a few minutes to reduce the risk of overloading the database.
blockPruneTick := time.After(5 * time.Minute)
logPruneTick := time.After(10 * time.Minute)

for {
select {
case <-lp.ctx.Done():
return
case <-blockPruneTick:
blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 1000))
if err := lp.PruneOldBlocks(lp.ctx); err != nil {
if allRemoved, err := lp.PruneOldBlocks(lp.ctx); err != nil {
lp.lggr.Errorw("Unable to prune old blocks", "err", err)
} else if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new blocks
blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 100))
}
case <-logPruneTick:
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 2401)) // 2401 = 7^4 avoids common factors with 1000
if err := lp.orm.DeleteExpiredLogs(pg.WithParentCtx(lp.ctx)); err != nil {
lp.lggr.Error(err)
if allRemoved, err := lp.PruneExpiredLogs(lp.ctx); err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
} else if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new logs
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 241))
}
}
}
@@ -933,22 +956,35 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
}

// PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.
func (lp *logPoller) PruneOldBlocks(ctx context.Context) error {
// Returns whether all blocks eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
latestBlock, err := lp.orm.SelectLatestBlock(pg.WithParentCtx(ctx))
if err != nil {
return err
return false, err
}
if latestBlock == nil {
// No blocks saved yet.
return nil
return true, nil
}
if latestBlock.FinalizedBlockNumber <= lp.keepFinalizedBlocksDepth {
// No-op, keep all blocks
return nil
return true, nil
}
// 1-2-3-4-5(finalized)-6-7(latest), keepFinalizedBlocksDepth=3
// Remove <= 2
return lp.orm.DeleteBlocksBefore(latestBlock.FinalizedBlockNumber-lp.keepFinalizedBlocksDepth, pg.WithParentCtx(ctx))
rowsRemoved, err := lp.orm.DeleteBlocksBefore(
latestBlock.FinalizedBlockNumber-lp.keepFinalizedBlocksDepth,
lp.logPrunePageSize,
pg.WithParentCtx(ctx),
)
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

// PruneExpiredLogs removes logs that are older than the retention period defined in their Filter.
// Returns whether all logs eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
rowsRemoved, err := lp.orm.DeleteExpiredLogs(lp.logPrunePageSize, pg.WithParentCtx(ctx))
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

// Logs returns logs matching topics and address (exactly) in the given block range,
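
The background worker's scheduling policy — slow ticks by default, faster jittered ticks while a prune reports leftover work — can be read in isolation in the sketch below. It is a distilled illustration, not the poller's actual code: `prune` stands in for PruneOldBlocks/PruneExpiredLogs, and `withJitter` is a local stand-in for the repository's utils.WithJitter helper.

```go
package main

import (
	"context"
	"math/rand"
	"time"
)

// prune stands in for PruneOldBlocks / PruneExpiredLogs: true means everything eligible
// was removed, false means the page limit was hit and more work remains.
func prune(ctx context.Context) (allRemoved bool, err error) {
	return rand.Intn(3) > 0, nil // pretend a backlog sometimes remains
}

// withJitter spreads an interval by roughly ±10% so independent workers don't fire in lockstep.
func withJitter(d time.Duration) time.Duration {
	return d - d/10 + time.Duration(rand.Int63n(int64(d)/5))
}

// pruneLoop re-arms its timer with the slow interval by default, and with the fast
// interval whenever the previous pass could not keep up with incoming data.
func pruneLoop(ctx context.Context, slow, fast time.Duration) {
	tick := time.After(withJitter(slow))
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick:
			tick = time.After(withJitter(slow))
			if allRemoved, err := prune(ctx); err == nil && !allRemoved {
				tick = time.After(withJitter(fast)) // backlog remains: come back sooner
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	pruneLoop(ctx, 300*time.Millisecond, 30*time.Millisecond)
}
```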
16 changes: 8 additions & 8 deletions core/chains/evm/logpoller/log_poller_internal_test.go
@@ -64,7 +64,7 @@ func TestLogPoller_RegisterFilter(t *testing.T) {
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))

// Set up a test chain with a log emitting contract deployed.
lp := NewLogPoller(orm, nil, lggr, time.Hour, false, 1, 1, 2, 1000)
lp := NewLogPoller(orm, nil, lggr, time.Hour, false, 1, 1, 2, 1000, 0)

// We expect a zero Filter if nothing registered yet.
f := lp.Filter(nil, nil, nil)
@@ -218,7 +218,7 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) {

ctx := testutils.Context(t)

lp := NewLogPoller(orm, ec, lggr, 1*time.Hour, false, 2, 3, 2, 1000)
lp := NewLogPoller(orm, ec, lggr, 1*time.Hour, false, 2, 3, 2, 1000, 0)
lp.BackupPollAndSaveLogs(ctx, 100)
assert.Equal(t, int64(0), lp.backupPollerNextBlock)
assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len())
@@ -258,7 +258,7 @@ func TestLogPoller_Replay(t *testing.T) {
ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil)
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once()
ec.On("ConfiguredChainID").Return(chainID, nil)
lp := NewLogPoller(orm, ec, lggr, time.Hour, false, 3, 3, 3, 20)
lp := NewLogPoller(orm, ec, lggr, time.Hour, false, 3, 3, 3, 20, 0)

// process 1 log in block 3
lp.PollAndSaveLogs(testutils.Context(t), 4)
@@ -446,7 +446,7 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) {
ec := evmclimocks.NewClient(t)
ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil)

lp := NewLogPoller(orm, ec, lggr, time.Hour, false, finalityDepth, 3, 3, 20)
lp := NewLogPoller(orm, ec, lggr, time.Hour, false, finalityDepth, 3, 3, 20, 0)
latestBlock, lastFinalizedBlockNumber, err := lp.latestBlocks(testutils.Context(t))
require.NoError(t, err)
require.Equal(t, latestBlock.Number, head.Number)
@@ -470,7 +470,7 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) {
*(elems[1].Result.(*evmtypes.Head)) = evmtypes.Head{Number: expectedLastFinalizedBlockNumber, Hash: utils.RandomBytes32()}
})

lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20)
lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20, 0)

latestBlock, lastFinalizedBlockNumber, err := lp.latestBlocks(testutils.Context(t))
require.NoError(t, err)
@@ -488,7 +488,7 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) {
elems[1].Error = fmt.Errorf("some error")
})

lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20)
lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20, 0)
_, _, err := lp.latestBlocks(testutils.Context(t))
require.Error(t, err)
})
@@ -497,7 +497,7 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) {
ec := evmclimocks.NewClient(t)
ec.On("BatchCallContext", mock.Anything, mock.Anything).Return(fmt.Errorf("some error"))

lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20)
lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20, 0)
_, _, err := lp.latestBlocks(testutils.Context(t))
require.Error(t, err)
})
@@ -506,7 +506,7 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) {

func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) {
lggr := logger.Test(b)
lp := NewLogPoller(nil, nil, lggr, 1*time.Hour, false, 2, 3, 2, 1000)
lp := NewLogPoller(nil, nil, lggr, 1*time.Hour, false, 2, 3, 2, 1000, 0)
for i := 0; i < nFilters; i++ {
var addresses []common.Address
var events []common.Hash
13 changes: 8 additions & 5 deletions core/chains/evm/logpoller/log_poller_test.go
@@ -676,7 +676,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
}, 10e6)
_, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec)
require.NoError(t, err)
lp := logpoller.NewLogPoller(orm, client.NewSimulatedBackendClient(t, ec, chainID), lggr, 15*time.Second, false, int64(finalityDepth), 3, 2, 1000)
lp := logpoller.NewLogPoller(orm, client.NewSimulatedBackendClient(t, ec, chainID), lggr, 15*time.Second, false, int64(finalityDepth), 3, 2, 1000, 0)
for i := 0; i < finalityDepth; i++ { // Have enough blocks that we could reorg the full finalityDepth-1.
ec.Commit()
}
@@ -1306,7 +1306,7 @@ func TestLogPoller_DBErrorHandling(t *testing.T) {
ec.Commit()
ec.Commit()

lp := logpoller.NewLogPoller(o, client.NewSimulatedBackendClient(t, ec, chainID2), lggr, 1*time.Hour, false, 2, 3, 2, 1000)
lp := logpoller.NewLogPoller(o, client.NewSimulatedBackendClient(t, ec, chainID2), lggr, 1*time.Hour, false, 2, 3, 2, 1000, 0)

err = lp.Replay(ctx, 5) // block number too high
require.ErrorContains(t, err, "Invalid replay block number")
@@ -1354,7 +1354,7 @@ func TestTooManyLogResults(t *testing.T) {
chainID := testutils.NewRandomEVMChainID()
db := pgtest.NewSqlxDB(t)
o := logpoller.NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
lp := logpoller.NewLogPoller(o, ec, lggr, 1*time.Hour, false, 2, 20, 10, 1000)
lp := logpoller.NewLogPoller(o, ec, lggr, 1*time.Hour, false, 2, 20, 10, 1000, 0)
expected := []int64{10, 5, 2, 1}

clientErr := client.JsonError{
@@ -1671,11 +1671,14 @@ func Test_PruneOldBlocks(t *testing.T) {
}

if tt.wantErr {
require.Error(t, th.LogPoller.PruneOldBlocks(ctx))
_, err := th.LogPoller.PruneOldBlocks(ctx)
require.Error(t, err)
return
}

require.NoError(t, th.LogPoller.PruneOldBlocks(ctx))
allDeleted, err := th.LogPoller.PruneOldBlocks(ctx)
require.NoError(t, err)
assert.True(t, allDeleted)
blocks, err := th.ORM.GetBlocksRange(0, math.MaxInt64, pg.WithParentCtx(ctx))
require.NoError(t, err)
assert.Len(t, blocks, tt.blocksLeft)
28 changes: 22 additions & 6 deletions core/chains/evm/logpoller/observability.go
@@ -122,9 +122,9 @@ func (o *ObservedORM) DeleteFilter(name string, qopts ...pg.QOpt) error {
})
}

func (o *ObservedORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteBlocksBefore", del, func() error {
return o.ORM.DeleteBlocksBefore(end, qopts...)
func (o *ObservedORM) DeleteBlocksBefore(end int64, limit int64, qopts ...pg.QOpt) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteBlocksBefore", del, func() (int64, error) {
return o.ORM.DeleteBlocksBefore(end, limit, qopts...)
})
}

@@ -134,9 +134,9 @@ func (o *ObservedORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) er
})
}

func (o *ObservedORM) DeleteExpiredLogs(qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteExpiredLogs", del, func() error {
return o.ORM.DeleteExpiredLogs(qopts...)
func (o *ObservedORM) DeleteExpiredLogs(limit int64, qopts ...pg.QOpt) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) {
return o.ORM.DeleteExpiredLogs(limit, qopts...)
})
}

@@ -264,6 +264,22 @@ func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query
return results, err
}

func withObservedExecAndRowsAffected(o *ObservedORM, queryName string, queryType queryType, exec func() (int64, error)) (int64, error) {
queryStarted := time.Now()
rowsAffected, err := exec()
o.queryDuration.
WithLabelValues(o.chainId, queryName, string(queryType)).
Observe(float64(time.Since(queryStarted)))

if err == nil {
o.datasetSize.
WithLabelValues(o.chainId, queryName, string(queryType)).
Set(float64(rowsAffected))
}

return rowsAffected, err
}

func withObservedQuery[T any](o *ObservedORM, queryName string, query func() (T, error)) (T, error) {
queryStarted := time.Now()
defer func() {