From 014e84241f35205442e6e1116b7c2e0a0f809ad5 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Wed, 28 Feb 2024 10:07:57 -0800 Subject: [PATCH] Allow customizing `BackupPollerDelayBlock` or disabling Backup LogPoller entirely for tests (#11850) * Refactor NewLogPoller() params into logpoller.Opts struct, add BackupPollerBlockDela * Fix chain reader test & logpoller test This failing chainreader test had FinalityDepth set to 0, so LogPoller was starting after the block where the logs were emitted. It was only passing due to Backup LogPoller. With Backup LogPoller disabled, we have to increase the finality depth so that LogPoller will start a few blocks back (at the first unfinalized block). The failing logpoller test was looking for an extra log entry emitted by BackupLogPoller. Now we can run it with BackupLogPoller disabled (leading to a more robust test) and stop looking for that * Fix LogPoller_Replay test * Address SonarQube CodeSmell: reduce cognitive complexity of lp.run() * Add BackupLogPollerBlockDelay to toml config * Fix some merge conflicts * Restore KeepFinalizedBlocksDepth: 1000 * Use default BackupPollerBlockDelay: 0 instead of explicitly setting to 0 * disable backup log poller in lp smoke tests * Fix generated docs * Add missing web resolver testdata --------- Co-authored-by: Bartek Tofel --- core/chains/evm/config/chain_scoped.go | 4 + core/chains/evm/config/config.go | 1 + core/chains/evm/config/toml/config.go | 39 ++-- core/chains/evm/config/toml/defaults.go | 3 + .../evm/config/toml/defaults/fallback.toml | 1 + .../evm/forwarders/forwarder_manager_test.go | 19 +- core/chains/evm/logpoller/helper_test.go | 8 +- core/chains/evm/logpoller/log_poller.go | 144 ++++++++------ .../evm/logpoller/log_poller_internal_test.go | 66 +++++-- core/chains/evm/logpoller/log_poller_test.go | 186 +++++++++++++++--- core/chains/evm/logpoller/orm_test.go | 39 ++-- core/chains/evm/txmgr/txmgr_test.go | 9 +- core/chains/legacyevm/chain.go | 22 +-- core/config/docs/chains-evm.toml | 4 + core/services/chainlink/config_test.go | 26 +-- .../chainlink/testdata/config-full.toml | 1 + .../config-multi-chain-effective.toml | 3 + ...annel_definition_cache_integration_test.go | 28 ++- .../v21/logprovider/integration_test.go | 9 +- .../promreporter/prom_reporter_test.go | 9 +- core/services/relay/evm/chain_reader_test.go | 14 +- core/services/relay/evm/config_poller_test.go | 10 +- .../relay/evm/functions/config_poller_test.go | 9 +- .../relay/evm/mercury/helpers_test.go | 10 +- core/services/vrf/v2/bhs_feeder_test.go | 6 +- .../vrf/v2/integration_helpers_test.go | 5 +- .../vrf/v2/listener_v2_log_listener_test.go | 11 +- core/web/resolver/chain_test.go | 2 + core/web/resolver/testdata/config-full.toml | 1 + .../config-multi-chain-effective.toml | 3 + docs/CONFIG.md | 59 ++++++ integration-tests/smoke/log_poller_test.go | 3 +- .../testconfig/log_poller/config.go | 12 +- .../testconfig/log_poller/log_poller.toml | 3 + .../universal/log_poller/helpers.go | 2 + .../disk-based-logging-disabled.txtar | 1 + .../validate/disk-based-logging-no-dir.txtar | 1 + .../node/validate/disk-based-logging.txtar | 1 + testdata/scripts/node/validate/invalid.txtar | 1 + testdata/scripts/node/validate/valid.txtar | 1 + 40 files changed, 584 insertions(+), 192 deletions(-) diff --git a/core/chains/evm/config/chain_scoped.go b/core/chains/evm/config/chain_scoped.go index 69cc4c0a6ad..9247e77ba9d 100644 --- a/core/chains/evm/config/chain_scoped.go +++ b/core/chains/evm/config/chain_scoped.go @@ -134,6 +134,10 @@ func (e *evmConfig) LogKeepBlocksDepth() uint32 { return *e.c.LogKeepBlocksDepth } +func (e *evmConfig) BackupLogPollerBlockDelay() uint64 { + return *e.c.BackupLogPollerBlockDelay +} + func (e *evmConfig) NonceAutoSync() bool { return *e.c.NonceAutoSync } diff --git a/core/chains/evm/config/config.go b/core/chains/evm/config/config.go index cb1a6073e0b..c878fea46e9 100644 --- a/core/chains/evm/config/config.go +++ b/core/chains/evm/config/config.go @@ -36,6 +36,7 @@ type EVM interface { LinkContractAddress() string LogBackfillBatchSize() uint32 LogKeepBlocksDepth() uint32 + BackupLogPollerBlockDelay() uint64 LogPollInterval() time.Duration LogPrunePageSize() uint32 MinContractPayment() *commonassets.Link diff --git a/core/chains/evm/config/toml/config.go b/core/chains/evm/config/toml/config.go index 95b6c342161..1ab4aa20d01 100644 --- a/core/chains/evm/config/toml/config.go +++ b/core/chains/evm/config/toml/config.go @@ -341,25 +341,26 @@ func (c *EVMConfig) TOMLString() (string, error) { } type Chain struct { - AutoCreateKey *bool - BlockBackfillDepth *uint32 - BlockBackfillSkip *bool - ChainType *string - FinalityDepth *uint32 - FinalityTagEnabled *bool - FlagsContractAddress *ethkey.EIP55Address - LinkContractAddress *ethkey.EIP55Address - LogBackfillBatchSize *uint32 - LogPollInterval *commonconfig.Duration - LogKeepBlocksDepth *uint32 - LogPrunePageSize *uint32 - MinIncomingConfirmations *uint32 - MinContractPayment *commonassets.Link - NonceAutoSync *bool - NoNewHeadsThreshold *commonconfig.Duration - OperatorFactoryAddress *ethkey.EIP55Address - RPCDefaultBatchSize *uint32 - RPCBlockQueryDelay *uint16 + AutoCreateKey *bool + BlockBackfillDepth *uint32 + BlockBackfillSkip *bool + ChainType *string + FinalityDepth *uint32 + FinalityTagEnabled *bool + FlagsContractAddress *ethkey.EIP55Address + LinkContractAddress *ethkey.EIP55Address + LogBackfillBatchSize *uint32 + LogPollInterval *commonconfig.Duration + LogKeepBlocksDepth *uint32 + LogPrunePageSize *uint32 + BackupLogPollerBlockDelay *uint64 + MinIncomingConfirmations *uint32 + MinContractPayment *commonassets.Link + NonceAutoSync *bool + NoNewHeadsThreshold *commonconfig.Duration + OperatorFactoryAddress *ethkey.EIP55Address + RPCDefaultBatchSize *uint32 + RPCBlockQueryDelay *uint16 Transactions Transactions `toml:",omitempty"` BalanceMonitor BalanceMonitor `toml:",omitempty"` diff --git a/core/chains/evm/config/toml/defaults.go b/core/chains/evm/config/toml/defaults.go index 242373fd4af..951246eeb22 100644 --- a/core/chains/evm/config/toml/defaults.go +++ b/core/chains/evm/config/toml/defaults.go @@ -140,6 +140,9 @@ func (c *Chain) SetFrom(f *Chain) { if v := f.LogPrunePageSize; v != nil { c.LogPrunePageSize = v } + if v := f.BackupLogPollerBlockDelay; v != nil { + c.BackupLogPollerBlockDelay = v + } if v := f.MinIncomingConfirmations; v != nil { c.MinIncomingConfirmations = v } diff --git a/core/chains/evm/config/toml/defaults/fallback.toml b/core/chains/evm/config/toml/defaults/fallback.toml index 8587a5b4b20..1a1d9b69439 100644 --- a/core/chains/evm/config/toml/defaults/fallback.toml +++ b/core/chains/evm/config/toml/defaults/fallback.toml @@ -7,6 +7,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinContractPayment = '.00001 link' MinIncomingConfirmations = 3 NonceAutoSync = true diff --git a/core/chains/evm/forwarders/forwarder_manager_test.go b/core/chains/evm/forwarders/forwarder_manager_test.go index 4480e533525..0eb51a535e0 100644 --- a/core/chains/evm/forwarders/forwarder_manager_test.go +++ b/core/chains/evm/forwarders/forwarder_manager_test.go @@ -60,7 +60,15 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) { t.Log(authorized) evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID) - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, lpOpts) fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database()) fwdMgr.ORM = forwarders.NewORM(db, logger.Test(t), cfg.Database()) @@ -113,7 +121,14 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) { ec.Commit() evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID) - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, lpOpts) fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database()) fwdMgr.ORM = forwarders.NewORM(db, logger.Test(t), cfg.Database()) diff --git a/core/chains/evm/logpoller/helper_test.go b/core/chains/evm/logpoller/helper_test.go index 3215a7ec20c..a2a470741f6 100644 --- a/core/chains/evm/logpoller/helper_test.go +++ b/core/chains/evm/logpoller/helper_test.go @@ -46,7 +46,7 @@ type TestHarness struct { EthDB ethdb.Database } -func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth int64) TestHarness { +func SetupTH(t testing.TB, opts logpoller.Opts) TestHarness { lggr := logger.Test(t) chainID := testutils.NewRandomEVMChainID() chainID2 := testutils.NewRandomEVMChainID() @@ -67,7 +67,11 @@ func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize // Mark genesis block as finalized to avoid any nulls in the tests head := esc.Backend().Blockchain().CurrentHeader() esc.Backend().Blockchain().SetFinalized(head) - lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, useFinalityTag, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth, 0) + + if opts.PollPeriod == 0 { + opts.PollPeriod = 1 * time.Hour + } + lp := logpoller.NewLogPoller(o, esc, lggr, opts) emitterAddress1, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec) require.NoError(t, err) emitterAddress2, _, emitter2, err := log_emitter.DeployLogEmitter(owner, ec) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index b1d00206130..444d0153542 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -73,7 +73,7 @@ const ( type LogPollerTest interface { LogPoller PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) - BackupPollAndSaveLogs(ctx context.Context, backupPollerBlockDelay int64) + BackupPollAndSaveLogs(ctx context.Context) Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery GetReplayFromBlock(ctx context.Context, requested int64) (int64, error) PruneOldBlocks(ctx context.Context) (bool, error) @@ -105,8 +105,9 @@ type logPoller struct { keepFinalizedBlocksDepth int64 // the number of blocks behind the last finalized block we keep in database backfillBatchSize int64 // batch size to use when backfilling finalized logs rpcBatchSize int64 // batch size to use for fallback RPC calls made in GetBlocks - backupPollerNextBlock int64 logPrunePageSize int64 + backupPollerNextBlock int64 // next block to be processed by Backup LogPoller + backupPollerBlockDelay int64 // how far behind regular LogPoller should BackupLogPoller run. 0 = disabled filterMu sync.RWMutex filters map[string]Filter @@ -121,6 +122,17 @@ type logPoller struct { wg sync.WaitGroup } +type Opts struct { + PollPeriod time.Duration + UseFinalityTag bool + FinalityDepth int64 + BackfillBatchSize int64 + RpcBatchSize int64 + KeepFinalizedBlocksDepth int64 + BackupPollerBlockDelay int64 + LogPrunePageSize int64 +} + // NewLogPoller creates a log poller. Note there is an assumption // that blocks can be processed faster than they are produced for the given chain, or the poller will fall behind. // Block processing involves the following calls in steady state (without reorgs): @@ -131,7 +143,7 @@ type logPoller struct { // // How fast that can be done depends largely on network speed and DB, but even for the fastest // support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency -func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration, useFinalityTag bool, finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepFinalizedBlocksDepth int64, logsPrunePageSize int64) *logPoller { +func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, opts Opts) *logPoller { ctx, cancel := context.WithCancel(context.Background()) return &logPoller{ ctx: ctx, @@ -141,13 +153,14 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati lggr: logger.Sugared(logger.Named(lggr, "LogPoller")), replayStart: make(chan int64), replayComplete: make(chan error), - pollPeriod: pollPeriod, - finalityDepth: finalityDepth, - useFinalityTag: useFinalityTag, - backfillBatchSize: backfillBatchSize, - rpcBatchSize: rpcBatchSize, - keepFinalizedBlocksDepth: keepFinalizedBlocksDepth, - logPrunePageSize: logsPrunePageSize, + pollPeriod: opts.PollPeriod, + backupPollerBlockDelay: opts.BackupPollerBlockDelay, + finalityDepth: opts.FinalityDepth, + useFinalityTag: opts.UseFinalityTag, + backfillBatchSize: opts.BackfillBatchSize, + rpcBatchSize: opts.RpcBatchSize, + keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth, + logPrunePageSize: opts.LogPrunePageSize, filters: make(map[string]Filter), filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet. } @@ -436,6 +449,20 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i return mathutil.Min(requested, lastProcessed.BlockNumber), nil } +func (lp *logPoller) loadFilters() error { + lp.filterMu.Lock() + defer lp.filterMu.Unlock() + filters, err := lp.orm.LoadFilters(pg.WithParentCtx(lp.ctx)) + + if err != nil { + return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying") + } + + lp.filters = filters + lp.filterDirty = true + return nil +} + func (lp *logPoller) run() { defer lp.wg.Done() logPollTick := time.After(0) @@ -443,60 +470,20 @@ func (lp *logPoller) run() { backupLogPollTick := time.After(100 * time.Millisecond) filtersLoaded := false - loadFilters := func() error { - lp.filterMu.Lock() - defer lp.filterMu.Unlock() - filters, err := lp.orm.LoadFilters(pg.WithParentCtx(lp.ctx)) - - if err != nil { - return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying") - } - - lp.filters = filters - lp.filterDirty = true - filtersLoaded = true - return nil - } - for { select { case <-lp.ctx.Done(): return case fromBlockReq := <-lp.replayStart: - fromBlock, err := lp.GetReplayFromBlock(lp.ctx, fromBlockReq) - if err == nil { - if !filtersLoaded { - lp.lggr.Warnw("Received replayReq before filters loaded", "fromBlock", fromBlock, "requested", fromBlockReq) - if err = loadFilters(); err != nil { - lp.lggr.Errorw("Failed loading filters during Replay", "err", err, "fromBlock", fromBlock) - } - } - if err == nil { - // Serially process replay requests. - lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq) - lp.PollAndSaveLogs(lp.ctx, fromBlock) - lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq) - } - } else { - lp.lggr.Errorw("Error executing replay, could not get fromBlock", "err", err) - } - select { - case <-lp.ctx.Done(): - // We're shutting down, notify client and exit - select { - case lp.replayComplete <- ErrReplayRequestAborted: - default: - } - return - case lp.replayComplete <- err: - } + lp.handleReplayRequest(fromBlockReq, filtersLoaded) case <-logPollTick: logPollTick = time.After(utils.WithJitter(lp.pollPeriod)) if !filtersLoaded { - if err := loadFilters(); err != nil { + if err := lp.loadFilters(); err != nil { lp.lggr.Errorw("Failed loading filters in main logpoller loop, retrying later", "err", err) continue } + filtersLoaded = true } // Always start from the latest block in the db. @@ -529,22 +516,23 @@ func (lp *logPoller) run() { } lp.PollAndSaveLogs(lp.ctx, start) case <-backupLogPollTick: + if lp.backupPollerBlockDelay == 0 { + continue // backup poller is disabled + } // Backup log poller: this serves as an emergency backup to protect against eventual-consistency behavior // of an rpc node (seen occasionally on optimism, but possibly could happen on other chains?). If the first // time we request a block, no logs or incomplete logs come back, this ensures that every log is eventually - // re-requested after it is finalized. This doesn't add much overhead, because we can request all of them - // in one shot, since we don't need to worry about re-orgs after finality depth, and it runs 100x less - // frequently than the primary log poller. - - // If pollPeriod is set to 1 block time, backup log poller will run once every 100 blocks - const backupPollerBlockDelay = 100 + // re-requested after it is finalized. This doesn't add much overhead, because we can request all of them + // in one shot, since we don't need to worry about re-orgs after finality depth, and it runs far less + // frequently than the primary log poller (instead of roughly once per block it runs once roughly once every + // lp.backupPollerDelay blocks--with default settings about 100x less frequently). - backupLogPollTick = time.After(utils.WithJitter(backupPollerBlockDelay * lp.pollPeriod)) + backupLogPollTick = time.After(utils.WithJitter(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod)) if !filtersLoaded { lp.lggr.Warnw("Backup log poller ran before filters loaded, skipping") continue } - lp.BackupPollAndSaveLogs(lp.ctx, backupPollerBlockDelay) + lp.BackupPollAndSaveLogs(lp.ctx) } } } @@ -582,7 +570,37 @@ func (lp *logPoller) backgroundWorkerRun() { } } -func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context, backupPollerBlockDelay int64) { +func (lp *logPoller) handleReplayRequest(fromBlockReq int64, filtersLoaded bool) { + fromBlock, err := lp.GetReplayFromBlock(lp.ctx, fromBlockReq) + if err == nil { + if !filtersLoaded { + lp.lggr.Warnw("Received replayReq before filters loaded", "fromBlock", fromBlock, "requested", fromBlockReq) + if err = lp.loadFilters(); err != nil { + lp.lggr.Errorw("Failed loading filters during Replay", "err", err, "fromBlock", fromBlock) + } + } + if err == nil { + // Serially process replay requests. + lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq) + lp.PollAndSaveLogs(lp.ctx, fromBlock) + lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq) + } + } else { + lp.lggr.Errorw("Error executing replay, could not get fromBlock", "err", err) + } + select { + case <-lp.ctx.Done(): + // We're shutting down, notify client and exit + select { + case lp.replayComplete <- ErrReplayRequestAborted: + default: + } + return + case lp.replayComplete <- err: + } +} + +func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) { if lp.backupPollerNextBlock == 0 { lastProcessed, err := lp.orm.SelectLatestBlock(pg.WithParentCtx(ctx)) if err != nil { @@ -594,7 +612,7 @@ func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context, backupPollerBloc return } // If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-backupPollerBlockDelay) - backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-backupPollerBlockDelay) + backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-lp.backupPollerBlockDelay) // (or at block 0 if whole blockchain is too short) lp.backupPollerNextBlock = mathutil.Max(backupStartBlock, 0) } diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go index 0bde65e5556..af2d9a558e1 100644 --- a/core/chains/evm/logpoller/log_poller_internal_test.go +++ b/core/chains/evm/logpoller/log_poller_internal_test.go @@ -64,7 +64,13 @@ func TestLogPoller_RegisterFilter(t *testing.T) { orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) // Set up a test chain with a log emitting contract deployed. - lp := NewLogPoller(orm, nil, lggr, time.Hour, false, 1, 1, 2, 1000, 0) + lpOpts := Opts{ + PollPeriod: time.Hour, + BackfillBatchSize: 1, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := NewLogPoller(orm, nil, lggr, lpOpts) // We expect a zero Filter if nothing registered yet. f := lp.Filter(nil, nil, nil) @@ -217,9 +223,15 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) { ec.On("ConfiguredChainID").Return(chainID, nil) ctx := testutils.Context(t) - - lp := NewLogPoller(orm, ec, lggr, 1*time.Hour, false, 2, 3, 2, 1000, 0) - lp.BackupPollAndSaveLogs(ctx, 100) + lpOpts := Opts{ + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := NewLogPoller(orm, ec, lggr, lpOpts) + lp.BackupPollAndSaveLogs(ctx) assert.Equal(t, int64(0), lp.backupPollerNextBlock) assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len()) @@ -229,7 +241,7 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(3), lastProcessed.BlockNumber) - lp.BackupPollAndSaveLogs(ctx, 100) + lp.BackupPollAndSaveLogs(ctx) assert.Equal(t, int64(1), lp.backupPollerNextBlock) // Ensure non-negative! } @@ -258,7 +270,15 @@ func TestLogPoller_Replay(t *testing.T) { ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil) ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once() ec.On("ConfiguredChainID").Return(chainID, nil) - lp := NewLogPoller(orm, ec, lggr, time.Hour, false, 3, 3, 3, 20, 0) + lpOpts := Opts{ + PollPeriod: time.Hour, + FinalityDepth: 3, + BackfillBatchSize: 3, + RpcBatchSize: 3, + KeepFinalizedBlocksDepth: 20, + BackupPollerBlockDelay: 100, + } + lp := NewLogPoller(orm, ec, lggr, lpOpts) // process 1 log in block 3 lp.PollAndSaveLogs(testutils.Context(t), 4) @@ -440,17 +460,26 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { db := pgtest.NewSqlxDB(t) orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + lpOpts := Opts{ + PollPeriod: time.Hour, + BackfillBatchSize: 3, + RpcBatchSize: 3, + KeepFinalizedBlocksDepth: 20, + } + t.Run("pick latest block from chain and use finality from config with finality disabled", func(t *testing.T) { head := evmtypes.Head{Number: 4} - finalityDepth := int64(3) + + lpOpts.UseFinalityTag = false + lpOpts.FinalityDepth = int64(3) ec := evmclimocks.NewClient(t) ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil) - lp := NewLogPoller(orm, ec, lggr, time.Hour, false, finalityDepth, 3, 3, 20, 0) + lp := NewLogPoller(orm, ec, lggr, lpOpts) latestBlock, lastFinalizedBlockNumber, err := lp.latestBlocks(testutils.Context(t)) require.NoError(t, err) require.Equal(t, latestBlock.Number, head.Number) - require.Equal(t, finalityDepth, latestBlock.Number-lastFinalizedBlockNumber) + require.Equal(t, lpOpts.FinalityDepth, latestBlock.Number-lastFinalizedBlockNumber) }) t.Run("finality tags in use", func(t *testing.T) { @@ -470,7 +499,8 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { *(elems[1].Result.(*evmtypes.Head)) = evmtypes.Head{Number: expectedLastFinalizedBlockNumber, Hash: utils.RandomBytes32()} }) - lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20, 0) + lpOpts.UseFinalityTag = true + lp := NewLogPoller(orm, ec, lggr, lpOpts) latestBlock, lastFinalizedBlockNumber, err := lp.latestBlocks(testutils.Context(t)) require.NoError(t, err) @@ -488,7 +518,8 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { elems[1].Error = fmt.Errorf("some error") }) - lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20, 0) + lpOpts.UseFinalityTag = true + lp := NewLogPoller(orm, ec, lggr, lpOpts) _, _, err := lp.latestBlocks(testutils.Context(t)) require.Error(t, err) }) @@ -496,8 +527,8 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { t.Run("BatchCall returns an error", func(t *testing.T) { ec := evmclimocks.NewClient(t) ec.On("BatchCallContext", mock.Anything, mock.Anything).Return(fmt.Errorf("some error")) - - lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20, 0) + lpOpts.UseFinalityTag = true + lp := NewLogPoller(orm, ec, lggr, lpOpts) _, _, err := lp.latestBlocks(testutils.Context(t)) require.Error(t, err) }) @@ -506,7 +537,14 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) { lggr := logger.Test(b) - lp := NewLogPoller(nil, nil, lggr, 1*time.Hour, false, 2, 3, 2, 1000, 0) + lpOpts := Opts{ + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := NewLogPoller(nil, nil, lggr, lpOpts) for i := 0; i < nFilters; i++ { var addresses []common.Address var events []common.Hash diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 5b894b8a19a..21246e24ec0 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -147,7 +147,14 @@ func TestPopulateLoadedDB(t *testing.T) { } func TestLogPoller_Integration(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + lpOpts := logpoller.Opts{ + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 100, + } + th := SetupTH(t, lpOpts) th.Client.Commit() // Block 2. Ensure we have finality number of blocks require.NoError(t, th.LogPoller.RegisterFilter(logpoller.Filter{Name: "Integration test", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{th.EmitterAddress1}})) @@ -242,7 +249,16 @@ func Test_BackupLogPoller(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2, 1000) + th := SetupTH(t, + logpoller.Opts{ + UseFinalityTag: tt.finalityTag, + FinalityDepth: tt.finalityDepth, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 100, + }, + ) // later, we will need at least 32 blocks filled with logs for cache invalidation for i := int64(0); i < 32; i++ { // to invalidate geth's internal read-cache, a matching log must be found in the bloom Filter @@ -350,7 +366,7 @@ func Test_BackupLogPoller(t *testing.T) { // Run ordinary poller + backup poller at least once currentBlock, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) th.LogPoller.PollAndSaveLogs(ctx, currentBlock.BlockNumber+1) - th.LogPoller.BackupPollAndSaveLogs(ctx, 100) + th.LogPoller.BackupPollAndSaveLogs(ctx) currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) require.Equal(t, int64(37), currentBlock.BlockNumber+1) @@ -366,7 +382,7 @@ func Test_BackupLogPoller(t *testing.T) { // Run ordinary poller + backup poller at least once more th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber+1) - th.LogPoller.BackupPollAndSaveLogs(ctx, 100) + th.LogPoller.BackupPollAndSaveLogs(ctx) currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) require.Equal(t, int64(38), currentBlock.BlockNumber+1) @@ -391,9 +407,15 @@ func Test_BackupLogPoller(t *testing.T) { func TestLogPoller_BackupPollAndSaveLogsWithPollerNotWorking(t *testing.T) { emittedLogs := 30 // Intentionally use very low backupLogPollerDelay to verify if finality is used properly - backupLogPollerDelay := int64(0) ctx := testutils.Context(t) - th := SetupTH(t, true, 0, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: true, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 1, + } + th := SetupTH(t, lpOpts) header, err := th.Client.HeaderByNumber(ctx, nil) require.NoError(t, err) @@ -424,7 +446,7 @@ func TestLogPoller_BackupPollAndSaveLogsWithPollerNotWorking(t *testing.T) { // LogPoller should backfill starting from the last finalized block stored in db (genesis block) // till the latest finalized block reported by chain. - th.LogPoller.BackupPollAndSaveLogs(ctx, backupLogPollerDelay) + th.LogPoller.BackupPollAndSaveLogs(ctx) require.NoError(t, err) logs, err := th.LogPoller.Logs( @@ -440,7 +462,7 @@ func TestLogPoller_BackupPollAndSaveLogsWithPollerNotWorking(t *testing.T) { // Progressing even more, move blockchain forward by 1 block and mark it as finalized th.Client.Commit() markBlockAsFinalized(t, th, currentBlock) - th.LogPoller.BackupPollAndSaveLogs(ctx, backupLogPollerDelay) + th.LogPoller.BackupPollAndSaveLogs(ctx) // All emitted logs should be backfilled logs, err = th.LogPoller.Logs( @@ -457,7 +479,14 @@ func TestLogPoller_BackupPollAndSaveLogsWithPollerNotWorking(t *testing.T) { func TestLogPoller_BackupPollAndSaveLogsWithDeepBlockDelay(t *testing.T) { emittedLogs := 30 ctx := testutils.Context(t) - th := SetupTH(t, true, 0, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: true, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: int64(emittedLogs), + } + th := SetupTH(t, lpOpts) // Emit some logs in blocks for i := 0; i < emittedLogs; i++ { @@ -493,7 +522,7 @@ func TestLogPoller_BackupPollAndSaveLogsWithDeepBlockDelay(t *testing.T) { require.NoError(t, err) // Should fallback to the backupPollerBlockDelay when finalization was very high in a previous PollAndSave - th.LogPoller.BackupPollAndSaveLogs(ctx, int64(emittedLogs)) + th.LogPoller.BackupPollAndSaveLogs(ctx) require.NoError(t, err) // All emitted logs should be backfilled @@ -512,8 +541,14 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T) logsBatch := 10 // Intentionally use very low backupLogPollerDelay to verify if finality is used properly ctx := testutils.Context(t) - th := SetupTH(t, true, 0, 3, 2, 1000) - + lpOpts := logpoller.Opts{ + UseFinalityTag: true, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 1, + } + th := SetupTH(t, lpOpts) //header, err := th.Client.HeaderByNumber(ctx, nil) //require.NoError(t, err) @@ -551,7 +586,7 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T) require.NoError(t, err) // Should pick logs starting from one block behind the latest finalized block - th.LogPoller.BackupPollAndSaveLogs(ctx, 0) + th.LogPoller.BackupPollAndSaveLogs(ctx) require.NoError(t, err) // Only the 2nd batch + 1 log from a previous batch should be backfilled, because we perform backfill starting @@ -571,7 +606,13 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T) func TestLogPoller_BlockTimestamps(t *testing.T) { t.Parallel() ctx := testutils.Context(t) - th := SetupTH(t, false, 2, 3, 2, 1000) + lpOpts := logpoller.Opts{ + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + th := SetupTH(t, lpOpts) addresses := []common.Address{th.EmitterAddress1, th.EmitterAddress2} events := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID} @@ -676,7 +717,15 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { }, 10e6) _, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec) require.NoError(t, err) - lp := logpoller.NewLogPoller(orm, client.NewSimulatedBackendClient(t, ec, chainID), lggr, 15*time.Second, false, int64(finalityDepth), 3, 2, 1000, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: 15 * time.Second, + FinalityDepth: int64(finalityDepth), + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(orm, client.NewSimulatedBackendClient(t, ec, chainID), lggr, lpOpts) for i := 0; i < finalityDepth; i++ { // Have enough blocks that we could reorg the full finalityDepth-1. ec.Commit() } @@ -763,7 +812,14 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: tt.finalityTag, + FinalityDepth: tt.finalityDepth, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + th := SetupTH(t, lpOpts) // Set up a log poller listening for log emitter logs. err := th.LogPoller.RegisterFilter(logpoller.Filter{ @@ -1012,7 +1068,14 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: tt.finalityTag, + FinalityDepth: tt.finalityDepth, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + th := SetupTH(t, lpOpts) // Set up a log poller listening for log emitter logs. err := th.LogPoller.RegisterFilter(logpoller.Filter{ @@ -1072,7 +1135,15 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) { func TestLogPoller_LoadFilters(t *testing.T) { t.Parallel() - th := SetupTH(t, false, 2, 3, 2, 1000) + + lpOpts := logpoller.Opts{ + UseFinalityTag: false, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + th := SetupTH(t, lpOpts) filter1 := logpoller.Filter{ Name: "first Filter", @@ -1133,7 +1204,14 @@ func TestLogPoller_LoadFilters(t *testing.T) { func TestLogPoller_GetBlocks_Range(t *testing.T) { t.Parallel() - th := SetupTH(t, false, 2, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: false, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + th := SetupTH(t, lpOpts) err := th.LogPoller.RegisterFilter(logpoller.Filter{ Name: "GetBlocks Test", @@ -1245,7 +1323,14 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { func TestGetReplayFromBlock(t *testing.T) { t.Parallel() - th := SetupTH(t, false, 2, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: false, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + th := SetupTH(t, lpOpts) // Commit a few blocks for i := 0; i < 10; i++ { th.Client.Commit() @@ -1306,7 +1391,14 @@ func TestLogPoller_DBErrorHandling(t *testing.T) { ec.Commit() ec.Commit() - lp := logpoller.NewLogPoller(o, client.NewSimulatedBackendClient(t, ec, chainID2), lggr, 1*time.Hour, false, 2, 3, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(o, client.NewSimulatedBackendClient(t, ec, chainID2), lggr, lpOpts) err = lp.Replay(ctx, 5) // block number too high require.ErrorContains(t, err, "Invalid replay block number") @@ -1321,7 +1413,7 @@ func TestLogPoller_DBErrorHandling(t *testing.T) { time.Sleep(100 * time.Millisecond) require.NoError(t, lp.Start(ctx)) require.Eventually(t, func() bool { - return observedLogs.Len() >= 5 + return observedLogs.Len() >= 4 }, 2*time.Second, 20*time.Millisecond) lp.Close() @@ -1338,7 +1430,6 @@ func TestLogPoller_DBErrorHandling(t *testing.T) { assert.Contains(t, logMsgs, "SQL ERROR") assert.Contains(t, logMsgs, "Failed loading filters in main logpoller loop, retrying later") assert.Contains(t, logMsgs, "Error executing replay, could not get fromBlock") - assert.Contains(t, logMsgs, "Backup log poller ran before filters loaded, skipping") } type getLogErrData struct { @@ -1354,7 +1445,15 @@ func TestTooManyLogResults(t *testing.T) { chainID := testutils.NewRandomEVMChainID() db := pgtest.NewSqlxDB(t) o := logpoller.NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) - lp := logpoller.NewLogPoller(o, ec, lggr, 1*time.Hour, false, 2, 20, 10, 1000, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 20, + RpcBatchSize: 10, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(o, ec, lggr, lpOpts) expected := []int64{10, 5, 2, 1} clientErr := client.JsonError{ @@ -1441,7 +1540,13 @@ func Test_PollAndQueryFinalizedBlocks(t *testing.T) { firstBatchLen := 3 secondBatchLen := 5 - th := SetupTH(t, true, 2, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: true, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + th := SetupTH(t, lpOpts) eventSig := EmitterABI.Events["Log1"].ID err := th.LogPoller.RegisterFilter(logpoller.Filter{ @@ -1480,7 +1585,7 @@ func Test_PollAndQueryFinalizedBlocks(t *testing.T) { logpoller.Finalized, ) require.NoError(t, err) - require.Len(t, finalizedLogs, firstBatchLen) + require.Len(t, finalizedLogs, firstBatchLen, fmt.Sprintf("len(finalizedLogs) = %d, should have been %d", len(finalizedLogs), firstBatchLen)) numberOfConfirmations := 1 logsByConfs, err := th.LogPoller.LogsDataWordGreaterThan( @@ -1525,7 +1630,14 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - th := SetupTH(t, tt.useFinalityTag, tt.finalityDepth, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: tt.useFinalityTag, + FinalityDepth: tt.finalityDepth, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + th := SetupTH(t, lpOpts) // Should return error before the first poll and save _, err := th.LogPoller.LatestBlock() require.Error(t, err) @@ -1572,7 +1684,15 @@ func Test_CreatedAfterQueriesWithBackfill(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: tt.finalityTag, + FinalityDepth: tt.finalityDepth, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 100, + } + th := SetupTH(t, lpOpts) header, err := th.Client.HeaderByNumber(ctx, nil) require.NoError(t, err) @@ -1603,7 +1723,7 @@ func Test_CreatedAfterQueriesWithBackfill(t *testing.T) { } // LogPoller should backfill entire history - th.LogPoller.BackupPollAndSaveLogs(ctx, 100) + th.LogPoller.BackupPollAndSaveLogs(ctx) require.NoError(t, err) // Make sure that all logs are backfilled @@ -1663,7 +1783,13 @@ func Test_PruneOldBlocks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - th := SetupTH(t, true, 0, 3, 2, tt.keepFinalizedBlocksDepth) + lpOpts := logpoller.Opts{ + UseFinalityTag: true, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: tt.keepFinalizedBlocksDepth, + } + th := SetupTH(t, lpOpts) for i := 1; i <= tt.blockToCreate; i++ { err := th.ORM.InsertBlock(utils.RandomBytes32(), int64(i+10), time.Now(), int64(i)) diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index a9f39a7ac4a..5bc26ed62b4 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -34,6 +34,13 @@ type block struct { timestamp int64 } +var lpOpts = logpoller.Opts{ + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, +} + func GenLog(chainID *big.Int, logIndex int64, blockNum int64, blockHash string, topic1 []byte, address common.Address) logpoller.Log { return GenLogWithTimestamp(chainID, logIndex, blockNum, blockHash, topic1, address, time.Now()) } @@ -70,7 +77,7 @@ func GenLogWithData(chainID *big.Int, address common.Address, eventSig common.Ha func TestLogPoller_Batching(t *testing.T) { t.Parallel() - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) var logs []logpoller.Log // Inserts are limited to 65535 parameters. A log being 10 parameters this results in // a maximum of 6553 log inserts per tx. As inserting more than 6553 would result in @@ -86,7 +93,7 @@ func TestLogPoller_Batching(t *testing.T) { } func TestORM_GetBlocks_From_Range(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM // Insert many blocks and read them back together blocks := []block{ @@ -141,7 +148,7 @@ func TestORM_GetBlocks_From_Range(t *testing.T) { } func TestORM_GetBlocks_From_Range_Recent_Blocks(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM // Insert many blocks and read them back together var recentBlocks []block @@ -173,7 +180,7 @@ func TestORM_GetBlocks_From_Range_Recent_Blocks(t *testing.T) { } func TestORM(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM o2 := th.ORM2 // Insert and read back a block. @@ -572,7 +579,7 @@ func insertLogsTopicValueRange(t *testing.T, chainID *big.Int, o *logpoller.DbOR } func TestORM_IndexedLogs(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM eventSig := common.HexToHash("0x1599") addr := common.HexToAddress("0x1234") @@ -633,7 +640,7 @@ func TestORM_IndexedLogs(t *testing.T) { } func TestORM_SelectIndexedLogsByTxHash(t *testing.T) { - th := SetupTH(t, false, 0, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM eventSig := common.HexToHash("0x1599") txHash := common.HexToHash("0x1888") @@ -699,7 +706,7 @@ func TestORM_SelectIndexedLogsByTxHash(t *testing.T) { } func TestORM_DataWords(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM eventSig := common.HexToHash("0x1599") addr := common.HexToAddress("0x1234") @@ -762,7 +769,7 @@ func TestORM_DataWords(t *testing.T) { } func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM // Insert logs on different topics, should be able to read them @@ -856,7 +863,7 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) { } func TestORM_DeleteBlocksBefore(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM require.NoError(t, o1.InsertBlock(common.HexToHash("0x1234"), 1, time.Now(), 0)) require.NoError(t, o1.InsertBlock(common.HexToHash("0x1235"), 2, time.Now(), 0)) @@ -883,7 +890,7 @@ func TestORM_DeleteBlocksBefore(t *testing.T) { func TestLogPoller_Logs(t *testing.T) { t.Parallel() - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) event1 := EmitterABI.Events["Log1"].ID event2 := EmitterABI.Events["Log2"].ID address1 := common.HexToAddress("0x2ab9a2Dc53736b361b72d900CdF9F78F9406fbbb") @@ -931,7 +938,7 @@ func TestLogPoller_Logs(t *testing.T) { } func BenchmarkLogs(b *testing.B) { - th := SetupTH(b, false, 2, 3, 2, 1000) + th := SetupTH(b, lpOpts) o := th.ORM var lgs []logpoller.Log addr := common.HexToAddress("0x1234") @@ -957,7 +964,7 @@ func BenchmarkLogs(b *testing.B) { } func TestSelectLogsWithSigsExcluding(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) orm := th.ORM addressA := common.HexToAddress("0x11111") addressB := common.HexToAddress("0x22222") @@ -1203,7 +1210,7 @@ func TestSelectLogsWithSigsExcluding(t *testing.T) { } func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) event1 := EmitterABI.Events["Log1"].ID event2 := EmitterABI.Events["Log2"].ID address1 := utils.RandomAddress() @@ -1300,7 +1307,7 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) { } func TestSelectLogsCreatedAfter(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) event := EmitterABI.Events["Log1"].ID address := utils.RandomAddress() @@ -1404,7 +1411,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) { } func TestNestedLogPollerBlocksQuery(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) event := EmitterABI.Events["Log1"].ID address := utils.RandomAddress() @@ -1564,7 +1571,7 @@ func TestInsertLogsInTx(t *testing.T) { func TestSelectLogsDataWordBetween(t *testing.T) { address := utils.RandomAddress() eventSig := utils.RandomBytes32() - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) firstLogData := make([]byte, 0, 64) firstLogData = append(firstLogData, logpoller.EvmWord(1).Bytes()...) diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 8e4a59bc099..da18c592a55 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -50,7 +50,14 @@ import ( func makeTestEvmTxm( t *testing.T, db *sqlx.DB, ethClient evmclient.Client, estimator gas.EvmFeeEstimator, ccfg txmgr.ChainConfig, fcfg txmgr.FeeConfig, txConfig evmconfig.Transactions, dbConfig txmgr.DatabaseConfig, listenerConfig txmgr.ListenerConfig, keyStore keystore.Eth) (txmgr.TxManager, error) { lggr := logger.Test(t) - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts) // logic for building components (from evm/evm_txm.go) ------- lggr.Infow("Initializing EVM transaction manager", diff --git a/core/chains/legacyevm/chain.go b/core/chains/legacyevm/chain.go index 4e0344281cf..50e6d3914c6 100644 --- a/core/chains/legacyevm/chain.go +++ b/core/chains/legacyevm/chain.go @@ -242,17 +242,17 @@ func newChain(ctx context.Context, cfg *evmconfig.ChainScoped, nodes []*toml.Nod if opts.GenLogPoller != nil { logPoller = opts.GenLogPoller(chainID) } else { - logPoller = logpoller.NewLogPoller( - logpoller.NewObservedORM(chainID, db, l, cfg.Database()), - client, - l, - cfg.EVM().LogPollInterval(), - cfg.EVM().FinalityTagEnabled(), - int64(cfg.EVM().FinalityDepth()), - int64(cfg.EVM().LogBackfillBatchSize()), - int64(cfg.EVM().RPCDefaultBatchSize()), - int64(cfg.EVM().LogKeepBlocksDepth()), - int64(cfg.EVM().LogPrunePageSize())) + lpOpts := logpoller.Opts{ + PollPeriod: cfg.EVM().LogPollInterval(), + UseFinalityTag: cfg.EVM().FinalityTagEnabled(), + FinalityDepth: int64(cfg.EVM().FinalityDepth()), + BackfillBatchSize: int64(cfg.EVM().LogBackfillBatchSize()), + RpcBatchSize: int64(cfg.EVM().RPCDefaultBatchSize()), + KeepFinalizedBlocksDepth: int64(cfg.EVM().LogKeepBlocksDepth()), + LogPrunePageSize: int64(cfg.EVM().LogPrunePageSize()), + BackupPollerBlockDelay: int64(cfg.EVM().BackupLogPollerBlockDelay()), + } + logPoller = logpoller.NewLogPoller(logpoller.NewObservedORM(chainID, db, l, cfg.Database()), client, l, lpOpts) } } diff --git a/core/config/docs/chains-evm.toml b/core/config/docs/chains-evm.toml index f70dcd0ee45..5359fe4b22b 100644 --- a/core/config/docs/chains-evm.toml +++ b/core/config/docs/chains-evm.toml @@ -57,6 +57,10 @@ LogKeepBlocksDepth = 100000 # Default # **ADVANCED** # LogPrunePageSize defines size of the page for pruning logs. Controls how many logs/blocks (at most) are deleted in a single prune tick. Default value 0 means no paging, delete everything at once. LogPrunePageSize = 0 # Default +# **ADVANCED** +# BackupLogPollerBlockDelay works in conjunction with Feature.LogPoller. Controls the block delay of Backup LogPoller, affecting how far behind the latest finalized block it starts and how often it runs. +# BackupLogPollerDelay=0 will disable Backup LogPoller (_not recommended for production environment_). +BackupLogPollerBlockDelay = 100 # Default # MinContractPayment is the minimum payment in LINK required to execute a direct request job. This can be overridden on a per-job basis. MinContractPayment = '10000000000000 juels' # Default # MinIncomingConfirmations is the minimum required confirmations before a log event will be consumed. diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index ed252eadd97..44c19aa257d 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -525,18 +525,19 @@ func TestConfig_Marshal(t *testing.T) { }, }, - LinkContractAddress: mustAddress("0x538aAaB4ea120b2bC2fe5D296852D948F07D849e"), - LogBackfillBatchSize: ptr[uint32](17), - LogPollInterval: &minute, - LogKeepBlocksDepth: ptr[uint32](100000), - LogPrunePageSize: ptr[uint32](0), - MinContractPayment: commonassets.NewLinkFromJuels(math.MaxInt64), - MinIncomingConfirmations: ptr[uint32](13), - NonceAutoSync: ptr(true), - NoNewHeadsThreshold: &minute, - OperatorFactoryAddress: mustAddress("0xa5B85635Be42F21f94F28034B7DA440EeFF0F418"), - RPCDefaultBatchSize: ptr[uint32](17), - RPCBlockQueryDelay: ptr[uint16](10), + LinkContractAddress: mustAddress("0x538aAaB4ea120b2bC2fe5D296852D948F07D849e"), + LogBackfillBatchSize: ptr[uint32](17), + LogPollInterval: &minute, + LogKeepBlocksDepth: ptr[uint32](100000), + LogPrunePageSize: ptr[uint32](0), + BackupLogPollerBlockDelay: ptr[uint64](532), + MinContractPayment: commonassets.NewLinkFromJuels(math.MaxInt64), + MinIncomingConfirmations: ptr[uint32](13), + NonceAutoSync: ptr(true), + NoNewHeadsThreshold: &minute, + OperatorFactoryAddress: mustAddress("0xa5B85635Be42F21f94F28034B7DA440EeFF0F418"), + RPCDefaultBatchSize: ptr[uint32](17), + RPCBlockQueryDelay: ptr[uint16](10), Transactions: evmcfg.Transactions{ MaxInFlight: ptr[uint32](19), @@ -926,6 +927,7 @@ LogBackfillBatchSize = 17 LogPollInterval = '1m0s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 532 MinIncomingConfirmations = 13 MinContractPayment = '9.223372036854775807 link' NonceAutoSync = true diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index e90f6c6611a..c1606a5b067 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -254,6 +254,7 @@ LogBackfillBatchSize = 17 LogPollInterval = '1m0s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 532 MinIncomingConfirmations = 13 MinContractPayment = '9.223372036854775807 link' NonceAutoSync = true diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index c230a764c74..9f69d4aa909 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -241,6 +241,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -330,6 +331,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -414,6 +416,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 5 MinContractPayment = '0.00001 link' NonceAutoSync = true diff --git a/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go b/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go index 427dd6b32c2..c24ea46231d 100644 --- a/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go +++ b/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go @@ -77,7 +77,15 @@ func Test_ChannelDefinitionCache_Integration(t *testing.T) { require.NoError(t, err) t.Run("with zero fromblock", func(t *testing.T) { - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, 100*time.Millisecond, false, 1, 3, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller( + logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts) servicetest.Run(t, lp) cdc := llo.NewChannelDefinitionCache(lggr, orm, lp, configStoreAddress, 0) @@ -141,8 +149,15 @@ func Test_ChannelDefinitionCache_Integration(t *testing.T) { t.Run("loads from ORM", func(t *testing.T) { // Override logpoller to always return no logs + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } lp := &mockLogPoller{ - LogPoller: logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, 100*time.Millisecond, false, 1, 3, 2, 1000, 0), + LogPoller: logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts), LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) { return 0, nil }, @@ -176,7 +191,14 @@ func Test_ChannelDefinitionCache_Integration(t *testing.T) { pgtest.MustExec(t, db, `DELETE FROM channel_definitions`) t.Run("with non-zero fromBlock", func(t *testing.T) { - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, 100*time.Millisecond, false, 1, 3, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts) servicetest.Run(t, lp) cdc := llo.NewChannelDefinitionCache(lggr, orm, lp, configStoreAddress, channel2Block.Number().Int64()+1) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go index ba89c52c2ef..aa8a5c97d70 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go @@ -661,7 +661,14 @@ func setupDependencies(t *testing.T, db *sqlx.DB, backend *backends.SimulatedBac pollerLggr := logger.TestLogger(t) pollerLggr.SetLogLevel(zapcore.WarnLevel) lorm := logpoller.NewORM(big.NewInt(1337), db, pollerLggr, pgtest.NewQConfig(false)) - lp := logpoller.NewLogPoller(lorm, ethClient, pollerLggr, 100*time.Millisecond, false, 1, 2, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 2, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(lorm, ethClient, pollerLggr, lpOpts) return lp, ethClient } diff --git a/core/services/promreporter/prom_reporter_test.go b/core/services/promreporter/prom_reporter_test.go index d283cb8f873..66133072ebd 100644 --- a/core/services/promreporter/prom_reporter_test.go +++ b/core/services/promreporter/prom_reporter_test.go @@ -38,7 +38,14 @@ func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainCon ethClient := evmtest.NewEthClientMockWithDefaultChain(t) estimator := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator()) lggr := logger.TestLogger(t) - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts) txm, err := txmgr.NewTxm( db, diff --git a/core/services/relay/evm/chain_reader_test.go b/core/services/relay/evm/chain_reader_test.go index 64d9f9f1cac..39cf317204b 100644 --- a/core/services/relay/evm/chain_reader_test.go +++ b/core/services/relay/evm/chain_reader_test.go @@ -136,11 +136,11 @@ func (it *chainReaderInterfaceTester) MaxWaitTimeForEvents() time.Duration { maxWaitTime := time.Second * 20 maxWaitTimeStr, ok := os.LookupEnv("MAX_WAIT_TIME_FOR_EVENTS_S") if ok { - wiatS, err := strconv.ParseInt(maxWaitTimeStr, 10, 64) + waitS, err := strconv.ParseInt(maxWaitTimeStr, 10, 64) if err != nil { fmt.Printf("Error parsing MAX_WAIT_TIME_FOR_EVENTS_S: %v, defaulting to %v\n", err, maxWaitTime) } - maxWaitTime = time.Second * time.Duration(wiatS) + maxWaitTime = time.Second * time.Duration(waitS) } return maxWaitTime @@ -263,7 +263,15 @@ func (it *chainReaderInterfaceTester) GetChainReader(t *testing.T) clcommontypes lggr := logger.NullLogger db := pgtest.NewSqlxDB(t) - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), it.chain.Client(), lggr, time.Millisecond, false, 0, 1, 1, 10000, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: time.Millisecond, + FinalityDepth: 4, + BackfillBatchSize: 1, + RpcBatchSize: 1, + KeepFinalizedBlocksDepth: 10000, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), it.chain.Client(), lggr, lpOpts) require.NoError(t, lp.Start(ctx)) it.chain.On("LogPoller").Return(lp) cr, err := evm.NewChainReaderService(lggr, lp, it.chain, it.chainConfig) diff --git a/core/services/relay/evm/config_poller_test.go b/core/services/relay/evm/config_poller_test.go index 089db6decd5..d2d33944df7 100644 --- a/core/services/relay/evm/config_poller_test.go +++ b/core/services/relay/evm/config_poller_test.go @@ -90,7 +90,15 @@ func TestConfigPoller(t *testing.T) { cfg := pgtest.NewQConfig(false) ethClient = evmclient.NewSimulatedBackendClient(t, b, testutils.SimulatedChainID) lorm := logpoller.NewORM(testutils.SimulatedChainID, db, lggr, cfg) - lp = logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 2, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp = logpoller.NewLogPoller(lorm, ethClient, lggr, lpOpts) servicetest.Run(t, lp) } diff --git a/core/services/relay/evm/functions/config_poller_test.go b/core/services/relay/evm/functions/config_poller_test.go index 6a8c682a81b..ab80f3ae565 100644 --- a/core/services/relay/evm/functions/config_poller_test.go +++ b/core/services/relay/evm/functions/config_poller_test.go @@ -81,7 +81,14 @@ func runTest(t *testing.T, pluginType functions.FunctionsPluginType, expectedDig defer ethClient.Close() lggr := logger.TestLogger(t) lorm := logpoller.NewORM(big.NewInt(1337), db, lggr, cfg) - lp := logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 2, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(lorm, ethClient, lggr, lpOpts) servicetest.Run(t, lp) configPoller, err := functions.NewFunctionsConfigPoller(pluginType, lp, lggr) require.NoError(t, err) diff --git a/core/services/relay/evm/mercury/helpers_test.go b/core/services/relay/evm/mercury/helpers_test.go index 8283e80916e..4b05b974c3d 100644 --- a/core/services/relay/evm/mercury/helpers_test.go +++ b/core/services/relay/evm/mercury/helpers_test.go @@ -167,7 +167,15 @@ func SetupTH(t *testing.T, feedID common.Hash) TestHarness { ethClient := evmclient.NewSimulatedBackendClient(t, b, big.NewInt(1337)) lggr := logger.TestLogger(t) lorm := logpoller.NewORM(big.NewInt(1337), db, lggr, cfg) - lp := logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 2, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(lorm, ethClient, lggr, lpOpts) servicetest.Run(t, lp) configPoller, err := NewConfigPoller(lggr, lp, verifierAddress, feedID) diff --git a/core/services/vrf/v2/bhs_feeder_test.go b/core/services/vrf/v2/bhs_feeder_test.go index a02eea75757..6f857c80834 100644 --- a/core/services/vrf/v2/bhs_feeder_test.go +++ b/core/services/vrf/v2/bhs_feeder_test.go @@ -7,12 +7,14 @@ import ( "github.com/stretchr/testify/require" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrftesthelpers" ) @@ -37,14 +39,14 @@ func TestStartHeartbeats(t *testing.T) { bhsKeyAddresses = append(bhsKeyAddresses, bhsKey.Address.String()) keys = append(keys, bhsKey) keySpecificOverrides = append(keySpecificOverrides, toml.KeySpecific{ - Key: ptr(bhsKey.EIP55Address), + Key: ptr[ethkey.EIP55Address](bhsKey.EIP55Address), GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei}, }) sendEth(t, ownerKey, uni.backend, bhsKey.Address, 10) } keySpecificOverrides = append(keySpecificOverrides, toml.KeySpecific{ // Gas lane. - Key: ptr(vrfKey.EIP55Address), + Key: ptr[ethkey.EIP55Address](vrfKey.EIP55Address), GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei}, }) diff --git a/core/services/vrf/v2/integration_helpers_test.go b/core/services/vrf/v2/integration_helpers_test.go index b0ae4266b12..6649b5972e0 100644 --- a/core/services/vrf/v2/integration_helpers_test.go +++ b/core/services/vrf/v2/integration_helpers_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/require" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" + txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml" @@ -66,11 +67,11 @@ func testSingleConsumerHappyPath( config, db := heavyweight.FullTestDBV2(t, func(c *chainlink.Config, s *chainlink.Secrets) { simulatedOverrides(t, assets.GWei(10), toml.KeySpecific{ // Gas lane. - Key: ptr(key1.EIP55Address), + Key: ptr[ethkey.EIP55Address](key1.EIP55Address), GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei}, }, toml.KeySpecific{ // Gas lane. - Key: ptr(key2.EIP55Address), + Key: ptr[ethkey.EIP55Address](key2.EIP55Address), GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei}, })(c, s) c.EVM[0].MinIncomingConfirmations = ptr[uint32](2) diff --git a/core/services/vrf/v2/listener_v2_log_listener_test.go b/core/services/vrf/v2/listener_v2_log_listener_test.go index c92795f55a6..cda172abefd 100644 --- a/core/services/vrf/v2/listener_v2_log_listener_test.go +++ b/core/services/vrf/v2/listener_v2_log_listener_test.go @@ -92,7 +92,16 @@ func setupVRFLogPollerListenerTH(t *testing.T, // Poll period doesn't matter, we intend to call poll and save logs directly in the test. // Set it to some insanely high value to not interfere with any tests. - lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, useFinalityTag, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: time.Hour, + UseFinalityTag: useFinalityTag, + FinalityDepth: finalityDepth, + BackfillBatchSize: backfillBatchSize, + RpcBatchSize: rpcBatchSize, + KeepFinalizedBlocksDepth: keepFinalizedBlocksDepth, + } + lp := logpoller.NewLogPoller(o, esc, lggr, lpOpts) emitterAddress1, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec) require.NoError(t, err) diff --git a/core/web/resolver/chain_test.go b/core/web/resolver/chain_test.go index e2663af561f..a0f2ca22b07 100644 --- a/core/web/resolver/chain_test.go +++ b/core/web/resolver/chain_test.go @@ -46,6 +46,7 @@ LogBackfillBatchSize = 17 LogPollInterval = '1m0s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 13 MinContractPayment = '9.223372036854775807 link' NonceAutoSync = true @@ -164,6 +165,7 @@ LogBackfillBatchSize = 17 LogPollInterval = '1m0s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 13 MinContractPayment = '9.223372036854775807 link' NonceAutoSync = true diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index f698f55fb25..cdfb85a6f5c 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -254,6 +254,7 @@ LogBackfillBatchSize = 17 LogPollInterval = '1m0s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 13 MinContractPayment = '9.223372036854775807 link' NonceAutoSync = true diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index c230a764c74..9f69d4aa909 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -241,6 +241,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -330,6 +331,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -414,6 +416,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 5 MinContractPayment = '0.00001 link' NonceAutoSync = true diff --git a/docs/CONFIG.md b/docs/CONFIG.md index e4e25e6694f..938c3de66b1 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1604,6 +1604,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -1688,6 +1689,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -1771,6 +1773,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -1854,6 +1857,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -1938,6 +1942,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2021,6 +2026,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '30s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.001 link' NonceAutoSync = true @@ -2104,6 +2110,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '30s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.001 link' NonceAutoSync = true @@ -2187,6 +2194,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -2271,6 +2279,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2353,6 +2362,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2435,6 +2445,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2518,6 +2529,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2602,6 +2614,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '5s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2685,6 +2698,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2768,6 +2782,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 5 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2851,6 +2866,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2934,6 +2950,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3017,6 +3034,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '5s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3100,6 +3118,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '5s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3184,6 +3203,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3267,6 +3287,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3349,6 +3370,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3432,6 +3454,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3514,6 +3537,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '30s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3597,6 +3621,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3680,6 +3705,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3762,6 +3788,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '100' NonceAutoSync = true @@ -3844,6 +3871,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '30s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3927,6 +3955,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4010,6 +4039,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4092,6 +4122,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4175,6 +4206,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4258,6 +4290,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '5s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4342,6 +4375,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4425,6 +4459,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '5s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4508,6 +4543,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4591,6 +4627,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4674,6 +4711,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '5s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4756,6 +4794,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4838,6 +4877,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4921,6 +4961,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 5 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5004,6 +5045,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5087,6 +5129,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5171,6 +5214,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5255,6 +5299,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5338,6 +5383,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5421,6 +5467,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5504,6 +5551,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5587,6 +5635,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -5670,6 +5719,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5753,6 +5803,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5935,6 +5986,14 @@ LogPrunePageSize = 0 # Default ``` LogPrunePageSize defines size of the page for pruning logs. Controls how many logs/blocks (at most) are deleted in a single prune tick. Default value 0 means no paging, delete everything at once. +### BackupLogPollerBlockDelay +:warning: **_ADVANCED_**: _Do not change this setting unless you know what you are doing._ +```toml +BackupLogPollerBlockDelay = 100 # Default +``` +BackupLogPollerBlockDelay works in conjunction with Feature.LogPoller. Controls the block delay of Backup LogPoller, affecting how far behind the latest finalized block it starts and how often it runs. +BackupLogPollerDelay=0 will disable Backup LogPoller (_not recommended for production environment_). + ### MinContractPayment ```toml MinContractPayment = '10000000000000 juels' # Default diff --git a/integration-tests/smoke/log_poller_test.go b/integration-tests/smoke/log_poller_test.go index 593b4eb879a..4b4533d3a37 100644 --- a/integration-tests/smoke/log_poller_test.go +++ b/integration-tests/smoke/log_poller_test.go @@ -277,7 +277,8 @@ func prepareEnvironment(l zerolog.Logger, t *testing.T, testConfig *tc.TestConfi ethereum.RegistryVersion_2_1, logpoller.DefaultOCRRegistryConfig, upKeepsNeeded, - time.Duration(500*time.Millisecond), + cfg.General.LogPollInterval.Duration, + *cfg.General.BackupLogPollerBlockDelay, *cfg.General.UseFinalityTag, testConfig, ) diff --git a/integration-tests/testconfig/log_poller/config.go b/integration-tests/testconfig/log_poller/config.go index 96c3b55c276..890c33f26c9 100644 --- a/integration-tests/testconfig/log_poller/config.go +++ b/integration-tests/testconfig/log_poller/config.go @@ -90,11 +90,13 @@ func (l *LoopedConfig) Validate() error { } type General struct { - Generator *string `toml:"generator"` - EventsToEmit []abi.Event `toml:"-"` - Contracts *int `toml:"contracts"` - EventsPerTx *int `toml:"events_per_tx"` - UseFinalityTag *bool `toml:"use_finality_tag"` + Generator *string `toml:"generator"` + EventsToEmit []abi.Event `toml:"-"` + Contracts *int `toml:"contracts"` + EventsPerTx *int `toml:"events_per_tx"` + UseFinalityTag *bool `toml:"use_finality_tag"` + BackupLogPollerBlockDelay *uint64 `toml:"backup_log_poller_block_delay"` + LogPollInterval *blockchain.StrDuration `toml:"log_poll_interval"` } func (g *General) Validate() error { diff --git a/integration-tests/testconfig/log_poller/log_poller.toml b/integration-tests/testconfig/log_poller/log_poller.toml index 2f46ebf11c2..89d2f07b4e3 100644 --- a/integration-tests/testconfig/log_poller/log_poller.toml +++ b/integration-tests/testconfig/log_poller/log_poller.toml @@ -5,6 +5,9 @@ generator = "looped" contracts = 2 events_per_tx = 4 use_finality_tag = true +log_poll_interval = "500ms" +# 0 disables backup poller +backup_log_poller_block_delay = 0 [LogPoller.Looped] execution_count = 100 diff --git a/integration-tests/universal/log_poller/helpers.go b/integration-tests/universal/log_poller/helpers.go index b91156a3784..b6e62cff2f6 100644 --- a/integration-tests/universal/log_poller/helpers.go +++ b/integration-tests/universal/log_poller/helpers.go @@ -1079,6 +1079,7 @@ func SetupLogPollerTestDocker( registryConfig contracts.KeeperRegistrySettings, upkeepsNeeded int, lpPollingInterval time.Duration, + backupPollingInterval uint64, finalityTagEnabled bool, testConfig *tc.TestConfig, ) ( @@ -1121,6 +1122,7 @@ func SetupLogPollerTestDocker( chain.LogPollInterval = commonconfig.MustNewDuration(lpPollingInterval) chain.FinalityDepth = ptr.Ptr[uint32](uint32(finalityDepth)) chain.FinalityTagEnabled = ptr.Ptr[bool](finalityTagEnabled) + chain.BackupLogPollerBlockDelay = ptr.Ptr[uint64](backupPollingInterval) return chain } diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index bb845b05201..873b9e91bc1 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -297,6 +297,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index cdac1b3702c..0c00fbb7adc 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -297,6 +297,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index 832b3fac584..0bbddd6f40f 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -297,6 +297,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index 280fa209f0d..011298fcde7 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -287,6 +287,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index bdd83a9eb31..e0bd015a184 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -294,6 +294,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true