From d6f1e2c5f701ea6a2685985e470debbdd817bcf8 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 22 Jan 2024 19:58:39 -0800 Subject: [PATCH 01/11] Refactor NewLogPoller() params into logpoller.Opts struct, add BackupPollerBlockDela --- .../evm/forwarders/forwarder_manager_test.go | 21 +- core/chains/evm/logpoller/helper_test.go | 8 +- core/chains/evm/logpoller/log_poller.go | 54 +++-- .../evm/logpoller/log_poller_internal_test.go | 63 ++++-- core/chains/evm/logpoller/log_poller_test.go | 190 +++++++++++++++--- core/chains/evm/logpoller/orm_test.go | 39 ++-- core/chains/evm/txmgr/txmgr_test.go | 8 +- core/chains/legacyevm/chain.go | 21 +- .../v21/logprovider/integration_test.go | 8 +- .../promreporter/prom_reporter_test.go | 9 +- core/services/relay/evm/chain_reader_test.go | 9 +- core/services/relay/evm/config_poller_test.go | 9 +- .../relay/evm/functions/config_poller_test.go | 8 +- .../relay/evm/mercury/helpers_test.go | 9 +- core/services/vrf/v2/bhs_feeder_test.go | 6 +- .../vrf/v2/integration_helpers_test.go | 5 +- .../vrf/v2/listener_v2_log_listener_test.go | 11 +- 17 files changed, 374 insertions(+), 104 deletions(-) diff --git a/core/chains/evm/forwarders/forwarder_manager_test.go b/core/chains/evm/forwarders/forwarder_manager_test.go index 4480e533525..6a093553800 100644 --- a/core/chains/evm/forwarders/forwarder_manager_test.go +++ b/core/chains/evm/forwarders/forwarder_manager_test.go @@ -60,7 +60,15 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) { t.Log(authorized) evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID) - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, lpOpts) fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database()) fwdMgr.ORM = forwarders.NewORM(db, logger.Test(t), cfg.Database()) @@ -113,7 +121,18 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) { ec.Commit() evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID) +<<<<<<< HEAD lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0) +======= + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, lpOpts) +>>>>>>> 9b8bd0dea3 (Refactor NewLogPoller() params into logpoller.Opts struct, add BackupPollerBlockDela) fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database()) fwdMgr.ORM = forwarders.NewORM(db, logger.Test(t), cfg.Database()) diff --git a/core/chains/evm/logpoller/helper_test.go b/core/chains/evm/logpoller/helper_test.go index 3215a7ec20c..a2a470741f6 100644 --- a/core/chains/evm/logpoller/helper_test.go +++ b/core/chains/evm/logpoller/helper_test.go @@ -46,7 +46,7 @@ type TestHarness struct { EthDB ethdb.Database } -func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth int64) TestHarness { +func SetupTH(t testing.TB, opts logpoller.Opts) TestHarness { lggr := logger.Test(t) chainID := testutils.NewRandomEVMChainID() chainID2 := testutils.NewRandomEVMChainID() @@ -67,7 +67,11 @@ func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize // Mark genesis block as finalized to avoid any nulls in the tests head := esc.Backend().Blockchain().CurrentHeader() esc.Backend().Blockchain().SetFinalized(head) - lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, useFinalityTag, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth, 0) + + if opts.PollPeriod == 0 { + opts.PollPeriod = 1 * time.Hour + } + lp := logpoller.NewLogPoller(o, esc, lggr, opts) emitterAddress1, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec) require.NoError(t, err) emitterAddress2, _, emitter2, err := log_emitter.DeployLogEmitter(owner, ec) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index a19bbfda7f1..0bd8b90732e 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -73,7 +73,7 @@ const ( type LogPollerTest interface { LogPoller PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) - BackupPollAndSaveLogs(ctx context.Context, backupPollerBlockDelay int64) + BackupPollAndSaveLogs(ctx context.Context) Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery GetReplayFromBlock(ctx context.Context, requested int64) (int64, error) PruneOldBlocks(ctx context.Context) (bool, error) @@ -105,8 +105,9 @@ type logPoller struct { keepFinalizedBlocksDepth int64 // the number of blocks behind the last finalized block we keep in database backfillBatchSize int64 // batch size to use when backfilling finalized logs rpcBatchSize int64 // batch size to use for fallback RPC calls made in GetBlocks - backupPollerNextBlock int64 logPrunePageSize int64 + backupPollerNextBlock int64 // next block to be processed by Backup LogPoller + backupPollerBlockDelay int64 // how far behind regular LogPoller should BackupLogPoller run. 0 = disabled filterMu sync.RWMutex filters map[string]Filter @@ -121,6 +122,17 @@ type logPoller struct { wg sync.WaitGroup } +type Opts struct { + PollPeriod time.Duration + UseFinalityTag bool + FinalityDepth int64 + BackfillBatchSize int64 + RpcBatchSize int64 + KeepFinalizedBlocksDepth int64 + BackupPollerBlockDelay int64 + LogPrunePageSize int64 +} + // NewLogPoller creates a log poller. Note there is an assumption // that blocks can be processed faster than they are produced for the given chain, or the poller will fall behind. // Block processing involves the following calls in steady state (without reorgs): @@ -131,7 +143,7 @@ type logPoller struct { // // How fast that can be done depends largely on network speed and DB, but even for the fastest // support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency -func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration, useFinalityTag bool, finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepFinalizedBlocksDepth int64, logsPrunePageSize int64) *logPoller { +func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, opts Opts) *logPoller { ctx, cancel := context.WithCancel(context.Background()) return &logPoller{ ctx: ctx, @@ -141,13 +153,14 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati lggr: logger.Sugared(logger.Named(lggr, "LogPoller")), replayStart: make(chan int64), replayComplete: make(chan error), - pollPeriod: pollPeriod, - finalityDepth: finalityDepth, - useFinalityTag: useFinalityTag, - backfillBatchSize: backfillBatchSize, - rpcBatchSize: rpcBatchSize, - keepFinalizedBlocksDepth: keepFinalizedBlocksDepth, - logPrunePageSize: logsPrunePageSize, + pollPeriod: opts.PollPeriod, + backupPollerBlockDelay: opts.BackupPollerBlockDelay, + finalityDepth: opts.FinalityDepth, + useFinalityTag: opts.UseFinalityTag, + backfillBatchSize: opts.BackfillBatchSize, + rpcBatchSize: opts.RpcBatchSize, + keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth, + logPrunePageSize: opts.LogPrunePageSize, filters: make(map[string]Filter), filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet. } @@ -529,22 +542,23 @@ func (lp *logPoller) run() { } lp.PollAndSaveLogs(lp.ctx, start) case <-backupLogPollTick: + if lp.backupPollerBlockDelay == 0 { + continue // backup poller is disabled + } // Backup log poller: this serves as an emergency backup to protect against eventual-consistency behavior // of an rpc node (seen occasionally on optimism, but possibly could happen on other chains?). If the first // time we request a block, no logs or incomplete logs come back, this ensures that every log is eventually - // re-requested after it is finalized. This doesn't add much overhead, because we can request all of them - // in one shot, since we don't need to worry about re-orgs after finality depth, and it runs 100x less - // frequently than the primary log poller. - - // If pollPeriod is set to 1 block time, backup log poller will run once every 100 blocks - const backupPollerBlockDelay = 100 + // re-requested after it is finalized. This doesn't add much overhead, because we can request all of them + // in one shot, since we don't need to worry about re-orgs after finality depth, and it runs far less + // frequently than the primary log poller (instead of roughly once per block it runs once roughly once every + // lp.backupPollerDelay blocks--with default settings about 100x less frequently). - backupLogPollTick = time.After(utils.WithJitter(backupPollerBlockDelay * lp.pollPeriod)) + backupLogPollTick = time.After(utils.WithJitter(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod)) if !filtersLoaded { lp.lggr.Warnw("Backup log poller ran before filters loaded, skipping") continue } - lp.BackupPollAndSaveLogs(lp.ctx, backupPollerBlockDelay) + lp.BackupPollAndSaveLogs(lp.ctx) } } } @@ -582,7 +596,7 @@ func (lp *logPoller) backgroundWorkerRun() { } } -func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context, backupPollerBlockDelay int64) { +func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) { if lp.backupPollerNextBlock == 0 { lastProcessed, err := lp.orm.SelectLatestBlock(pg.WithParentCtx(ctx)) if err != nil { @@ -594,7 +608,7 @@ func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context, backupPollerBloc return } // If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-backupPollerBlockDelay) - backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-backupPollerBlockDelay) + backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-lp.backupPollerBlockDelay) // (or at block 0 if whole blockchain is too short) lp.backupPollerNextBlock = mathutil.Max(backupStartBlock, 0) } diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go index 0bde65e5556..c69408c1748 100644 --- a/core/chains/evm/logpoller/log_poller_internal_test.go +++ b/core/chains/evm/logpoller/log_poller_internal_test.go @@ -64,7 +64,13 @@ func TestLogPoller_RegisterFilter(t *testing.T) { orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) // Set up a test chain with a log emitting contract deployed. - lp := NewLogPoller(orm, nil, lggr, time.Hour, false, 1, 1, 2, 1000, 0) + lpOpts := Opts{ + PollPeriod: time.Hour, + BackfillBatchSize: 1, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := NewLogPoller(orm, nil, lggr, lpOpts) // We expect a zero Filter if nothing registered yet. f := lp.Filter(nil, nil, nil) @@ -217,9 +223,14 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) { ec.On("ConfiguredChainID").Return(chainID, nil) ctx := testutils.Context(t) - - lp := NewLogPoller(orm, ec, lggr, 1*time.Hour, false, 2, 3, 2, 1000, 0) - lp.BackupPollAndSaveLogs(ctx, 100) + lpOpts := Opts{ + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + } + lp := NewLogPoller(orm, ec, lggr, lpOpts) + lp.BackupPollAndSaveLogs(ctx) assert.Equal(t, int64(0), lp.backupPollerNextBlock) assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len()) @@ -229,7 +240,7 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(3), lastProcessed.BlockNumber) - lp.BackupPollAndSaveLogs(ctx, 100) + lp.BackupPollAndSaveLogs(ctx) assert.Equal(t, int64(1), lp.backupPollerNextBlock) // Ensure non-negative! } @@ -258,7 +269,14 @@ func TestLogPoller_Replay(t *testing.T) { ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil) ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once() ec.On("ConfiguredChainID").Return(chainID, nil) - lp := NewLogPoller(orm, ec, lggr, time.Hour, false, 3, 3, 3, 20, 0) + lpOpts := Opts{ + PollPeriod: time.Hour, + FinalityDepth: 3, + BackfillBatchSize: 3, + RpcBatchSize: 3, + KeepFinalizedBlocksDepth: 20, + } + lp := NewLogPoller(orm, ec, lggr, lpOpts) // process 1 log in block 3 lp.PollAndSaveLogs(testutils.Context(t), 4) @@ -440,17 +458,26 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { db := pgtest.NewSqlxDB(t) orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) + lpOpts := Opts{ + PollPeriod: time.Hour, + BackfillBatchSize: 3, + RpcBatchSize: 3, + KeepFinalizedBlocksDepth: 20, + } + t.Run("pick latest block from chain and use finality from config with finality disabled", func(t *testing.T) { head := evmtypes.Head{Number: 4} - finalityDepth := int64(3) + + lpOpts.UseFinalityTag = false + lpOpts.FinalityDepth = int64(3) ec := evmclimocks.NewClient(t) ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(&head, nil) - lp := NewLogPoller(orm, ec, lggr, time.Hour, false, finalityDepth, 3, 3, 20, 0) + lp := NewLogPoller(orm, ec, lggr, lpOpts) latestBlock, lastFinalizedBlockNumber, err := lp.latestBlocks(testutils.Context(t)) require.NoError(t, err) require.Equal(t, latestBlock.Number, head.Number) - require.Equal(t, finalityDepth, latestBlock.Number-lastFinalizedBlockNumber) + require.Equal(t, lpOpts.FinalityDepth, latestBlock.Number-lastFinalizedBlockNumber) }) t.Run("finality tags in use", func(t *testing.T) { @@ -470,7 +497,8 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { *(elems[1].Result.(*evmtypes.Head)) = evmtypes.Head{Number: expectedLastFinalizedBlockNumber, Hash: utils.RandomBytes32()} }) - lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20, 0) + lpOpts.UseFinalityTag = true + lp := NewLogPoller(orm, ec, lggr, lpOpts) latestBlock, lastFinalizedBlockNumber, err := lp.latestBlocks(testutils.Context(t)) require.NoError(t, err) @@ -488,7 +516,8 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { elems[1].Error = fmt.Errorf("some error") }) - lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20, 0) + lpOpts.UseFinalityTag = true + lp := NewLogPoller(orm, ec, lggr, lpOpts) _, _, err := lp.latestBlocks(testutils.Context(t)) require.Error(t, err) }) @@ -496,8 +525,8 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { t.Run("BatchCall returns an error", func(t *testing.T) { ec := evmclimocks.NewClient(t) ec.On("BatchCallContext", mock.Anything, mock.Anything).Return(fmt.Errorf("some error")) - - lp := NewLogPoller(orm, ec, lggr, time.Hour, true, 3, 3, 3, 20, 0) + lpOpts.UseFinalityTag = true + lp := NewLogPoller(orm, ec, lggr, lpOpts) _, _, err := lp.latestBlocks(testutils.Context(t)) require.Error(t, err) }) @@ -506,7 +535,13 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) { lggr := logger.Test(b) - lp := NewLogPoller(nil, nil, lggr, 1*time.Hour, false, 2, 3, 2, 1000, 0) + lpOpts := Opts{ + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + } + lp := NewLogPoller(nil, nil, lggr, lpOpts) for i := 0; i < nFilters; i++ { var addresses []common.Address var events []common.Hash diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 5b894b8a19a..5cee649f0d8 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -147,7 +147,14 @@ func TestPopulateLoadedDB(t *testing.T) { } func TestLogPoller_Integration(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + lpOpts := logpoller.Opts{ + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 100, + } + th := SetupTH(t, lpOpts) th.Client.Commit() // Block 2. Ensure we have finality number of blocks require.NoError(t, th.LogPoller.RegisterFilter(logpoller.Filter{Name: "Integration test", EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, Addresses: []common.Address{th.EmitterAddress1}})) @@ -242,7 +249,16 @@ func Test_BackupLogPoller(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2, 1000) + th := SetupTH(t, + logpoller.Opts{ + UseFinalityTag: tt.finalityTag, + FinalityDepth: tt.finalityDepth, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 100, + }, + ) // later, we will need at least 32 blocks filled with logs for cache invalidation for i := int64(0); i < 32; i++ { // to invalidate geth's internal read-cache, a matching log must be found in the bloom Filter @@ -350,7 +366,7 @@ func Test_BackupLogPoller(t *testing.T) { // Run ordinary poller + backup poller at least once currentBlock, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) th.LogPoller.PollAndSaveLogs(ctx, currentBlock.BlockNumber+1) - th.LogPoller.BackupPollAndSaveLogs(ctx, 100) + th.LogPoller.BackupPollAndSaveLogs(ctx) currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) require.Equal(t, int64(37), currentBlock.BlockNumber+1) @@ -366,7 +382,7 @@ func Test_BackupLogPoller(t *testing.T) { // Run ordinary poller + backup poller at least once more th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber+1) - th.LogPoller.BackupPollAndSaveLogs(ctx, 100) + th.LogPoller.BackupPollAndSaveLogs(ctx) currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) require.Equal(t, int64(38), currentBlock.BlockNumber+1) @@ -391,9 +407,15 @@ func Test_BackupLogPoller(t *testing.T) { func TestLogPoller_BackupPollAndSaveLogsWithPollerNotWorking(t *testing.T) { emittedLogs := 30 // Intentionally use very low backupLogPollerDelay to verify if finality is used properly - backupLogPollerDelay := int64(0) ctx := testutils.Context(t) - th := SetupTH(t, true, 0, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: true, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 1, + } + th := SetupTH(t, lpOpts) header, err := th.Client.HeaderByNumber(ctx, nil) require.NoError(t, err) @@ -424,7 +446,7 @@ func TestLogPoller_BackupPollAndSaveLogsWithPollerNotWorking(t *testing.T) { // LogPoller should backfill starting from the last finalized block stored in db (genesis block) // till the latest finalized block reported by chain. - th.LogPoller.BackupPollAndSaveLogs(ctx, backupLogPollerDelay) + th.LogPoller.BackupPollAndSaveLogs(ctx) require.NoError(t, err) logs, err := th.LogPoller.Logs( @@ -440,7 +462,7 @@ func TestLogPoller_BackupPollAndSaveLogsWithPollerNotWorking(t *testing.T) { // Progressing even more, move blockchain forward by 1 block and mark it as finalized th.Client.Commit() markBlockAsFinalized(t, th, currentBlock) - th.LogPoller.BackupPollAndSaveLogs(ctx, backupLogPollerDelay) + th.LogPoller.BackupPollAndSaveLogs(ctx) // All emitted logs should be backfilled logs, err = th.LogPoller.Logs( @@ -457,7 +479,14 @@ func TestLogPoller_BackupPollAndSaveLogsWithPollerNotWorking(t *testing.T) { func TestLogPoller_BackupPollAndSaveLogsWithDeepBlockDelay(t *testing.T) { emittedLogs := 30 ctx := testutils.Context(t) - th := SetupTH(t, true, 0, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: true, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: int64(emittedLogs), + } + th := SetupTH(t, lpOpts) // Emit some logs in blocks for i := 0; i < emittedLogs; i++ { @@ -493,7 +522,7 @@ func TestLogPoller_BackupPollAndSaveLogsWithDeepBlockDelay(t *testing.T) { require.NoError(t, err) // Should fallback to the backupPollerBlockDelay when finalization was very high in a previous PollAndSave - th.LogPoller.BackupPollAndSaveLogs(ctx, int64(emittedLogs)) + th.LogPoller.BackupPollAndSaveLogs(ctx) require.NoError(t, err) // All emitted logs should be backfilled @@ -512,8 +541,14 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T) logsBatch := 10 // Intentionally use very low backupLogPollerDelay to verify if finality is used properly ctx := testutils.Context(t) - th := SetupTH(t, true, 0, 3, 2, 1000) - + lpOpts := logpoller.Opts{ + UseFinalityTag: true, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 1, + } + th := SetupTH(t, lpOpts) //header, err := th.Client.HeaderByNumber(ctx, nil) //require.NoError(t, err) @@ -551,7 +586,7 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T) require.NoError(t, err) // Should pick logs starting from one block behind the latest finalized block - th.LogPoller.BackupPollAndSaveLogs(ctx, 0) + th.LogPoller.BackupPollAndSaveLogs(ctx) require.NoError(t, err) // Only the 2nd batch + 1 log from a previous batch should be backfilled, because we perform backfill starting @@ -571,7 +606,13 @@ func TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld(t *testing.T) func TestLogPoller_BlockTimestamps(t *testing.T) { t.Parallel() ctx := testutils.Context(t) - th := SetupTH(t, false, 2, 3, 2, 1000) + lpOpts := logpoller.Opts{ + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + th := SetupTH(t, lpOpts) addresses := []common.Address{th.EmitterAddress1, th.EmitterAddress2} events := []common.Hash{EmitterABI.Events["Log1"].ID, EmitterABI.Events["Log2"].ID} @@ -676,7 +717,14 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { }, 10e6) _, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec) require.NoError(t, err) - lp := logpoller.NewLogPoller(orm, client.NewSimulatedBackendClient(t, ec, chainID), lggr, 15*time.Second, false, int64(finalityDepth), 3, 2, 1000, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: 15 * time.Second, + FinalityDepth: int64(finalityDepth), + BackfillBatchSize: 3, + RpcBatchSize: 2, + } + lp := logpoller.NewLogPoller(orm, client.NewSimulatedBackendClient(t, ec, chainID), lggr, lpOpts) for i := 0; i < finalityDepth; i++ { // Have enough blocks that we could reorg the full finalityDepth-1. ec.Commit() } @@ -763,7 +811,15 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: tt.finalityTag, + FinalityDepth: tt.finalityDepth, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 0, + } + th := SetupTH(t, lpOpts) // Set up a log poller listening for log emitter logs. err := th.LogPoller.RegisterFilter(logpoller.Filter{ @@ -1012,7 +1068,15 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: tt.finalityTag, + FinalityDepth: tt.finalityDepth, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 0, + } + th := SetupTH(t, lpOpts) // Set up a log poller listening for log emitter logs. err := th.LogPoller.RegisterFilter(logpoller.Filter{ @@ -1072,7 +1136,16 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) { func TestLogPoller_LoadFilters(t *testing.T) { t.Parallel() - th := SetupTH(t, false, 2, 3, 2, 1000) + + lpOpts := logpoller.Opts{ + UseFinalityTag: false, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 0, + } + th := SetupTH(t, lpOpts) filter1 := logpoller.Filter{ Name: "first Filter", @@ -1133,7 +1206,15 @@ func TestLogPoller_LoadFilters(t *testing.T) { func TestLogPoller_GetBlocks_Range(t *testing.T) { t.Parallel() - th := SetupTH(t, false, 2, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: false, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 0, + } + th := SetupTH(t, lpOpts) err := th.LogPoller.RegisterFilter(logpoller.Filter{ Name: "GetBlocks Test", @@ -1245,7 +1326,15 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { func TestGetReplayFromBlock(t *testing.T) { t.Parallel() - th := SetupTH(t, false, 2, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: false, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 0, + } + th := SetupTH(t, lpOpts) // Commit a few blocks for i := 0; i < 10; i++ { th.Client.Commit() @@ -1306,7 +1395,14 @@ func TestLogPoller_DBErrorHandling(t *testing.T) { ec.Commit() ec.Commit() - lp := logpoller.NewLogPoller(o, client.NewSimulatedBackendClient(t, ec, chainID2), lggr, 1*time.Hour, false, 2, 3, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + BackupPollerBlockDelay: 0, + } + lp := logpoller.NewLogPoller(o, client.NewSimulatedBackendClient(t, ec, chainID2), lggr, lpOpts) err = lp.Replay(ctx, 5) // block number too high require.ErrorContains(t, err, "Invalid replay block number") @@ -1354,7 +1450,15 @@ func TestTooManyLogResults(t *testing.T) { chainID := testutils.NewRandomEVMChainID() db := pgtest.NewSqlxDB(t) o := logpoller.NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) - lp := logpoller.NewLogPoller(o, ec, lggr, 1*time.Hour, false, 2, 20, 10, 1000, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 20, + RpcBatchSize: 10, + BackupPollerBlockDelay: 0, + } + lp := logpoller.NewLogPoller(o, ec, lggr, lpOpts) expected := []int64{10, 5, 2, 1} clientErr := client.JsonError{ @@ -1441,7 +1545,14 @@ func Test_PollAndQueryFinalizedBlocks(t *testing.T) { firstBatchLen := 3 secondBatchLen := 5 - th := SetupTH(t, true, 2, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: true, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 0, + } + th := SetupTH(t, lpOpts) eventSig := EmitterABI.Events["Log1"].ID err := th.LogPoller.RegisterFilter(logpoller.Filter{ @@ -1480,7 +1591,7 @@ func Test_PollAndQueryFinalizedBlocks(t *testing.T) { logpoller.Finalized, ) require.NoError(t, err) - require.Len(t, finalizedLogs, firstBatchLen) + require.Len(t, finalizedLogs, firstBatchLen, fmt.Sprintf("len(finalizedLogs) = %d, should have been %d", len(finalizedLogs), firstBatchLen)) numberOfConfirmations := 1 logsByConfs, err := th.LogPoller.LogsDataWordGreaterThan( @@ -1525,7 +1636,15 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - th := SetupTH(t, tt.useFinalityTag, tt.finalityDepth, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: tt.useFinalityTag, + FinalityDepth: tt.finalityDepth, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 0, + } + th := SetupTH(t, lpOpts) // Should return error before the first poll and save _, err := th.LogPoller.LatestBlock() require.Error(t, err) @@ -1572,7 +1691,15 @@ func Test_CreatedAfterQueriesWithBackfill(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2, 1000) + lpOpts := logpoller.Opts{ + UseFinalityTag: tt.finalityTag, + FinalityDepth: tt.finalityDepth, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 100, + } + th := SetupTH(t, lpOpts) header, err := th.Client.HeaderByNumber(ctx, nil) require.NoError(t, err) @@ -1603,7 +1730,7 @@ func Test_CreatedAfterQueriesWithBackfill(t *testing.T) { } // LogPoller should backfill entire history - th.LogPoller.BackupPollAndSaveLogs(ctx, 100) + th.LogPoller.BackupPollAndSaveLogs(ctx) require.NoError(t, err) // Make sure that all logs are backfilled @@ -1663,7 +1790,14 @@ func Test_PruneOldBlocks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - th := SetupTH(t, true, 0, 3, 2, tt.keepFinalizedBlocksDepth) + lpOpts := logpoller.Opts{ + UseFinalityTag: true, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: tt.keepFinalizedBlocksDepth, + BackupPollerBlockDelay: 0, + } + th := SetupTH(t, lpOpts) for i := 1; i <= tt.blockToCreate; i++ { err := th.ORM.InsertBlock(utils.RandomBytes32(), int64(i+10), time.Now(), int64(i)) diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 655a7295dab..9691bd58290 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -34,6 +34,13 @@ type block struct { timestamp int64 } +var lpOpts = logpoller.Opts{ + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, +} + func GenLog(chainID *big.Int, logIndex int64, blockNum int64, blockHash string, topic1 []byte, address common.Address) logpoller.Log { return GenLogWithTimestamp(chainID, logIndex, blockNum, blockHash, topic1, address, time.Now()) } @@ -70,7 +77,7 @@ func GenLogWithData(chainID *big.Int, address common.Address, eventSig common.Ha func TestLogPoller_Batching(t *testing.T) { t.Parallel() - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) var logs []logpoller.Log // Inserts are limited to 65535 parameters. A log being 10 parameters this results in // a maximum of 6553 log inserts per tx. As inserting more than 6553 would result in @@ -86,7 +93,7 @@ func TestLogPoller_Batching(t *testing.T) { } func TestORM_GetBlocks_From_Range(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM // Insert many blocks and read them back together blocks := []block{ @@ -141,7 +148,7 @@ func TestORM_GetBlocks_From_Range(t *testing.T) { } func TestORM_GetBlocks_From_Range_Recent_Blocks(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM // Insert many blocks and read them back together var recentBlocks []block @@ -173,7 +180,7 @@ func TestORM_GetBlocks_From_Range_Recent_Blocks(t *testing.T) { } func TestORM(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM o2 := th.ORM2 // Insert and read back a block. @@ -564,7 +571,7 @@ func insertLogsTopicValueRange(t *testing.T, chainID *big.Int, o *logpoller.DbOR } func TestORM_IndexedLogs(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM eventSig := common.HexToHash("0x1599") addr := common.HexToAddress("0x1234") @@ -625,7 +632,7 @@ func TestORM_IndexedLogs(t *testing.T) { } func TestORM_SelectIndexedLogsByTxHash(t *testing.T) { - th := SetupTH(t, false, 0, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM eventSig := common.HexToHash("0x1599") txHash := common.HexToHash("0x1888") @@ -691,7 +698,7 @@ func TestORM_SelectIndexedLogsByTxHash(t *testing.T) { } func TestORM_DataWords(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM eventSig := common.HexToHash("0x1599") addr := common.HexToAddress("0x1234") @@ -754,7 +761,7 @@ func TestORM_DataWords(t *testing.T) { } func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM // Insert logs on different topics, should be able to read them @@ -848,7 +855,7 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) { } func TestORM_DeleteBlocksBefore(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) o1 := th.ORM require.NoError(t, o1.InsertBlock(common.HexToHash("0x1234"), 1, time.Now(), 0)) require.NoError(t, o1.InsertBlock(common.HexToHash("0x1235"), 2, time.Now(), 0)) @@ -875,7 +882,7 @@ func TestORM_DeleteBlocksBefore(t *testing.T) { func TestLogPoller_Logs(t *testing.T) { t.Parallel() - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) event1 := EmitterABI.Events["Log1"].ID event2 := EmitterABI.Events["Log2"].ID address1 := common.HexToAddress("0x2ab9a2Dc53736b361b72d900CdF9F78F9406fbbb") @@ -923,7 +930,7 @@ func TestLogPoller_Logs(t *testing.T) { } func BenchmarkLogs(b *testing.B) { - th := SetupTH(b, false, 2, 3, 2, 1000) + th := SetupTH(b, lpOpts) o := th.ORM var lgs []logpoller.Log addr := common.HexToAddress("0x1234") @@ -949,7 +956,7 @@ func BenchmarkLogs(b *testing.B) { } func TestSelectLogsWithSigsExcluding(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) orm := th.ORM addressA := common.HexToAddress("0x11111") addressB := common.HexToAddress("0x22222") @@ -1195,7 +1202,7 @@ func TestSelectLogsWithSigsExcluding(t *testing.T) { } func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) event1 := EmitterABI.Events["Log1"].ID event2 := EmitterABI.Events["Log2"].ID address1 := utils.RandomAddress() @@ -1292,7 +1299,7 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) { } func TestSelectLogsCreatedAfter(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) event := EmitterABI.Events["Log1"].ID address := utils.RandomAddress() @@ -1396,7 +1403,7 @@ func TestSelectLogsCreatedAfter(t *testing.T) { } func TestNestedLogPollerBlocksQuery(t *testing.T) { - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) event := EmitterABI.Events["Log1"].ID address := utils.RandomAddress() @@ -1556,7 +1563,7 @@ func TestInsertLogsInTx(t *testing.T) { func TestSelectLogsDataWordBetween(t *testing.T) { address := utils.RandomAddress() eventSig := utils.RandomBytes32() - th := SetupTH(t, false, 2, 3, 2, 1000) + th := SetupTH(t, lpOpts) firstLogData := make([]byte, 0, 64) firstLogData = append(firstLogData, logpoller.EvmWord(1).Bytes()...) diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 8e4a59bc099..f807da7ce09 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -50,7 +50,13 @@ import ( func makeTestEvmTxm( t *testing.T, db *sqlx.DB, ethClient evmclient.Client, estimator gas.EvmFeeEstimator, ccfg txmgr.ChainConfig, fcfg txmgr.FeeConfig, txConfig evmconfig.Transactions, dbConfig txmgr.DatabaseConfig, listenerConfig txmgr.ListenerConfig, keyStore keystore.Eth) (txmgr.TxManager, error) { lggr := logger.Test(t) - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts) // logic for building components (from evm/evm_txm.go) ------- lggr.Infow("Initializing EVM transaction manager", diff --git a/core/chains/legacyevm/chain.go b/core/chains/legacyevm/chain.go index 4e0344281cf..532ea0ac542 100644 --- a/core/chains/legacyevm/chain.go +++ b/core/chains/legacyevm/chain.go @@ -242,17 +242,16 @@ func newChain(ctx context.Context, cfg *evmconfig.ChainScoped, nodes []*toml.Nod if opts.GenLogPoller != nil { logPoller = opts.GenLogPoller(chainID) } else { - logPoller = logpoller.NewLogPoller( - logpoller.NewObservedORM(chainID, db, l, cfg.Database()), - client, - l, - cfg.EVM().LogPollInterval(), - cfg.EVM().FinalityTagEnabled(), - int64(cfg.EVM().FinalityDepth()), - int64(cfg.EVM().LogBackfillBatchSize()), - int64(cfg.EVM().RPCDefaultBatchSize()), - int64(cfg.EVM().LogKeepBlocksDepth()), - int64(cfg.EVM().LogPrunePageSize())) + lpOpts := logpoller.Opts{ + PollPeriod: cfg.EVM().LogPollInterval(), + UseFinalityTag: cfg.EVM().FinalityTagEnabled(), + FinalityDepth: int64(cfg.EVM().FinalityDepth()), + BackfillBatchSize: int64(cfg.EVM().LogBackfillBatchSize()), + RpcBatchSize: int64(cfg.EVM().RPCDefaultBatchSize()), + KeepFinalizedBlocksDepth: int64(cfg.EVM().LogKeepBlocksDepth()), + LogPrunePageSize: int64(cfg.EVM().LogPrunePageSize()), + } + logPoller = logpoller.NewLogPoller(logpoller.NewObservedORM(chainID, db, l, cfg.Database()), client, l, lpOpts) } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go index ba89c52c2ef..f9459617af4 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go @@ -661,7 +661,13 @@ func setupDependencies(t *testing.T, db *sqlx.DB, backend *backends.SimulatedBac pollerLggr := logger.TestLogger(t) pollerLggr.SetLogLevel(zapcore.WarnLevel) lorm := logpoller.NewORM(big.NewInt(1337), db, pollerLggr, pgtest.NewQConfig(false)) - lp := logpoller.NewLogPoller(lorm, ethClient, pollerLggr, 100*time.Millisecond, false, 1, 2, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 2, + RpcBatchSize: 2, + } + lp := logpoller.NewLogPoller(lorm, ethClient, pollerLggr, lpOpts) return lp, ethClient } diff --git a/core/services/promreporter/prom_reporter_test.go b/core/services/promreporter/prom_reporter_test.go index d283cb8f873..66133072ebd 100644 --- a/core/services/promreporter/prom_reporter_test.go +++ b/core/services/promreporter/prom_reporter_test.go @@ -38,7 +38,14 @@ func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainCon ethClient := evmtest.NewEthClientMockWithDefaultChain(t) estimator := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator()) lggr := logger.TestLogger(t) - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts) txm, err := txmgr.NewTxm( db, diff --git a/core/services/relay/evm/chain_reader_test.go b/core/services/relay/evm/chain_reader_test.go index 64d9f9f1cac..37cdfbd2c1d 100644 --- a/core/services/relay/evm/chain_reader_test.go +++ b/core/services/relay/evm/chain_reader_test.go @@ -263,7 +263,14 @@ func (it *chainReaderInterfaceTester) GetChainReader(t *testing.T) clcommontypes lggr := logger.NullLogger db := pgtest.NewSqlxDB(t) - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), it.chain.Client(), lggr, time.Millisecond, false, 0, 1, 1, 10000, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: time.Millisecond, + BackfillBatchSize: 1, + RpcBatchSize: 1, + KeepFinalizedBlocksDepth: 10000, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), it.chain.Client(), lggr, lpOpts) require.NoError(t, lp.Start(ctx)) it.chain.On("LogPoller").Return(lp) cr, err := evm.NewChainReaderService(lggr, lp, it.chain, it.chainConfig) diff --git a/core/services/relay/evm/config_poller_test.go b/core/services/relay/evm/config_poller_test.go index 089db6decd5..eb154992c70 100644 --- a/core/services/relay/evm/config_poller_test.go +++ b/core/services/relay/evm/config_poller_test.go @@ -90,7 +90,14 @@ func TestConfigPoller(t *testing.T) { cfg := pgtest.NewQConfig(false) ethClient = evmclient.NewSimulatedBackendClient(t, b, testutils.SimulatedChainID) lorm := logpoller.NewORM(testutils.SimulatedChainID, db, lggr, cfg) - lp = logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 2, + RpcBatchSize: 2, + } + lp = logpoller.NewLogPoller(lorm, ethClient, lggr, lpOpts) servicetest.Run(t, lp) } diff --git a/core/services/relay/evm/functions/config_poller_test.go b/core/services/relay/evm/functions/config_poller_test.go index 6a8c682a81b..d1a38492cc8 100644 --- a/core/services/relay/evm/functions/config_poller_test.go +++ b/core/services/relay/evm/functions/config_poller_test.go @@ -81,7 +81,13 @@ func runTest(t *testing.T, pluginType functions.FunctionsPluginType, expectedDig defer ethClient.Close() lggr := logger.TestLogger(t) lorm := logpoller.NewORM(big.NewInt(1337), db, lggr, cfg) - lp := logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 2, + RpcBatchSize: 2, + } + lp := logpoller.NewLogPoller(lorm, ethClient, lggr, lpOpts) servicetest.Run(t, lp) configPoller, err := functions.NewFunctionsConfigPoller(pluginType, lp, lggr) require.NoError(t, err) diff --git a/core/services/relay/evm/mercury/helpers_test.go b/core/services/relay/evm/mercury/helpers_test.go index 8283e80916e..18ab0c48801 100644 --- a/core/services/relay/evm/mercury/helpers_test.go +++ b/core/services/relay/evm/mercury/helpers_test.go @@ -167,7 +167,14 @@ func SetupTH(t *testing.T, feedID common.Hash) TestHarness { ethClient := evmclient.NewSimulatedBackendClient(t, b, big.NewInt(1337)) lggr := logger.TestLogger(t) lorm := logpoller.NewORM(big.NewInt(1337), db, lggr, cfg) - lp := logpoller.NewLogPoller(lorm, ethClient, lggr, 100*time.Millisecond, false, 1, 2, 2, 1000, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 2, + RpcBatchSize: 2, + } + lp := logpoller.NewLogPoller(lorm, ethClient, lggr, lpOpts) servicetest.Run(t, lp) configPoller, err := NewConfigPoller(lggr, lp, verifierAddress, feedID) diff --git a/core/services/vrf/v2/bhs_feeder_test.go b/core/services/vrf/v2/bhs_feeder_test.go index a02eea75757..6f857c80834 100644 --- a/core/services/vrf/v2/bhs_feeder_test.go +++ b/core/services/vrf/v2/bhs_feeder_test.go @@ -7,12 +7,14 @@ import ( "github.com/stretchr/testify/require" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrftesthelpers" ) @@ -37,14 +39,14 @@ func TestStartHeartbeats(t *testing.T) { bhsKeyAddresses = append(bhsKeyAddresses, bhsKey.Address.String()) keys = append(keys, bhsKey) keySpecificOverrides = append(keySpecificOverrides, toml.KeySpecific{ - Key: ptr(bhsKey.EIP55Address), + Key: ptr[ethkey.EIP55Address](bhsKey.EIP55Address), GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei}, }) sendEth(t, ownerKey, uni.backend, bhsKey.Address, 10) } keySpecificOverrides = append(keySpecificOverrides, toml.KeySpecific{ // Gas lane. - Key: ptr(vrfKey.EIP55Address), + Key: ptr[ethkey.EIP55Address](vrfKey.EIP55Address), GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei}, }) diff --git a/core/services/vrf/v2/integration_helpers_test.go b/core/services/vrf/v2/integration_helpers_test.go index b0ae4266b12..6649b5972e0 100644 --- a/core/services/vrf/v2/integration_helpers_test.go +++ b/core/services/vrf/v2/integration_helpers_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/require" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" + txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml" @@ -66,11 +67,11 @@ func testSingleConsumerHappyPath( config, db := heavyweight.FullTestDBV2(t, func(c *chainlink.Config, s *chainlink.Secrets) { simulatedOverrides(t, assets.GWei(10), toml.KeySpecific{ // Gas lane. - Key: ptr(key1.EIP55Address), + Key: ptr[ethkey.EIP55Address](key1.EIP55Address), GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei}, }, toml.KeySpecific{ // Gas lane. - Key: ptr(key2.EIP55Address), + Key: ptr[ethkey.EIP55Address](key2.EIP55Address), GasEstimator: toml.KeySpecificGasEstimator{PriceMax: gasLanePriceWei}, })(c, s) c.EVM[0].MinIncomingConfirmations = ptr[uint32](2) diff --git a/core/services/vrf/v2/listener_v2_log_listener_test.go b/core/services/vrf/v2/listener_v2_log_listener_test.go index c92795f55a6..cda172abefd 100644 --- a/core/services/vrf/v2/listener_v2_log_listener_test.go +++ b/core/services/vrf/v2/listener_v2_log_listener_test.go @@ -92,7 +92,16 @@ func setupVRFLogPollerListenerTH(t *testing.T, // Poll period doesn't matter, we intend to call poll and save logs directly in the test. // Set it to some insanely high value to not interfere with any tests. - lp := logpoller.NewLogPoller(o, esc, lggr, 1*time.Hour, useFinalityTag, finalityDepth, backfillBatchSize, rpcBatchSize, keepFinalizedBlocksDepth, 0) + + lpOpts := logpoller.Opts{ + PollPeriod: time.Hour, + UseFinalityTag: useFinalityTag, + FinalityDepth: finalityDepth, + BackfillBatchSize: backfillBatchSize, + RpcBatchSize: rpcBatchSize, + KeepFinalizedBlocksDepth: keepFinalizedBlocksDepth, + } + lp := logpoller.NewLogPoller(o, esc, lggr, lpOpts) emitterAddress1, _, emitter1, err := log_emitter.DeployLogEmitter(owner, ec) require.NoError(t, err) From 33851afc2dc9e82eba3a7b6983f43ba4970a3eb2 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 23 Jan 2024 17:37:51 -0800 Subject: [PATCH 02/11] Fix chain reader test & logpoller test This failing chainreader test had FinalityDepth set to 0, so LogPoller was starting after the block where the logs were emitted. It was only passing due to Backup LogPoller. With Backup LogPoller disabled, we have to increase the finality depth so that LogPoller will start a few blocks back (at the first unfinalized block). The failing logpoller test was looking for an extra log entry emitted by BackupLogPoller. Now we can run it with BackupLogPoller disabled (leading to a more robust test) and stop looking for that --- core/chains/evm/logpoller/log_poller_test.go | 12 +++++------- core/services/relay/evm/chain_reader_test.go | 5 +++-- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 5cee649f0d8..c2f42f03b5c 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -1396,11 +1396,10 @@ func TestLogPoller_DBErrorHandling(t *testing.T) { ec.Commit() lpOpts := logpoller.Opts{ - PollPeriod: time.Hour, - FinalityDepth: 2, - BackfillBatchSize: 3, - RpcBatchSize: 2, - BackupPollerBlockDelay: 0, + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, } lp := logpoller.NewLogPoller(o, client.NewSimulatedBackendClient(t, ec, chainID2), lggr, lpOpts) @@ -1417,7 +1416,7 @@ func TestLogPoller_DBErrorHandling(t *testing.T) { time.Sleep(100 * time.Millisecond) require.NoError(t, lp.Start(ctx)) require.Eventually(t, func() bool { - return observedLogs.Len() >= 5 + return observedLogs.Len() >= 4 }, 2*time.Second, 20*time.Millisecond) lp.Close() @@ -1434,7 +1433,6 @@ func TestLogPoller_DBErrorHandling(t *testing.T) { assert.Contains(t, logMsgs, "SQL ERROR") assert.Contains(t, logMsgs, "Failed loading filters in main logpoller loop, retrying later") assert.Contains(t, logMsgs, "Error executing replay, could not get fromBlock") - assert.Contains(t, logMsgs, "Backup log poller ran before filters loaded, skipping") } type getLogErrData struct { diff --git a/core/services/relay/evm/chain_reader_test.go b/core/services/relay/evm/chain_reader_test.go index 37cdfbd2c1d..39cf317204b 100644 --- a/core/services/relay/evm/chain_reader_test.go +++ b/core/services/relay/evm/chain_reader_test.go @@ -136,11 +136,11 @@ func (it *chainReaderInterfaceTester) MaxWaitTimeForEvents() time.Duration { maxWaitTime := time.Second * 20 maxWaitTimeStr, ok := os.LookupEnv("MAX_WAIT_TIME_FOR_EVENTS_S") if ok { - wiatS, err := strconv.ParseInt(maxWaitTimeStr, 10, 64) + waitS, err := strconv.ParseInt(maxWaitTimeStr, 10, 64) if err != nil { fmt.Printf("Error parsing MAX_WAIT_TIME_FOR_EVENTS_S: %v, defaulting to %v\n", err, maxWaitTime) } - maxWaitTime = time.Second * time.Duration(wiatS) + maxWaitTime = time.Second * time.Duration(waitS) } return maxWaitTime @@ -266,6 +266,7 @@ func (it *chainReaderInterfaceTester) GetChainReader(t *testing.T) clcommontypes lpOpts := logpoller.Opts{ PollPeriod: time.Millisecond, + FinalityDepth: 4, BackfillBatchSize: 1, RpcBatchSize: 1, KeepFinalizedBlocksDepth: 10000, From 1eb0e2dbb7db7eb8137b65ed9d609bb78da4abe2 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 23 Jan 2024 18:53:15 -0800 Subject: [PATCH 03/11] Fix LogPoller_Replay test --- core/chains/evm/logpoller/log_poller_internal_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go index c69408c1748..cdec0c84041 100644 --- a/core/chains/evm/logpoller/log_poller_internal_test.go +++ b/core/chains/evm/logpoller/log_poller_internal_test.go @@ -275,6 +275,7 @@ func TestLogPoller_Replay(t *testing.T) { BackfillBatchSize: 3, RpcBatchSize: 3, KeepFinalizedBlocksDepth: 20, + BackupPollerBlockDelay: 100, } lp := NewLogPoller(orm, ec, lggr, lpOpts) From eba8e1843b3aa237d8b8e347a4a0fce94baadbba Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 23 Jan 2024 19:21:16 -0800 Subject: [PATCH 04/11] Address SonarQube CodeSmell: reduce cognitive complexity of lp.run() --- core/chains/evm/logpoller/log_poller.go | 90 +++++++++++++------------ 1 file changed, 47 insertions(+), 43 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 0bd8b90732e..d7ad663c80a 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -449,6 +449,20 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i return mathutil.Min(requested, lastProcessed.BlockNumber), nil } +func (lp *logPoller) loadFilters() error { + lp.filterMu.Lock() + defer lp.filterMu.Unlock() + filters, err := lp.orm.LoadFilters(pg.WithParentCtx(lp.ctx)) + + if err != nil { + return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying") + } + + lp.filters = filters + lp.filterDirty = true + return nil +} + func (lp *logPoller) run() { defer lp.wg.Done() logPollTick := time.After(0) @@ -456,60 +470,20 @@ func (lp *logPoller) run() { backupLogPollTick := time.After(100 * time.Millisecond) filtersLoaded := false - loadFilters := func() error { - lp.filterMu.Lock() - defer lp.filterMu.Unlock() - filters, err := lp.orm.LoadFilters(pg.WithParentCtx(lp.ctx)) - - if err != nil { - return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying") - } - - lp.filters = filters - lp.filterDirty = true - filtersLoaded = true - return nil - } - for { select { case <-lp.ctx.Done(): return case fromBlockReq := <-lp.replayStart: - fromBlock, err := lp.GetReplayFromBlock(lp.ctx, fromBlockReq) - if err == nil { - if !filtersLoaded { - lp.lggr.Warnw("Received replayReq before filters loaded", "fromBlock", fromBlock, "requested", fromBlockReq) - if err = loadFilters(); err != nil { - lp.lggr.Errorw("Failed loading filters during Replay", "err", err, "fromBlock", fromBlock) - } - } - if err == nil { - // Serially process replay requests. - lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq) - lp.PollAndSaveLogs(lp.ctx, fromBlock) - lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq) - } - } else { - lp.lggr.Errorw("Error executing replay, could not get fromBlock", "err", err) - } - select { - case <-lp.ctx.Done(): - // We're shutting down, notify client and exit - select { - case lp.replayComplete <- ErrReplayRequestAborted: - default: - } - return - case lp.replayComplete <- err: - } + lp.handleReplayRequest(fromBlockReq, filtersLoaded) case <-logPollTick: logPollTick = time.After(utils.WithJitter(lp.pollPeriod)) if !filtersLoaded { - if err := loadFilters(); err != nil { + if err := lp.loadFilters(); err != nil { lp.lggr.Errorw("Failed loading filters in main logpoller loop, retrying later", "err", err) continue } + filtersLoaded = true } // Always start from the latest block in the db. @@ -596,6 +570,36 @@ func (lp *logPoller) backgroundWorkerRun() { } } +func (lp *logPoller) handleReplayRequest(fromBlockReq int64, filtersLoaded bool) { + fromBlock, err := lp.GetReplayFromBlock(lp.ctx, fromBlockReq) + if err == nil { + if !filtersLoaded { + lp.lggr.Warnw("Received replayReq before filters loaded", "fromBlock", fromBlock, "requested", fromBlockReq) + if err = lp.loadFilters(); err != nil { + lp.lggr.Errorw("Failed loading filters during Replay", "err", err, "fromBlock", fromBlock) + } + } + if err == nil { + // Serially process replay requests. + lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq) + lp.PollAndSaveLogs(lp.ctx, fromBlock) + lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq) + } + } else { + lp.lggr.Errorw("Error executing replay, could not get fromBlock", "err", err) + } + select { + case <-lp.ctx.Done(): + // We're shutting down, notify client and exit + select { + case lp.replayComplete <- ErrReplayRequestAborted: + default: + } + return + case lp.replayComplete <- err: + } +} + func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) { if lp.backupPollerNextBlock == 0 { lastProcessed, err := lp.orm.SelectLatestBlock(pg.WithParentCtx(ctx)) From f55f9fe1e864156c73eb5780e996b93522211851 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Wed, 21 Feb 2024 13:23:23 -0800 Subject: [PATCH 05/11] Add BackupLogPollerBlockDelay to toml config --- core/chains/evm/config/chain_scoped.go | 4 ++ core/chains/evm/config/config.go | 1 + core/chains/evm/config/toml/config.go | 39 ++++++------ core/chains/evm/config/toml/defaults.go | 3 + .../evm/config/toml/defaults/fallback.toml | 1 + core/chains/legacyevm/chain.go | 1 + core/config/docs/chains-evm.toml | 3 + core/services/chainlink/config_test.go | 26 ++++---- .../chainlink/testdata/config-full.toml | 1 + .../config-multi-chain-effective.toml | 3 + core/web/resolver/chain_test.go | 2 + docs/CONFIG.md | 59 +++++++++++++++++++ .../disk-based-logging-disabled.txtar | 1 + .../validate/disk-based-logging-no-dir.txtar | 1 + .../node/validate/disk-based-logging.txtar | 1 + testdata/scripts/node/validate/invalid.txtar | 1 + testdata/scripts/node/validate/valid.txtar | 1 + 17 files changed, 117 insertions(+), 31 deletions(-) diff --git a/core/chains/evm/config/chain_scoped.go b/core/chains/evm/config/chain_scoped.go index 69cc4c0a6ad..9247e77ba9d 100644 --- a/core/chains/evm/config/chain_scoped.go +++ b/core/chains/evm/config/chain_scoped.go @@ -134,6 +134,10 @@ func (e *evmConfig) LogKeepBlocksDepth() uint32 { return *e.c.LogKeepBlocksDepth } +func (e *evmConfig) BackupLogPollerBlockDelay() uint64 { + return *e.c.BackupLogPollerBlockDelay +} + func (e *evmConfig) NonceAutoSync() bool { return *e.c.NonceAutoSync } diff --git a/core/chains/evm/config/config.go b/core/chains/evm/config/config.go index cb1a6073e0b..c878fea46e9 100644 --- a/core/chains/evm/config/config.go +++ b/core/chains/evm/config/config.go @@ -36,6 +36,7 @@ type EVM interface { LinkContractAddress() string LogBackfillBatchSize() uint32 LogKeepBlocksDepth() uint32 + BackupLogPollerBlockDelay() uint64 LogPollInterval() time.Duration LogPrunePageSize() uint32 MinContractPayment() *commonassets.Link diff --git a/core/chains/evm/config/toml/config.go b/core/chains/evm/config/toml/config.go index 95b6c342161..1ab4aa20d01 100644 --- a/core/chains/evm/config/toml/config.go +++ b/core/chains/evm/config/toml/config.go @@ -341,25 +341,26 @@ func (c *EVMConfig) TOMLString() (string, error) { } type Chain struct { - AutoCreateKey *bool - BlockBackfillDepth *uint32 - BlockBackfillSkip *bool - ChainType *string - FinalityDepth *uint32 - FinalityTagEnabled *bool - FlagsContractAddress *ethkey.EIP55Address - LinkContractAddress *ethkey.EIP55Address - LogBackfillBatchSize *uint32 - LogPollInterval *commonconfig.Duration - LogKeepBlocksDepth *uint32 - LogPrunePageSize *uint32 - MinIncomingConfirmations *uint32 - MinContractPayment *commonassets.Link - NonceAutoSync *bool - NoNewHeadsThreshold *commonconfig.Duration - OperatorFactoryAddress *ethkey.EIP55Address - RPCDefaultBatchSize *uint32 - RPCBlockQueryDelay *uint16 + AutoCreateKey *bool + BlockBackfillDepth *uint32 + BlockBackfillSkip *bool + ChainType *string + FinalityDepth *uint32 + FinalityTagEnabled *bool + FlagsContractAddress *ethkey.EIP55Address + LinkContractAddress *ethkey.EIP55Address + LogBackfillBatchSize *uint32 + LogPollInterval *commonconfig.Duration + LogKeepBlocksDepth *uint32 + LogPrunePageSize *uint32 + BackupLogPollerBlockDelay *uint64 + MinIncomingConfirmations *uint32 + MinContractPayment *commonassets.Link + NonceAutoSync *bool + NoNewHeadsThreshold *commonconfig.Duration + OperatorFactoryAddress *ethkey.EIP55Address + RPCDefaultBatchSize *uint32 + RPCBlockQueryDelay *uint16 Transactions Transactions `toml:",omitempty"` BalanceMonitor BalanceMonitor `toml:",omitempty"` diff --git a/core/chains/evm/config/toml/defaults.go b/core/chains/evm/config/toml/defaults.go index 242373fd4af..951246eeb22 100644 --- a/core/chains/evm/config/toml/defaults.go +++ b/core/chains/evm/config/toml/defaults.go @@ -140,6 +140,9 @@ func (c *Chain) SetFrom(f *Chain) { if v := f.LogPrunePageSize; v != nil { c.LogPrunePageSize = v } + if v := f.BackupLogPollerBlockDelay; v != nil { + c.BackupLogPollerBlockDelay = v + } if v := f.MinIncomingConfirmations; v != nil { c.MinIncomingConfirmations = v } diff --git a/core/chains/evm/config/toml/defaults/fallback.toml b/core/chains/evm/config/toml/defaults/fallback.toml index 8587a5b4b20..1a1d9b69439 100644 --- a/core/chains/evm/config/toml/defaults/fallback.toml +++ b/core/chains/evm/config/toml/defaults/fallback.toml @@ -7,6 +7,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinContractPayment = '.00001 link' MinIncomingConfirmations = 3 NonceAutoSync = true diff --git a/core/chains/legacyevm/chain.go b/core/chains/legacyevm/chain.go index 532ea0ac542..50e6d3914c6 100644 --- a/core/chains/legacyevm/chain.go +++ b/core/chains/legacyevm/chain.go @@ -250,6 +250,7 @@ func newChain(ctx context.Context, cfg *evmconfig.ChainScoped, nodes []*toml.Nod RpcBatchSize: int64(cfg.EVM().RPCDefaultBatchSize()), KeepFinalizedBlocksDepth: int64(cfg.EVM().LogKeepBlocksDepth()), LogPrunePageSize: int64(cfg.EVM().LogPrunePageSize()), + BackupPollerBlockDelay: int64(cfg.EVM().BackupLogPollerBlockDelay()), } logPoller = logpoller.NewLogPoller(logpoller.NewObservedORM(chainID, db, l, cfg.Database()), client, l, lpOpts) } diff --git a/core/config/docs/chains-evm.toml b/core/config/docs/chains-evm.toml index f70dcd0ee45..10c39d10cdf 100644 --- a/core/config/docs/chains-evm.toml +++ b/core/config/docs/chains-evm.toml @@ -57,6 +57,9 @@ LogKeepBlocksDepth = 100000 # Default # **ADVANCED** # LogPrunePageSize defines size of the page for pruning logs. Controls how many logs/blocks (at most) are deleted in a single prune tick. Default value 0 means no paging, delete everything at once. LogPrunePageSize = 0 # Default +# BackupLogPollerBlockDelay works in conjunction with Feature.LogPoller. Controls the block delay of Backup LogPoller, affecting how far behind the latest finalized block it starts and how often it runs. +# BackupLogPollerDelay=0 will disable Backup LogPoller (_not recommended for production environment_). +BackupLogPollerBlockDelay = 100 #Default # MinContractPayment is the minimum payment in LINK required to execute a direct request job. This can be overridden on a per-job basis. MinContractPayment = '10000000000000 juels' # Default # MinIncomingConfirmations is the minimum required confirmations before a log event will be consumed. diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index ed252eadd97..44c19aa257d 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -525,18 +525,19 @@ func TestConfig_Marshal(t *testing.T) { }, }, - LinkContractAddress: mustAddress("0x538aAaB4ea120b2bC2fe5D296852D948F07D849e"), - LogBackfillBatchSize: ptr[uint32](17), - LogPollInterval: &minute, - LogKeepBlocksDepth: ptr[uint32](100000), - LogPrunePageSize: ptr[uint32](0), - MinContractPayment: commonassets.NewLinkFromJuels(math.MaxInt64), - MinIncomingConfirmations: ptr[uint32](13), - NonceAutoSync: ptr(true), - NoNewHeadsThreshold: &minute, - OperatorFactoryAddress: mustAddress("0xa5B85635Be42F21f94F28034B7DA440EeFF0F418"), - RPCDefaultBatchSize: ptr[uint32](17), - RPCBlockQueryDelay: ptr[uint16](10), + LinkContractAddress: mustAddress("0x538aAaB4ea120b2bC2fe5D296852D948F07D849e"), + LogBackfillBatchSize: ptr[uint32](17), + LogPollInterval: &minute, + LogKeepBlocksDepth: ptr[uint32](100000), + LogPrunePageSize: ptr[uint32](0), + BackupLogPollerBlockDelay: ptr[uint64](532), + MinContractPayment: commonassets.NewLinkFromJuels(math.MaxInt64), + MinIncomingConfirmations: ptr[uint32](13), + NonceAutoSync: ptr(true), + NoNewHeadsThreshold: &minute, + OperatorFactoryAddress: mustAddress("0xa5B85635Be42F21f94F28034B7DA440EeFF0F418"), + RPCDefaultBatchSize: ptr[uint32](17), + RPCBlockQueryDelay: ptr[uint16](10), Transactions: evmcfg.Transactions{ MaxInFlight: ptr[uint32](19), @@ -926,6 +927,7 @@ LogBackfillBatchSize = 17 LogPollInterval = '1m0s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 532 MinIncomingConfirmations = 13 MinContractPayment = '9.223372036854775807 link' NonceAutoSync = true diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index e90f6c6611a..14b19b3f42f 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -254,6 +254,7 @@ LogBackfillBatchSize = 17 LogPollInterval = '1m0s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 13 MinContractPayment = '9.223372036854775807 link' NonceAutoSync = true diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index c230a764c74..9f69d4aa909 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -241,6 +241,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -330,6 +331,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -414,6 +416,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 5 MinContractPayment = '0.00001 link' NonceAutoSync = true diff --git a/core/web/resolver/chain_test.go b/core/web/resolver/chain_test.go index e2663af561f..a0f2ca22b07 100644 --- a/core/web/resolver/chain_test.go +++ b/core/web/resolver/chain_test.go @@ -46,6 +46,7 @@ LogBackfillBatchSize = 17 LogPollInterval = '1m0s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 13 MinContractPayment = '9.223372036854775807 link' NonceAutoSync = true @@ -164,6 +165,7 @@ LogBackfillBatchSize = 17 LogPollInterval = '1m0s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 13 MinContractPayment = '9.223372036854775807 link' NonceAutoSync = true diff --git a/docs/CONFIG.md b/docs/CONFIG.md index e4e25e6694f..cb1ef16192b 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1604,6 +1604,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -1688,6 +1689,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -1771,6 +1773,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -1854,6 +1857,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -1938,6 +1942,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2021,6 +2026,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '30s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.001 link' NonceAutoSync = true @@ -2104,6 +2110,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '30s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.001 link' NonceAutoSync = true @@ -2187,6 +2194,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -2271,6 +2279,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2353,6 +2362,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2435,6 +2445,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2518,6 +2529,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2602,6 +2614,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '5s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2685,6 +2698,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2768,6 +2782,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 5 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2851,6 +2866,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -2934,6 +2950,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3017,6 +3034,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '5s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3100,6 +3118,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '5s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3184,6 +3203,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3267,6 +3287,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3349,6 +3370,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3432,6 +3454,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3514,6 +3537,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '30s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3597,6 +3621,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3680,6 +3705,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3762,6 +3788,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '100' NonceAutoSync = true @@ -3844,6 +3871,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '30s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -3927,6 +3955,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4010,6 +4039,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4092,6 +4122,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4175,6 +4206,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4258,6 +4290,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '5s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4342,6 +4375,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4425,6 +4459,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '5s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4508,6 +4543,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4591,6 +4627,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4674,6 +4711,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '5s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4756,6 +4794,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4838,6 +4877,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -4921,6 +4961,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 5 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5004,6 +5045,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5087,6 +5129,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5171,6 +5214,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5255,6 +5299,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5338,6 +5383,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5421,6 +5467,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5504,6 +5551,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '3s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5587,6 +5635,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -5670,6 +5719,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5753,6 +5803,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '2s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 1 MinContractPayment = '0.00001 link' NonceAutoSync = true @@ -5935,6 +5986,14 @@ LogPrunePageSize = 0 # Default ``` LogPrunePageSize defines size of the page for pruning logs. Controls how many logs/blocks (at most) are deleted in a single prune tick. Default value 0 means no paging, delete everything at once. +### BackupLogPollerBlockDelay +:warning: **_ADVANCED_**: _Do not change this setting unless you know what you are doing._ +```toml +BackupLogPollerBlockDelay = 100 # Default +``` +BackupLogPollerBlockDelay works in conjunction with Feature.LogPoller. Controls the block delay of Backup LogPoller, affecting how far behind the latest finalized block it starts and how often it runs. +BackupLogPollerDelay=0 will disable Backup LogPoller (_not recommended for production environment_). + ### MinContractPayment ```toml MinContractPayment = '10000000000000 juels' # Default diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index bb845b05201..873b9e91bc1 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -297,6 +297,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index cdac1b3702c..0c00fbb7adc 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -297,6 +297,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index 832b3fac584..0bbddd6f40f 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -297,6 +297,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index 280fa209f0d..011298fcde7 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -287,6 +287,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index bdd83a9eb31..e0bd015a184 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -294,6 +294,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true From a483eb016f08b35f2729720dbb0598a28d91b520 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Thu, 22 Feb 2024 14:16:09 -0800 Subject: [PATCH 06/11] Fix some merge conflicts --- .../evm/forwarders/forwarder_manager_test.go | 4 --- .../chainlink/testdata/config-full.toml | 2 +- ...annel_definition_cache_integration_test.go | 25 ++++++++++++++++--- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/core/chains/evm/forwarders/forwarder_manager_test.go b/core/chains/evm/forwarders/forwarder_manager_test.go index 6a093553800..0eb51a535e0 100644 --- a/core/chains/evm/forwarders/forwarder_manager_test.go +++ b/core/chains/evm/forwarders/forwarder_manager_test.go @@ -121,9 +121,6 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) { ec.Commit() evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID) -<<<<<<< HEAD - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000, 0) -======= lpOpts := logpoller.Opts{ PollPeriod: 100 * time.Millisecond, FinalityDepth: 2, @@ -132,7 +129,6 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) { KeepFinalizedBlocksDepth: 1000, } lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), evmClient, lggr, lpOpts) ->>>>>>> 9b8bd0dea3 (Refactor NewLogPoller() params into logpoller.Opts struct, add BackupPollerBlockDela) fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM(), evmcfg.Database()) fwdMgr.ORM = forwarders.NewORM(db, logger.Test(t), cfg.Database()) diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index 14b19b3f42f..c1606a5b067 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -254,7 +254,7 @@ LogBackfillBatchSize = 17 LogPollInterval = '1m0s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 -BackupLogPollerBlockDelay = 100 +BackupLogPollerBlockDelay = 532 MinIncomingConfirmations = 13 MinContractPayment = '9.223372036854775807 link' NonceAutoSync = true diff --git a/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go b/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go index 427dd6b32c2..f5bc4a0e89a 100644 --- a/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go +++ b/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go @@ -77,7 +77,14 @@ func Test_ChannelDefinitionCache_Integration(t *testing.T) { require.NoError(t, err) t.Run("with zero fromblock", func(t *testing.T) { - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, 100*time.Millisecond, false, 1, 3, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 3, + RpcBatchSize: 2, + } + lp := logpoller.NewLogPoller( + logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts) servicetest.Run(t, lp) cdc := llo.NewChannelDefinitionCache(lggr, orm, lp, configStoreAddress, 0) @@ -141,8 +148,14 @@ func Test_ChannelDefinitionCache_Integration(t *testing.T) { t.Run("loads from ORM", func(t *testing.T) { // Override logpoller to always return no logs + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 3, + RpcBatchSize: 2, + } lp := &mockLogPoller{ - LogPoller: logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, 100*time.Millisecond, false, 1, 3, 2, 1000, 0), + LogPoller: logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts), LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) { return 0, nil }, @@ -176,7 +189,13 @@ func Test_ChannelDefinitionCache_Integration(t *testing.T) { pgtest.MustExec(t, db, `DELETE FROM channel_definitions`) t.Run("with non-zero fromBlock", func(t *testing.T) { - lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, 100*time.Millisecond, false, 1, 3, 2, 1000, 0) + lpOpts := logpoller.Opts{ + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 3, + RpcBatchSize: 2, + } + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts) servicetest.Run(t, lp) cdc := llo.NewChannelDefinitionCache(lggr, orm, lp, configStoreAddress, channel2Block.Number().Int64()+1) From 6b6cd993631f2b6160778541c76e093194683676 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 26 Feb 2024 14:47:43 -0800 Subject: [PATCH 07/11] Restore KeepFinalizedBlocksDepth: 1000 --- .../evm/logpoller/log_poller_internal_test.go | 18 +++++++----- core/chains/evm/logpoller/log_poller_test.go | 29 ++++++++++--------- core/chains/evm/txmgr/txmgr_test.go | 9 +++--- ...annel_definition_cache_integration_test.go | 27 +++++++++-------- .../v21/logprovider/integration_test.go | 9 +++--- core/services/relay/evm/config_poller_test.go | 9 +++--- .../relay/evm/functions/config_poller_test.go | 9 +++--- .../relay/evm/mercury/helpers_test.go | 9 +++--- docs/CONFIG.md | 2 +- 9 files changed, 67 insertions(+), 54 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go index cdec0c84041..af2d9a558e1 100644 --- a/core/chains/evm/logpoller/log_poller_internal_test.go +++ b/core/chains/evm/logpoller/log_poller_internal_test.go @@ -224,10 +224,11 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) { ctx := testutils.Context(t) lpOpts := Opts{ - PollPeriod: time.Hour, - FinalityDepth: 2, - BackfillBatchSize: 3, - RpcBatchSize: 2, + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, } lp := NewLogPoller(orm, ec, lggr, lpOpts) lp.BackupPollAndSaveLogs(ctx) @@ -537,10 +538,11 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) { lggr := logger.Test(b) lpOpts := Opts{ - PollPeriod: time.Hour, - FinalityDepth: 2, - BackfillBatchSize: 3, - RpcBatchSize: 2, + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, } lp := NewLogPoller(nil, nil, lggr, lpOpts) for i := 0; i < nFilters; i++ { diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index c2f42f03b5c..2eb402721e3 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -719,10 +719,11 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { require.NoError(t, err) lpOpts := logpoller.Opts{ - PollPeriod: 15 * time.Second, - FinalityDepth: int64(finalityDepth), - BackfillBatchSize: 3, - RpcBatchSize: 2, + PollPeriod: 15 * time.Second, + FinalityDepth: int64(finalityDepth), + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, } lp := logpoller.NewLogPoller(orm, client.NewSimulatedBackendClient(t, ec, chainID), lggr, lpOpts) for i := 0; i < finalityDepth; i++ { // Have enough blocks that we could reorg the full finalityDepth-1. @@ -1396,10 +1397,11 @@ func TestLogPoller_DBErrorHandling(t *testing.T) { ec.Commit() lpOpts := logpoller.Opts{ - PollPeriod: time.Hour, - FinalityDepth: 2, - BackfillBatchSize: 3, - RpcBatchSize: 2, + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, } lp := logpoller.NewLogPoller(o, client.NewSimulatedBackendClient(t, ec, chainID2), lggr, lpOpts) @@ -1450,11 +1452,12 @@ func TestTooManyLogResults(t *testing.T) { o := logpoller.NewORM(chainID, db, lggr, pgtest.NewQConfig(true)) lpOpts := logpoller.Opts{ - PollPeriod: time.Hour, - FinalityDepth: 2, - BackfillBatchSize: 20, - RpcBatchSize: 10, - BackupPollerBlockDelay: 0, + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 20, + RpcBatchSize: 10, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 0, } lp := logpoller.NewLogPoller(o, ec, lggr, lpOpts) expected := []int64{10, 5, 2, 1} diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index f807da7ce09..da18c592a55 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -51,10 +51,11 @@ func makeTestEvmTxm( t *testing.T, db *sqlx.DB, ethClient evmclient.Client, estimator gas.EvmFeeEstimator, ccfg txmgr.ChainConfig, fcfg txmgr.FeeConfig, txConfig evmconfig.Transactions, dbConfig txmgr.DatabaseConfig, listenerConfig txmgr.ListenerConfig, keyStore keystore.Eth) (txmgr.TxManager, error) { lggr := logger.Test(t) lpOpts := logpoller.Opts{ - PollPeriod: 100 * time.Millisecond, - FinalityDepth: 2, - BackfillBatchSize: 3, - RpcBatchSize: 2, + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 2, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, } lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts) diff --git a/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go b/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go index f5bc4a0e89a..c24ea46231d 100644 --- a/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go +++ b/core/services/ocr2/plugins/llo/onchain_channel_definition_cache_integration_test.go @@ -78,10 +78,11 @@ func Test_ChannelDefinitionCache_Integration(t *testing.T) { t.Run("with zero fromblock", func(t *testing.T) { lpOpts := logpoller.Opts{ - PollPeriod: 100 * time.Millisecond, - FinalityDepth: 1, - BackfillBatchSize: 3, - RpcBatchSize: 2, + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, } lp := logpoller.NewLogPoller( logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts) @@ -149,10 +150,11 @@ func Test_ChannelDefinitionCache_Integration(t *testing.T) { t.Run("loads from ORM", func(t *testing.T) { // Override logpoller to always return no logs lpOpts := logpoller.Opts{ - PollPeriod: 100 * time.Millisecond, - FinalityDepth: 1, - BackfillBatchSize: 3, - RpcBatchSize: 2, + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, } lp := &mockLogPoller{ LogPoller: logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts), @@ -190,10 +192,11 @@ func Test_ChannelDefinitionCache_Integration(t *testing.T) { t.Run("with non-zero fromBlock", func(t *testing.T) { lpOpts := logpoller.Opts{ - PollPeriod: 100 * time.Millisecond, - FinalityDepth: 1, - BackfillBatchSize: 3, - RpcBatchSize: 2, + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, } lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.SimulatedChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, lpOpts) servicetest.Run(t, lp) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go index f9459617af4..aa8a5c97d70 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/integration_test.go @@ -662,10 +662,11 @@ func setupDependencies(t *testing.T, db *sqlx.DB, backend *backends.SimulatedBac pollerLggr.SetLogLevel(zapcore.WarnLevel) lorm := logpoller.NewORM(big.NewInt(1337), db, pollerLggr, pgtest.NewQConfig(false)) lpOpts := logpoller.Opts{ - PollPeriod: 100 * time.Millisecond, - FinalityDepth: 1, - BackfillBatchSize: 2, - RpcBatchSize: 2, + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 2, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, } lp := logpoller.NewLogPoller(lorm, ethClient, pollerLggr, lpOpts) return lp, ethClient diff --git a/core/services/relay/evm/config_poller_test.go b/core/services/relay/evm/config_poller_test.go index eb154992c70..d2d33944df7 100644 --- a/core/services/relay/evm/config_poller_test.go +++ b/core/services/relay/evm/config_poller_test.go @@ -92,10 +92,11 @@ func TestConfigPoller(t *testing.T) { lorm := logpoller.NewORM(testutils.SimulatedChainID, db, lggr, cfg) lpOpts := logpoller.Opts{ - PollPeriod: 100 * time.Millisecond, - FinalityDepth: 1, - BackfillBatchSize: 2, - RpcBatchSize: 2, + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 2, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, } lp = logpoller.NewLogPoller(lorm, ethClient, lggr, lpOpts) servicetest.Run(t, lp) diff --git a/core/services/relay/evm/functions/config_poller_test.go b/core/services/relay/evm/functions/config_poller_test.go index d1a38492cc8..ab80f3ae565 100644 --- a/core/services/relay/evm/functions/config_poller_test.go +++ b/core/services/relay/evm/functions/config_poller_test.go @@ -82,10 +82,11 @@ func runTest(t *testing.T, pluginType functions.FunctionsPluginType, expectedDig lggr := logger.TestLogger(t) lorm := logpoller.NewORM(big.NewInt(1337), db, lggr, cfg) lpOpts := logpoller.Opts{ - PollPeriod: 100 * time.Millisecond, - FinalityDepth: 1, - BackfillBatchSize: 2, - RpcBatchSize: 2, + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 2, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, } lp := logpoller.NewLogPoller(lorm, ethClient, lggr, lpOpts) servicetest.Run(t, lp) diff --git a/core/services/relay/evm/mercury/helpers_test.go b/core/services/relay/evm/mercury/helpers_test.go index 18ab0c48801..4b05b974c3d 100644 --- a/core/services/relay/evm/mercury/helpers_test.go +++ b/core/services/relay/evm/mercury/helpers_test.go @@ -169,10 +169,11 @@ func SetupTH(t *testing.T, feedID common.Hash) TestHarness { lorm := logpoller.NewORM(big.NewInt(1337), db, lggr, cfg) lpOpts := logpoller.Opts{ - PollPeriod: 100 * time.Millisecond, - FinalityDepth: 1, - BackfillBatchSize: 2, - RpcBatchSize: 2, + PollPeriod: 100 * time.Millisecond, + FinalityDepth: 1, + BackfillBatchSize: 2, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, } lp := logpoller.NewLogPoller(lorm, ethClient, lggr, lpOpts) servicetest.Run(t, lp) diff --git a/docs/CONFIG.md b/docs/CONFIG.md index cb1ef16192b..938c3de66b1 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1604,7 +1604,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 -BackupLogPollerBlockDelay = 100 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true From 09869410d006ddda230be7b89af43ea61bd4a513 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 26 Feb 2024 20:17:19 -0800 Subject: [PATCH 08/11] Use default BackupPollerBlockDelay: 0 instead of explicitly setting to 0 --- core/chains/evm/logpoller/log_poller_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 2eb402721e3..21246e24ec0 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -818,7 +818,6 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { BackfillBatchSize: 3, RpcBatchSize: 2, KeepFinalizedBlocksDepth: 1000, - BackupPollerBlockDelay: 0, } th := SetupTH(t, lpOpts) @@ -1075,7 +1074,6 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) { BackfillBatchSize: 3, RpcBatchSize: 2, KeepFinalizedBlocksDepth: 1000, - BackupPollerBlockDelay: 0, } th := SetupTH(t, lpOpts) @@ -1144,7 +1142,6 @@ func TestLogPoller_LoadFilters(t *testing.T) { BackfillBatchSize: 3, RpcBatchSize: 2, KeepFinalizedBlocksDepth: 1000, - BackupPollerBlockDelay: 0, } th := SetupTH(t, lpOpts) @@ -1213,7 +1210,6 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { BackfillBatchSize: 3, RpcBatchSize: 2, KeepFinalizedBlocksDepth: 1000, - BackupPollerBlockDelay: 0, } th := SetupTH(t, lpOpts) @@ -1333,7 +1329,6 @@ func TestGetReplayFromBlock(t *testing.T) { BackfillBatchSize: 3, RpcBatchSize: 2, KeepFinalizedBlocksDepth: 1000, - BackupPollerBlockDelay: 0, } th := SetupTH(t, lpOpts) // Commit a few blocks @@ -1457,7 +1452,6 @@ func TestTooManyLogResults(t *testing.T) { BackfillBatchSize: 20, RpcBatchSize: 10, KeepFinalizedBlocksDepth: 1000, - BackupPollerBlockDelay: 0, } lp := logpoller.NewLogPoller(o, ec, lggr, lpOpts) expected := []int64{10, 5, 2, 1} @@ -1551,7 +1545,6 @@ func Test_PollAndQueryFinalizedBlocks(t *testing.T) { BackfillBatchSize: 3, RpcBatchSize: 2, KeepFinalizedBlocksDepth: 1000, - BackupPollerBlockDelay: 0, } th := SetupTH(t, lpOpts) @@ -1643,7 +1636,6 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) { BackfillBatchSize: 3, RpcBatchSize: 2, KeepFinalizedBlocksDepth: 1000, - BackupPollerBlockDelay: 0, } th := SetupTH(t, lpOpts) // Should return error before the first poll and save @@ -1796,7 +1788,6 @@ func Test_PruneOldBlocks(t *testing.T) { BackfillBatchSize: 3, RpcBatchSize: 2, KeepFinalizedBlocksDepth: tt.keepFinalizedBlocksDepth, - BackupPollerBlockDelay: 0, } th := SetupTH(t, lpOpts) From 3520711db87ca78be69a9fae910fef4f4cb35386 Mon Sep 17 00:00:00 2001 From: Bartek Tofel Date: Mon, 26 Feb 2024 17:42:54 -0300 Subject: [PATCH 09/11] disable backup log poller in lp smoke tests --- integration-tests/smoke/log_poller_test.go | 3 ++- integration-tests/testconfig/log_poller/config.go | 12 +++++++----- .../testconfig/log_poller/log_poller.toml | 3 +++ integration-tests/universal/log_poller/helpers.go | 2 ++ 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/integration-tests/smoke/log_poller_test.go b/integration-tests/smoke/log_poller_test.go index 593b4eb879a..4b4533d3a37 100644 --- a/integration-tests/smoke/log_poller_test.go +++ b/integration-tests/smoke/log_poller_test.go @@ -277,7 +277,8 @@ func prepareEnvironment(l zerolog.Logger, t *testing.T, testConfig *tc.TestConfi ethereum.RegistryVersion_2_1, logpoller.DefaultOCRRegistryConfig, upKeepsNeeded, - time.Duration(500*time.Millisecond), + cfg.General.LogPollInterval.Duration, + *cfg.General.BackupLogPollerBlockDelay, *cfg.General.UseFinalityTag, testConfig, ) diff --git a/integration-tests/testconfig/log_poller/config.go b/integration-tests/testconfig/log_poller/config.go index 96c3b55c276..890c33f26c9 100644 --- a/integration-tests/testconfig/log_poller/config.go +++ b/integration-tests/testconfig/log_poller/config.go @@ -90,11 +90,13 @@ func (l *LoopedConfig) Validate() error { } type General struct { - Generator *string `toml:"generator"` - EventsToEmit []abi.Event `toml:"-"` - Contracts *int `toml:"contracts"` - EventsPerTx *int `toml:"events_per_tx"` - UseFinalityTag *bool `toml:"use_finality_tag"` + Generator *string `toml:"generator"` + EventsToEmit []abi.Event `toml:"-"` + Contracts *int `toml:"contracts"` + EventsPerTx *int `toml:"events_per_tx"` + UseFinalityTag *bool `toml:"use_finality_tag"` + BackupLogPollerBlockDelay *uint64 `toml:"backup_log_poller_block_delay"` + LogPollInterval *blockchain.StrDuration `toml:"log_poll_interval"` } func (g *General) Validate() error { diff --git a/integration-tests/testconfig/log_poller/log_poller.toml b/integration-tests/testconfig/log_poller/log_poller.toml index 2f46ebf11c2..89d2f07b4e3 100644 --- a/integration-tests/testconfig/log_poller/log_poller.toml +++ b/integration-tests/testconfig/log_poller/log_poller.toml @@ -5,6 +5,9 @@ generator = "looped" contracts = 2 events_per_tx = 4 use_finality_tag = true +log_poll_interval = "500ms" +# 0 disables backup poller +backup_log_poller_block_delay = 0 [LogPoller.Looped] execution_count = 100 diff --git a/integration-tests/universal/log_poller/helpers.go b/integration-tests/universal/log_poller/helpers.go index b91156a3784..b6e62cff2f6 100644 --- a/integration-tests/universal/log_poller/helpers.go +++ b/integration-tests/universal/log_poller/helpers.go @@ -1079,6 +1079,7 @@ func SetupLogPollerTestDocker( registryConfig contracts.KeeperRegistrySettings, upkeepsNeeded int, lpPollingInterval time.Duration, + backupPollingInterval uint64, finalityTagEnabled bool, testConfig *tc.TestConfig, ) ( @@ -1121,6 +1122,7 @@ func SetupLogPollerTestDocker( chain.LogPollInterval = commonconfig.MustNewDuration(lpPollingInterval) chain.FinalityDepth = ptr.Ptr[uint32](uint32(finalityDepth)) chain.FinalityTagEnabled = ptr.Ptr[bool](finalityTagEnabled) + chain.BackupLogPollerBlockDelay = ptr.Ptr[uint64](backupPollingInterval) return chain } From 2a5c4469a2ab7babbe6341a382f3b2b52e083d1c Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 27 Feb 2024 11:15:54 -0800 Subject: [PATCH 10/11] Fix generated docs --- core/config/docs/chains-evm.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/config/docs/chains-evm.toml b/core/config/docs/chains-evm.toml index 10c39d10cdf..5359fe4b22b 100644 --- a/core/config/docs/chains-evm.toml +++ b/core/config/docs/chains-evm.toml @@ -57,9 +57,10 @@ LogKeepBlocksDepth = 100000 # Default # **ADVANCED** # LogPrunePageSize defines size of the page for pruning logs. Controls how many logs/blocks (at most) are deleted in a single prune tick. Default value 0 means no paging, delete everything at once. LogPrunePageSize = 0 # Default +# **ADVANCED** # BackupLogPollerBlockDelay works in conjunction with Feature.LogPoller. Controls the block delay of Backup LogPoller, affecting how far behind the latest finalized block it starts and how often it runs. # BackupLogPollerDelay=0 will disable Backup LogPoller (_not recommended for production environment_). -BackupLogPollerBlockDelay = 100 #Default +BackupLogPollerBlockDelay = 100 # Default # MinContractPayment is the minimum payment in LINK required to execute a direct request job. This can be overridden on a per-job basis. MinContractPayment = '10000000000000 juels' # Default # MinIncomingConfirmations is the minimum required confirmations before a log event will be consumed. From 78913bdf402e65b817141f8dc9a023f10d10bd15 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Tue, 27 Feb 2024 21:12:11 -0800 Subject: [PATCH 11/11] Add missing web resolver testdata --- core/web/resolver/testdata/config-full.toml | 1 + core/web/resolver/testdata/config-multi-chain-effective.toml | 3 +++ 2 files changed, 4 insertions(+) diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index f698f55fb25..cdfb85a6f5c 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -254,6 +254,7 @@ LogBackfillBatchSize = 17 LogPollInterval = '1m0s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 13 MinContractPayment = '9.223372036854775807 link' NonceAutoSync = true diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index c230a764c74..9f69d4aa909 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -241,6 +241,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -330,6 +331,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '15s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 3 MinContractPayment = '0.1 link' NonceAutoSync = true @@ -414,6 +416,7 @@ LogBackfillBatchSize = 1000 LogPollInterval = '1s' LogKeepBlocksDepth = 100000 LogPrunePageSize = 0 +BackupLogPollerBlockDelay = 100 MinIncomingConfirmations = 5 MinContractPayment = '0.00001 link' NonceAutoSync = true