CCIP-1496 Moving pruning to a separate loop and respecting paging there #12060

Merged: 11 commits, Feb 23, 2024
4 changes: 4 additions & 0 deletions core/chains/evm/config/chain_scoped.go
@@ -193,3 +193,7 @@ func (e *evmConfig) OperatorFactoryAddress() string {
}
return e.c.OperatorFactoryAddress.String()
}

func (e *evmConfig) LogPrunePageSize() uint32 {
return *e.c.LogPrunePageSize
}
2 changes: 2 additions & 0 deletions core/chains/evm/config/config.go
@@ -7,6 +7,7 @@ import (
gethcommon "github.com/ethereum/go-ethereum/common"

commonassets "github.com/smartcontractkit/chainlink-common/pkg/assets"

commonconfig "github.com/smartcontractkit/chainlink/v2/common/config"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
"github.com/smartcontractkit/chainlink/v2/core/config"
@@ -36,6 +37,7 @@ type EVM interface {
LogBackfillBatchSize() uint32
LogKeepBlocksDepth() uint32
LogPollInterval() time.Duration
LogPrunePageSize() uint32
MinContractPayment() *commonassets.Link
MinIncomingConfirmations() uint32
NonceAutoSync() bool
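For context, a hedged sketch of how the new accessor is expected to reach the poller constructor. Only LogPollInterval, LogBackfillBatchSize, LogKeepBlocksDepth and LogPrunePageSize are taken from this diff; the helper itself, its remaining parameters, and the exact wiring are assumptions for illustration, not the repo's actual chain-setup code.

package sketch

import (
	"github.com/smartcontractkit/chainlink-common/pkg/logger"

	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"
	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
)

// newChainLogPoller shows where the new setting plugs into NewLogPoller's extended signature.
func newChainLogPoller(orm logpoller.ORM, ec logpoller.Client, lggr logger.Logger, cfg config.EVM,
	useFinalityTag bool, finalityDepth, rpcBatchSize int64) logpoller.LogPoller {
	return logpoller.NewLogPoller(
		orm, ec, lggr,
		cfg.LogPollInterval(),
		useFinalityTag,
		finalityDepth,
		int64(cfg.LogBackfillBatchSize()),
		rpcBatchSize,
		int64(cfg.LogKeepBlocksDepth()),
		int64(cfg.LogPrunePageSize()), // new in this PR: 0 = unpaged pruning, >0 = max rows removed per pass
	)
}
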
1 change: 1 addition & 0 deletions core/chains/evm/config/toml/config.go
@@ -352,6 +352,7 @@ type Chain struct {
LogBackfillBatchSize *uint32
LogPollInterval *commonconfig.Duration
LogKeepBlocksDepth *uint32
LogPrunePageSize *uint32
MinIncomingConfirmations *uint32
MinContractPayment *commonassets.Link
NonceAutoSync *bool
4 changes: 4 additions & 0 deletions core/chains/evm/config/toml/defaults.go
@@ -9,6 +9,7 @@ import (
"strings"

cconfig "github.com/smartcontractkit/chainlink-common/pkg/config"

"github.com/smartcontractkit/chainlink/v2/common/config"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
)
@@ -136,6 +137,9 @@ func (c *Chain) SetFrom(f *Chain) {
if v := f.LogKeepBlocksDepth; v != nil {
c.LogKeepBlocksDepth = v
}
if v := f.LogPrunePageSize; v != nil {
c.LogPrunePageSize = v
}
if v := f.MinIncomingConfirmations; v != nil {
c.MinIncomingConfirmations = v
}
1 change: 1 addition & 0 deletions core/chains/evm/config/toml/defaults/fallback.toml
@@ -6,6 +6,7 @@ FinalityTagEnabled = false
LogBackfillBatchSize = 1000
LogPollInterval = '15s'
LogKeepBlocksDepth = 100000
LogPrunePageSize = 0
MinContractPayment = '.00001 link'
MinIncomingConfirmations = 3
NonceAutoSync = true
4 changes: 2 additions & 2 deletions core/chains/evm/forwarders/forwarder_manager_test.go
@@ -60,7 +60,7 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) {
t.Log(authorized)

evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0)
fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database())
fwdMgr.ORM = forwarders.NewORM(db, logger.Test(t), cfg.Database())

@@ -113,7 +113,7 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) {
ec.Commit()

evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000)
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0)
fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database())
fwdMgr.ORM = forwarders.NewORM(db, logger.Test(t), cfg.Database())

3 changes: 2 additions & 1 deletion core/chains/evm/logpoller/helper_test.go
@@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/log_emitter"
@@ -66,7 +67,7 @@ func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize
// Mark genesis block as finalized to avoid any nulls in the tests
head := esc.Backend().Blockchain().CurrentHeader()
esc.Backend().Blockchain().SetFinalized(head)
lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, useFinalityTag, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth)
lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, useFinalityTag, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth, 0)
emitterAddress1, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec)
require.NoError(t, err)
emitterAddress2, _, emitter2, err := log_emitter.DeployLogEmitter(owner, ec)
64 changes: 50 additions & 14 deletions core/chains/evm/logpoller/log_poller.go
@@ -76,7 +76,7 @@ type LogPollerTest interface {
BackupPollAndSaveLogs(ctx context.Context, backupPollerBlockDelay int64)
Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery
GetReplayFromBlock(ctx context.Context, requested int64) (int64, error)
PruneOldBlocks(ctx context.Context) error
PruneOldBlocks(ctx context.Context) (bool, error)
}

type Client interface {
@@ -106,6 +106,7 @@ type logPoller struct {
backfillBatchSize int64 // batch size to use when backfilling finalized logs
rpcBatchSize int64 // batch size to use for fallback RPC calls made in GetBlocks
backupPollerNextBlock int64
logPrunePageSize int64

filterMu sync.RWMutex
filters map[string]Filter
@@ -130,8 +131,7 @@ type logPoller struct {
//
// How fast that can be done depends largely on network speed and DB, but even for the fastest
// supported chain, Polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration,
useFinalityTag bool, finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepFinalizedBlocksDepth int64) *logPoller {
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration, useFinalityTag bool, finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepFinalizedBlocksDepth int64, logsPrunePageSize int64) *logPoller {
ctx, cancel := context.WithCancel(context.Background())
return &logPoller{
ctx: ctx,
@@ -147,6 +147,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati
backfillBatchSize: backfillBatchSize,
rpcBatchSize: rpcBatchSize,
keepFinalizedBlocksDepth: keepFinalizedBlocksDepth,
logPrunePageSize: logsPrunePageSize,
PR author's comment on the line above: I've added this param as a backward compatibility layer. Products already using "delete all" queries will not be affected by this PR; a product has to explicitly specify that it needs paging (see the sketch after this hunk).

filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
}
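The comment above boils down to the paging contract sketched below (simplified names, not the PR's exact code): a page size of 0 preserves the old "delete everything in one statement" behaviour, while a positive page size bounds each delete and reports whether more work remains.

package sketch

// pruneWithPaging mimics the shape of PruneOldBlocks/PruneExpiredLogs in this PR.
// deletePage stands in for an ORM call such as DeleteExpiredLogs that removes at most
// `limit` rows (0 = no limit) and reports how many rows it actually deleted.
func pruneWithPaging(deletePage func(limit int64) (int64, error), pageSize int64) (done bool, err error) {
	rowsRemoved, err := deletePage(pageSize)
	if err != nil {
		return false, err
	}
	// pageSize == 0 means unlimited, so one call always finishes the job. Otherwise a full
	// page suggests more rows may remain, and the background worker shortens its next tick.
	return pageSize == 0 || rowsRemoved < pageSize, nil
}
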
@@ -387,8 +388,9 @@ func (lp *logPoller) ReplayAsync(fromBlock int64) {

func (lp *logPoller) Start(context.Context) error {
return lp.StartOnce("LogPoller", func() error {
lp.wg.Add(1)
lp.wg.Add(2)
go lp.run()
go lp.backgroundWorkerRun()
return nil
})
}
@@ -434,8 +436,6 @@ func (lp *logPoller) run() {
logPollTick := time.After(0)
// stagger these somewhat, so they don't all run back-to-back
backupLogPollTick := time.After(100 * time.Millisecond)
blockPruneTick := time.After(3 * time.Second)
logPruneTick := time.After(5 * time.Second)
filtersLoaded := false

loadFilters := func() error {
@@ -540,15 +540,38 @@
continue
}
lp.BackupPollAndSaveLogs(lp.ctx, backupPollerBlockDelay)
}
}
}

func (lp *logPoller) backgroundWorkerRun() {
defer lp.wg.Done()

// Avoid putting too much pressure on the database by staggering the pruning of old blocks and logs.
// Usually, a node right after a restart has work to do booting plugins and other services.
// Deferring the first prune by a few minutes reduces the risk of putting too much pressure on the database.
blockPruneTick := time.After(5 * time.Minute)
logPruneTick := time.After(10 * time.Minute)

for {
select {
case <-lp.ctx.Done():
return
case <-blockPruneTick:
blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 1000))
if err := lp.PruneOldBlocks(lp.ctx); err != nil {
if allRemoved, err := lp.PruneOldBlocks(lp.ctx); err != nil {
lp.lggr.Errorw("Unable to prune old blocks", "err", err)
} else if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new blocks
blockPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 100))
}
case <-logPruneTick:
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 2401)) // = 7^4 avoids common factors with 1000
if err := lp.orm.DeleteExpiredLogs(pg.WithParentCtx(lp.ctx)); err != nil {
lp.lggr.Error(err)
if allRemoved, err := lp.PruneExpiredLogs(lp.ctx); err != nil {
lp.lggr.Errorw("Unable to prune expired logs", "err", err)
} else if !allRemoved {
// Tick faster when cleanup can't keep up with the pace of new logs
logPruneTick = time.After(utils.WithJitter(lp.pollPeriod * 241))
}
}
}
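For intuition about the cadence above, here is a rough sketch of the two tickers; withJitter is a simplified stand-in for utils.WithJitter (assumed to spread a duration by roughly ±10%, which may not match the real helper exactly):

package sketch

import (
	"math/rand"
	"time"
)

// withJitter perturbs d by up to ±10% so that independent loops (block pruning vs. log pruning)
// drift apart instead of hitting the database in lockstep.
func withJitter(d time.Duration) time.Duration {
	return d - d/10 + time.Duration(rand.Int63n(int64(d)/5+1)) // uniform in [0.9d, 1.1d]
}

// pruneTickers mirrors the loop above: blocks are pruned roughly every 1000 poll periods and
// logs every 2401 (= 7^4) poll periods, so the two periods share no common factor with 1000.
func pruneTickers(pollPeriod time.Duration) (blockPrune, logPrune <-chan time.Time) {
	return time.After(withJitter(pollPeriod * 1000)), time.After(withJitter(pollPeriod * 2401))
}
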
@@ -928,22 +951,35 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
}

// PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.
func (lp *logPoller) PruneOldBlocks(ctx context.Context) error {
// Returns whether all blocks eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
func (lp *logPoller) PruneOldBlocks(ctx context.Context) (bool, error) {
latestBlock, err := lp.orm.SelectLatestBlock(pg.WithParentCtx(ctx))
if err != nil {
return err
return false, err
}
if latestBlock == nil {
// No blocks saved yet.
return nil
return true, nil
}
if latestBlock.FinalizedBlockNumber <= lp.keepFinalizedBlocksDepth {
// No-op, keep all blocks
return nil
return true, nil
}
// 1-2-3-4-5(finalized)-6-7(latest), keepFinalizedBlocksDepth=3
// Remove <= 2
return lp.orm.DeleteBlocksBefore(latestBlock.FinalizedBlockNumber-lp.keepFinalizedBlocksDepth, pg.WithParentCtx(ctx))
rowsRemoved, err := lp.orm.DeleteBlocksBefore(
latestBlock.FinalizedBlockNumber-lp.keepFinalizedBlocksDepth,
lp.logPrunePageSize,
pg.WithParentCtx(ctx),
)
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

// PruneExpiredLogs removes logs that are older than their retention period defined in Filter.
// Returns whether all logs eligible for pruning were removed. If logPrunePageSize is set to 0, it will always return true.
func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) {
rowsRemoved, err := lp.orm.DeleteExpiredLogs(lp.logPrunePageSize, pg.WithParentCtx(ctx))
return lp.logPrunePageSize == 0 || rowsRemoved < lp.logPrunePageSize, err
}

// Logs returns logs matching topics and address (exactly) in the given block range,
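To make the cutoff arithmetic in PruneOldBlocks concrete, a small illustrative helper (not part of the PR) that reproduces the "1-2-3-4-5(finalized)-6-7(latest), keepFinalizedBlocksDepth=3" comment:

package sketch

// pruneCutoff returns the highest block number eligible for deletion. With finalized block 5 and
// keepFinalizedBlocksDepth = 3, everything at or below 5 - 3 = 2 may be removed, matching "Remove <= 2".
func pruneCutoff(finalizedBlock, keepFinalizedBlocksDepth int64) (cutoff int64, anythingToPrune bool) {
	if finalizedBlock <= keepFinalizedBlocksDepth {
		return 0, false // keep all blocks; PruneOldBlocks returns (true, nil) immediately in this case
	}
	return finalizedBlock - keepFinalizedBlocksDepth, true
}
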
16 changes: 8 additions & 8 deletions core/chains/evm/logpoller/log_poller_internal_test.go
@@ -64,7 +64,7 @@ func TestLogPoller_RegisterFilter(t *testing.T) {
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))

// Set up a test chain with a log emitting contract deployed.
lp := NewLogPoller(orm, nil, lggr, time.Hour, false, 1, 1, 2, 1000)
lp := NewLogPoller(orm, nil, lggr, time.Hour, false, 1, 1, 2, 1000, 0)

// We expect a zero Filter if nothing registered yet.
f := lp.Filter(nil, nil, nil)
@@ -218,7 +218,7 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) {

ctx := testutils.Context(t)

lp := NewLogPoller(orm, ec, lggr, 1*time.Hour, false, 2, 3, 2, 1000)
lp := NewLogPoller(orm, ec, lggr, 1*time.Hour, false, 2, 3, 2, 1000, 0)
lp.BackupPollAndSaveLogs(ctx, 100)
assert.Equal(t, int64(0), lp.backupPollerNextBlock)
assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len())
@@ -258,7 +258,7 @@ func TestLogPoller_Replay(t *testing.T) {
ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil)
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once()
ec.On("ConfiguredChainID").Return(chainID, nil)
lp := NewLogPoller(orm, ec, lggr, time.Hour, false, 3, 3, 3, 20)
lp := NewLogPoller(orm, ec, lggr, time.Hour, false, 3, 3, 3, 20, 0)

// process 1 log in block 3
lp.PollAndSaveLogs(testutils.Context(t), 4)
@@ -446,7 +446,7 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) {
ec := evmclimocks.NewClient(t)
ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil)

lp := NewLogPoller(orm, ec, lggr, time.Hour, false, finalityDepth, 3, 3, 20)
lp := NewLogPoller(orm, ec, lggr, time.Hour, false, finalityDepth, 3, 3, 20, 0)
latestBlock, lastFinalizedBlockNumber, err := lp.latestBlocks(testutils.Context(t))
require.NoError(t, err)
require.Equal(t, latestBlock.Number, head.Number)
@@ -470,7 +470,7 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) {
*(elems[1].Result.(*evmtypes.Head)) = evmtypes.Head{Number: expectedLastFinalizedBlockNumber, Hash: utils.RandomBytes32()}
})

lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20)
lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20, 0)

latestBlock, lastFinalizedBlockNumber, err := lp.latestBlocks(testutils.Context(t))
require.NoError(t, err)
@@ -488,7 +488,7 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) {
elems[1].Error = fmt.Errorf("some error")
})

lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20)
lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20, 0)
_, _, err := lp.latestBlocks(testutils.Context(t))
require.Error(t, err)
})
@@ -497,7 +497,7 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) {
ec := evmclimocks.NewClient(t)
ec.On("BatchCallContext", mock.Anything, mock.Anything).Return(fmt.Errorf("some error"))

lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20)
lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20, 0)
_, _, err := lp.latestBlocks(testutils.Context(t))
require.Error(t, err)
})
@@ -506,7 +506,7 @@

func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) {
lggr := logger.Test(b)
lp := NewLogPoller(nil, nil, lggr, 1*time.Hour, false, 2, 3, 2, 1000)
lp := NewLogPoller(nil, nil, lggr, 1*time.Hour, false, 2, 3, 2, 1000, 0)
for i := 0; i < nFilters; i++ {
var addresses []common.Address
var events []common.Hash
13 changes: 8 additions & 5 deletions core/chains/evm/logpoller/log_poller_test.go
@@ -671,7 +671,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
}, 10e6)
_, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec)
require.NoError(t, err)
lp := logpoller.NewLogPoller(orm, client.NewSimulatedBackendClient(t, ec, chainID), lggr, 15*time.Second, false, int64(finalityDepth), 3, 2, 1000)
lp := logpoller.NewLogPoller(orm, client.NewSimulatedBackendClient(t, ec, chainID), lggr, 15*time.Second, false, int64(finalityDepth), 3, 2, 1000, 0)
for i := 0; i < finalityDepth; i++ { // Have enough blocks that we could reorg the full finalityDepth-1.
ec.Commit()
}
@@ -1288,7 +1288,7 @@ func TestLogPoller_DBErrorHandling(t *testing.T) {
ec.Commit()
ec.Commit()

lp := logpoller.NewLogPoller(o, client.NewSimulatedBackendClient(t, ec, chainID2), lggr, 1*time.Hour, false, 2, 3, 2, 1000)
lp := logpoller.NewLogPoller(o, client.NewSimulatedBackendClient(t, ec, chainID2), lggr, 1*time.Hour, false, 2, 3, 2, 1000, 0)

err = lp.Replay(ctx, 5) // block number too high
require.ErrorContains(t, err, "Invalid replay block number")
@@ -1336,7 +1336,7 @@ func TestTooManyLogResults(t *testing.T) {
chainID := testutils.NewRandomEVMChainID()
db := pgtest.NewSqlxDB(t)
o := logpoller.NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
lp := logpoller.NewLogPoller(o, ec, lggr, 1*time.Hour, false, 2, 20, 10, 1000)
lp := logpoller.NewLogPoller(o, ec, lggr, 1*time.Hour, false, 2, 20, 10, 1000, 0)
expected := []int64{10, 5, 2, 1}

clientErr := client.JsonError{
@@ -1649,11 +1649,14 @@ func Test_PruneOldBlocks(t *testing.T) {
}

if tt.wantErr {
require.Error(t, th.LogPoller.PruneOldBlocks(ctx))
_, err := th.LogPoller.PruneOldBlocks(ctx)
require.Error(t, err)
return
}

require.NoError(t, th.LogPoller.PruneOldBlocks(ctx))
allDeleted, err := th.LogPoller.PruneOldBlocks(ctx)
require.NoError(t, err)
assert.True(t, allDeleted)
blocks, err := th.ORM.GetBlocksRange(0, math.MaxInt64, pg.WithParentCtx(ctx))
require.NoError(t, err)
assert.Len(t, blocks, tt.blocksLeft)
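The new boolean return also lets callers drive pruning to completion themselves; a hedged usage sketch (the loop is illustrative, not code from this PR):

package sketch

import (
	"context"
)

// pruneAll repeatedly invokes a paged pruner (such as LogPoller.PruneOldBlocks) until it reports
// that everything eligible was removed, or the context is cancelled.
func pruneAll(ctx context.Context, prune func(context.Context) (bool, error)) error {
	for {
		done, err := prune(ctx)
		if err != nil || done {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		default: // keep going; each pass removes at most one page of rows
		}
	}
}
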
28 changes: 22 additions & 6 deletions core/chains/evm/logpoller/observability.go
@@ -122,9 +122,9 @@ func (o *ObservedORM) DeleteFilter(name string, qopts ...pg.QOpt) error {
})
}

func (o *ObservedORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteBlocksBefore", del, func() error {
return o.ORM.DeleteBlocksBefore(end, qopts...)
func (o *ObservedORM) DeleteBlocksBefore(end int64, limit int64, qopts ...pg.QOpt) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteBlocksBefore", del, func() (int64, error) {
return o.ORM.DeleteBlocksBefore(end, limit, qopts...)
})
}

@@ -134,9 +134,9 @@ func (o *ObservedORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) er
})
}

func (o *ObservedORM) DeleteExpiredLogs(qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteExpiredLogs", del, func() error {
return o.ORM.DeleteExpiredLogs(qopts...)
func (o *ObservedORM) DeleteExpiredLogs(limit int64, qopts ...pg.QOpt) (int64, error) {
return withObservedExecAndRowsAffected(o, "DeleteExpiredLogs", del, func() (int64, error) {
return o.ORM.DeleteExpiredLogs(limit, qopts...)
})
}

@@ -264,6 +264,22 @@ func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query
return results, err
}

func withObservedExecAndRowsAffected(o *ObservedORM, queryName string, queryType queryType, exec func() (int64, error)) (int64, error) {
queryStarted := time.Now()
rowsAffected, err := exec()
o.queryDuration.
WithLabelValues(o.chainId, queryName, string(queryType)).
Observe(float64(time.Since(queryStarted)))

if err == nil {
o.datasetSize.
WithLabelValues(o.chainId, queryName, string(queryType)).
Set(float64(rowsAffected))
}

return rowsAffected, err
}

func withObservedQuery[T any](o *ObservedORM, queryName string, query func() (T, error)) (T, error) {
queryStarted := time.Now()
defer func() {