From 2534e1a6055b9e409b2a9f0710cc860f0518ec24 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Tue, 31 Oct 2023 10:40:05 +0100 Subject: [PATCH] CCIP-1230 Exposing entire LogPollerBlock from LatestBlock in LogPoller (#11105) * Exposing entire LogPollerBlock from LatestBlock function in the LogPoller's interface * Exposing entire LogPollerBlock from LatestBlock function in the LogPoller's interface --- core/chains/evm/logpoller/disabled.go | 4 +- core/chains/evm/logpoller/helper_test.go | 2 +- core/chains/evm/logpoller/log_poller.go | 8 ++-- .../evm/logpoller/log_poller_internal_test.go | 2 +- core/chains/evm/logpoller/log_poller_test.go | 39 ++++++++++++------- core/chains/evm/logpoller/mocks/log_poller.go | 10 ++--- core/services/blockhashstore/coordinators.go | 6 +-- core/services/blockhashstore/delegate.go | 2 +- core/services/blockhashstore/delegate_test.go | 3 +- core/services/blockhashstore/feeder_test.go | 6 +-- .../plugins/ocr2keeper/evm20/log_provider.go | 24 ++++++------ .../ocr2/plugins/ocr2keeper/evm20/registry.go | 10 ++--- .../plugins/ocr2keeper/evm20/registry_test.go | 2 +- .../ocr2keeper/evm21/block_subscriber.go | 7 ++-- .../ocr2keeper/evm21/block_subscriber_test.go | 4 +- .../evm21/logprovider/block_time.go | 5 ++- .../evm21/logprovider/block_time_test.go | 2 +- .../evm21/logprovider/integration_test.go | 4 +- .../ocr2keeper/evm21/logprovider/provider.go | 10 ++--- .../evm21/logprovider/provider_life_cycle.go | 2 +- .../logprovider/provider_life_cycle_test.go | 5 ++- .../evm21/logprovider/provider_test.go | 2 +- .../ocr2keeper/evm21/logprovider/recoverer.go | 8 ++-- .../evm21/logprovider/recoverer_test.go | 11 +++--- .../ocr2/plugins/ocr2keeper/evm21/registry.go | 10 ++--- .../plugins/ocr2keeper/evm21/registry_test.go | 2 +- .../evm21/transmit/event_provider.go | 6 +-- .../evm21/transmit/event_provider_test.go | 2 +- .../ocr2vrf/coordinator/coordinator.go | 2 +- .../ocr2vrf/coordinator/coordinator_test.go | 4 +- core/services/relay/evm/config_poller.go | 2 +- .../relay/evm/functions/config_poller.go | 2 +- .../relay/evm/functions/logpoller_wrapper.go | 15 +++---- .../evm/functions/logpoller_wrapper_test.go | 2 +- .../relay/evm/mercury/config_poller.go | 2 +- 35 files changed, 123 insertions(+), 104 deletions(-) diff --git a/core/chains/evm/logpoller/disabled.go b/core/chains/evm/logpoller/disabled.go index 4bcf1c50863..b54d4e6fc84 100644 --- a/core/chains/evm/logpoller/disabled.go +++ b/core/chains/evm/logpoller/disabled.go @@ -39,7 +39,9 @@ func (disabled) UnregisterFilter(name string, qopts ...pg.QOpt) error { return E func (disabled) HasFilter(name string) bool { return false } -func (disabled) LatestBlock(qopts ...pg.QOpt) (int64, error) { return -1, ErrDisabled } +func (disabled) LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) { + return LogPollerBlock{}, ErrDisabled +} func (disabled) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) { return nil, ErrDisabled diff --git a/core/chains/evm/logpoller/helper_test.go b/core/chains/evm/logpoller/helper_test.go index 8415641c402..c61d3d5fad6 100644 --- a/core/chains/evm/logpoller/helper_test.go +++ b/core/chains/evm/logpoller/helper_test.go @@ -92,7 +92,7 @@ func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize func (th *TestHarness) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) int64 { th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber) latest, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(ctx)) - return latest + 1 + return latest.BlockNumber + 1 } func (th *TestHarness) assertDontHave(t *testing.T, start, end int) { diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 6cda8f5b46c..4cd2804d9f3 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -38,7 +38,7 @@ type LogPoller interface { RegisterFilter(filter Filter, qopts ...pg.QOpt) error UnregisterFilter(name string, qopts ...pg.QOpt) error HasFilter(name string) bool - LatestBlock(qopts ...pg.QOpt) (int64, error) + LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) // General querying @@ -1019,13 +1019,13 @@ func (lp *logPoller) IndexedLogsTopicRange(eventSig common.Hash, address common. // LatestBlock returns the latest block the log poller is on. It tracks blocks to be able // to detect reorgs. -func (lp *logPoller) LatestBlock(qopts ...pg.QOpt) (int64, error) { +func (lp *logPoller) LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) { b, err := lp.orm.SelectLatestBlock(qopts...) if err != nil { - return 0, err + return LogPollerBlock{}, err } - return b.BlockNumber, nil + return *b, nil } func (lp *logPoller) BlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, error) { diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go index b9474158a6b..c0d081582f7 100644 --- a/core/chains/evm/logpoller/log_poller_internal_test.go +++ b/core/chains/evm/logpoller/log_poller_internal_test.go @@ -262,7 +262,7 @@ func TestLogPoller_Replay(t *testing.T) { lp.PollAndSaveLogs(tctx, 4) latest, err := lp.LatestBlock() require.NoError(t, err) - require.Equal(t, int64(4), latest) + require.Equal(t, int64(4), latest.BlockNumber) t.Run("abort before replayStart received", func(t *testing.T) { // Replay() should abort immediately if caller's context is cancelled before request signal is read diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 1ee8f4dcb78..471c728cdd6 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -311,8 +311,8 @@ func Test_BackupLogPoller(t *testing.T) { body.Transactions = types.Transactions{} // number of tx's must match # of logs for GetLogs() to succeed rawdb.WriteBody(th.EthDB, h.Hash(), h.Number.Uint64(), body) - currentBlock := th.PollAndSaveLogs(ctx, 1) - assert.Equal(t, int64(35), currentBlock) + currentBlockNumber := th.PollAndSaveLogs(ctx, 1) + assert.Equal(t, int64(35), currentBlockNumber) // simulate logs becoming available rawdb.WriteReceipts(th.EthDB, h.Hash(), h.Number.Uint64(), receipts) @@ -342,12 +342,12 @@ func Test_BackupLogPoller(t *testing.T) { markBlockAsFinalized(t, th, 34) // Run ordinary poller + backup poller at least once - currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) - th.LogPoller.PollAndSaveLogs(ctx, currentBlock+1) + currentBlock, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) + th.LogPoller.PollAndSaveLogs(ctx, currentBlock.BlockNumber+1) th.LogPoller.BackupPollAndSaveLogs(ctx, 100) currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) - require.Equal(t, int64(37), currentBlock+1) + require.Equal(t, int64(37), currentBlock.BlockNumber+1) // logs still shouldn't show up, because we don't want to backfill the last finalized log // to help with reorg detection @@ -359,11 +359,11 @@ func Test_BackupLogPoller(t *testing.T) { markBlockAsFinalized(t, th, 35) // Run ordinary poller + backup poller at least once more - th.LogPoller.PollAndSaveLogs(ctx, currentBlock+1) + th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber+1) th.LogPoller.BackupPollAndSaveLogs(ctx, 100) currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) - require.Equal(t, int64(38), currentBlock+1) + require.Equal(t, int64(38), currentBlock.BlockNumber+1) // all 3 logs in block 34 should show up now, thanks to backup logger logs, err = th.LogPoller.Logs(30, 37, EmitterABI.Events["Log1"].ID, th.EmitterAddress1, @@ -471,6 +471,13 @@ func TestLogPoller_BackupPollAndSaveLogsWithDeepBlockDelay(t *testing.T) { // 1 -> 2 -> ... th.PollAndSaveLogs(ctx, 1) + // Check that latest block has the same properties as the head + latestBlock, err := th.LogPoller.LatestBlock() + require.NoError(t, err) + assert.Equal(t, latestBlock.BlockNumber, header.Number.Int64()) + assert.Equal(t, latestBlock.FinalizedBlockNumber, header.Number.Int64()) + assert.Equal(t, latestBlock.BlockHash, header.Hash()) + // Register filter err = th.LogPoller.RegisterFilter(logpoller.Filter{ Name: "Test Emitter", @@ -619,7 +626,7 @@ func TestLogPoller_BlockTimestamps(t *testing.T) { require.Len(t, gethLogs, 2) lb, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) - th.PollAndSaveLogs(context.Background(), lb+1) + th.PollAndSaveLogs(context.Background(), lb.BlockNumber+1) lg1, err := th.LogPoller.Logs(0, 20, EmitterABI.Events["Log1"].ID, th.EmitterAddress1, pg.WithParentCtx(testutils.Context(t))) require.NoError(t, err) @@ -667,9 +674,9 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { for i := 0; i < finalityDepth; i++ { // Have enough blocks that we could reorg the full finalityDepth-1. ec.Commit() } - currentBlock := int64(1) - lp.PollAndSaveLogs(testutils.Context(t), currentBlock) - currentBlock, err = lp.LatestBlock(pg.WithParentCtx(testutils.Context(t))) + currentBlockNumber := int64(1) + lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber) + currentBlock, err := lp.LatestBlock(pg.WithParentCtx(testutils.Context(t))) require.NoError(t, err) matchesGeth := func() bool { // Check every block is identical @@ -719,7 +726,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { require.NoError(t, err1) t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash()) } - lp.PollAndSaveLogs(testutils.Context(t), currentBlock) + lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber) currentBlock, err = lp.LatestBlock(pg.WithParentCtx(testutils.Context(t))) require.NoError(t, err) } @@ -1245,7 +1252,7 @@ func TestGetReplayFromBlock(t *testing.T) { require.NoError(t, err) latest, err := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t))) require.NoError(t, err) - assert.Equal(t, latest, fromBlock) + assert.Equal(t, latest.BlockNumber, fromBlock) // Should take min(latest, requested) in this case requested. requested = int64(7) @@ -1551,6 +1558,10 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { th := SetupTH(t, tt.useFinalityTag, tt.finalityDepth, 3, 2, 1000) + // Should return error before the first poll and save + _, err := th.LogPoller.LatestBlock() + require.Error(t, err) + // Mark first block as finalized h := th.Client.Blockchain().CurrentHeader() th.Client.Blockchain().SetFinalized(h) @@ -1562,7 +1573,7 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) { th.PollAndSaveLogs(ctx, 1) - latestBlock, err := th.ORM.SelectLatestBlock() + latestBlock, err := th.LogPoller.LatestBlock() require.NoError(t, err) require.Equal(t, int64(numberOfBlocks), latestBlock.BlockNumber) require.Equal(t, tt.expectedFinalizedBlock, latestBlock.FinalizedBlockNumber) diff --git a/core/chains/evm/logpoller/mocks/log_poller.go b/core/chains/evm/logpoller/mocks/log_poller.go index f4357341646..01be5f7ba55 100644 --- a/core/chains/evm/logpoller/mocks/log_poller.go +++ b/core/chains/evm/logpoller/mocks/log_poller.go @@ -330,7 +330,7 @@ func (_m *LogPoller) IndexedLogsWithSigsExcluding(address common.Address, eventS } // LatestBlock provides a mock function with given fields: qopts -func (_m *LogPoller) LatestBlock(qopts ...pg.QOpt) (int64, error) { +func (_m *LogPoller) LatestBlock(qopts ...pg.QOpt) (logpoller.LogPollerBlock, error) { _va := make([]interface{}, len(qopts)) for _i := range qopts { _va[_i] = qopts[_i] @@ -339,15 +339,15 @@ func (_m *LogPoller) LatestBlock(qopts ...pg.QOpt) (int64, error) { _ca = append(_ca, _va...) ret := _m.Called(_ca...) - var r0 int64 + var r0 logpoller.LogPollerBlock var r1 error - if rf, ok := ret.Get(0).(func(...pg.QOpt) (int64, error)); ok { + if rf, ok := ret.Get(0).(func(...pg.QOpt) (logpoller.LogPollerBlock, error)); ok { return rf(qopts...) } - if rf, ok := ret.Get(0).(func(...pg.QOpt) int64); ok { + if rf, ok := ret.Get(0).(func(...pg.QOpt) logpoller.LogPollerBlock); ok { r0 = rf(qopts...) } else { - r0 = ret.Get(0).(int64) + r0 = ret.Get(0).(logpoller.LogPollerBlock) } if rf, ok := ret.Get(1).(func(...pg.QOpt) error); ok { diff --git a/core/services/blockhashstore/coordinators.go b/core/services/blockhashstore/coordinators.go index ff5aff1f5e5..4cb58bab6fd 100644 --- a/core/services/blockhashstore/coordinators.go +++ b/core/services/blockhashstore/coordinators.go @@ -128,7 +128,7 @@ func (v *V1Coordinator) Fulfillments(ctx context.Context, fromBlock uint64) ([]E logs, err := v.lp.LogsWithSigs( int64(fromBlock), - int64(toBlock), + toBlock.BlockNumber, []common.Hash{ v1.VRFCoordinatorRandomnessRequestFulfilled{}.Topic(), }, @@ -219,7 +219,7 @@ func (v *V2Coordinator) Fulfillments(ctx context.Context, fromBlock uint64) ([]E logs, err := v.lp.LogsWithSigs( int64(fromBlock), - int64(toBlock), + toBlock.BlockNumber, []common.Hash{ v2.VRFCoordinatorV2RandomWordsFulfilled{}.Topic(), }, @@ -310,7 +310,7 @@ func (v *V2PlusCoordinator) Fulfillments(ctx context.Context, fromBlock uint64) logs, err := v.lp.LogsWithSigs( int64(fromBlock), - int64(toBlock), + toBlock.BlockNumber, []common.Hash{ v2plus.IVRFCoordinatorV2PlusInternalRandomWordsFulfilled{}.Topic(), }, diff --git a/core/services/blockhashstore/delegate.go b/core/services/blockhashstore/delegate.go index 123052550ba..c8e55e47c3c 100644 --- a/core/services/blockhashstore/delegate.go +++ b/core/services/blockhashstore/delegate.go @@ -173,7 +173,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) { if err != nil { return 0, errors.Wrap(err, "getting chain head") } - return uint64(head), nil + return uint64(head.BlockNumber), nil }) return []job.ServiceCtx{&service{ diff --git a/core/services/blockhashstore/delegate_test.go b/core/services/blockhashstore/delegate_test.go index 089e9544af5..011ab87ad6b 100644 --- a/core/services/blockhashstore/delegate_test.go +++ b/core/services/blockhashstore/delegate_test.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" mocklp "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" @@ -58,7 +59,7 @@ func createTestDelegate(t *testing.T) (*blockhashstore.Delegate, *testData) { sendingKey, _ := cltest.MustInsertRandomKey(t, kst) lp := &mocklp.LogPoller{} lp.On("RegisterFilter", mock.Anything).Return(nil) - lp.On("LatestBlock", mock.Anything, mock.Anything).Return(int64(0), nil) + lp.On("LatestBlock", mock.Anything, mock.Anything).Return(logpoller.LogPollerBlock{}, nil) relayExtenders := evmtest.NewChainRelayExtenders( t, diff --git a/core/services/blockhashstore/feeder_test.go b/core/services/blockhashstore/feeder_test.go index 3145a9fd76d..8d9ed48c4bf 100644 --- a/core/services/blockhashstore/feeder_test.go +++ b/core/services/blockhashstore/feeder_test.go @@ -445,7 +445,7 @@ func (test testCase) testFeederWithLogPollerVRFv1(t *testing.T) { // Mock log poller. lp.On("LatestBlock", mock.Anything). - Return(latest, nil) + Return(logpoller.LogPollerBlock{BlockNumber: latest}, nil) lp.On( "LogsWithSigs", fromBlock, @@ -543,7 +543,7 @@ func (test testCase) testFeederWithLogPollerVRFv2(t *testing.T) { // Mock log poller. lp.On("LatestBlock", mock.Anything). - Return(latest, nil) + Return(logpoller.LogPollerBlock{BlockNumber: latest}, nil) lp.On( "LogsWithSigs", fromBlock, @@ -641,7 +641,7 @@ func (test testCase) testFeederWithLogPollerVRFv2Plus(t *testing.T) { // Mock log poller. lp.On("LatestBlock", mock.Anything). - Return(latest, nil) + Return(logpoller.LogPollerBlock{BlockNumber: latest}, nil) lp.On( "LogsWithSigs", fromBlock, diff --git a/core/services/ocr2/plugins/ocr2keeper/evm20/log_provider.go b/core/services/ocr2/plugins/ocr2keeper/evm20/log_provider.go index 856e508fc57..4044bb5f2a4 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm20/log_provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm20/log_provider.go @@ -151,8 +151,8 @@ func (c *LogProvider) PerformLogs(ctx context.Context) ([]ocr2keepers.PerformLog // always check the last lookback number of blocks and rebroadcast // this allows the plugin to make decisions based on event confirmations logs, err := c.logPoller.LogsWithSigs( - end-c.lookbackBlocks, - end, + end.BlockNumber-c.lookbackBlocks, + end.BlockNumber, []common.Hash{ registry.KeeperRegistryUpkeepPerformed{}.Topic(), }, @@ -175,7 +175,7 @@ func (c *LogProvider) PerformLogs(ctx context.Context) ([]ocr2keepers.PerformLog Key: UpkeepKeyHelper[uint32]{}.MakeUpkeepKey(p.CheckBlockNumber, p.Id), TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(p.BlockNumber), TransactionHash: p.TxHash.Hex(), - Confirmations: end - p.BlockNumber, + Confirmations: end.BlockNumber - p.BlockNumber, } vals = append(vals, l) } @@ -194,8 +194,8 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR // ReorgedUpkeepReportLogs logs, err := c.logPoller.LogsWithSigs( - end-c.lookbackBlocks, - end, + end.BlockNumber-c.lookbackBlocks, + end.BlockNumber, []common.Hash{ registry.KeeperRegistryReorgedUpkeepReport{}.Topic(), }, @@ -212,8 +212,8 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR // StaleUpkeepReportLogs logs, err = c.logPoller.LogsWithSigs( - end-c.lookbackBlocks, - end, + end.BlockNumber-c.lookbackBlocks, + end.BlockNumber, []common.Hash{ registry.KeeperRegistryStaleUpkeepReport{}.Topic(), }, @@ -230,8 +230,8 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR // InsufficientFundsUpkeepReportLogs logs, err = c.logPoller.LogsWithSigs( - end-c.lookbackBlocks, - end, + end.BlockNumber-c.lookbackBlocks, + end.BlockNumber, []common.Hash{ registry.KeeperRegistryInsufficientFundsUpkeepReport{}.Topic(), }, @@ -258,7 +258,7 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR Key: encoding.BasicEncoder{}.MakeUpkeepKey(checkBlockNumber, upkeepId), TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(r.BlockNumber), TransactionHash: r.TxHash.Hex(), - Confirmations: end - r.BlockNumber, + Confirmations: end.BlockNumber - r.BlockNumber, } vals = append(vals, l) } @@ -273,7 +273,7 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR Key: encoding.BasicEncoder{}.MakeUpkeepKey(checkBlockNumber, upkeepId), TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(r.BlockNumber), TransactionHash: r.TxHash.Hex(), - Confirmations: end - r.BlockNumber, + Confirmations: end.BlockNumber - r.BlockNumber, } vals = append(vals, l) } @@ -288,7 +288,7 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR Key: encoding.BasicEncoder{}.MakeUpkeepKey(checkBlockNumber, upkeepId), TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(r.BlockNumber), TransactionHash: r.TxHash.Hex(), - Confirmations: end - r.BlockNumber, + Confirmations: end.BlockNumber - r.BlockNumber, } vals = append(vals, l) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm20/registry.go b/core/services/ocr2/plugins/ocr2keeper/evm20/registry.go index 49cab7b5a48..2d49a91e98f 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm20/registry.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm20/registry.go @@ -347,7 +347,7 @@ func (r *EvmRegistry) initialize() error { func (r *EvmRegistry) pollLogs() error { var latest int64 - var end int64 + var end logpoller.LogPollerBlock var err error if end, err = r.poller.LatestBlock(pg.WithParentCtx(r.ctx)); err != nil { @@ -356,11 +356,11 @@ func (r *EvmRegistry) pollLogs() error { r.mu.Lock() latest = r.lastPollBlock - r.lastPollBlock = end + r.lastPollBlock = end.BlockNumber r.mu.Unlock() // if start and end are the same, no polling needs to be done - if latest == 0 || latest == end { + if latest == 0 || latest == end.BlockNumber { return nil } @@ -368,8 +368,8 @@ func (r *EvmRegistry) pollLogs() error { var logs []logpoller.Log if logs, err = r.poller.LogsWithSigs( - end-logEventLookback, - end, + end.BlockNumber-logEventLookback, + end.BlockNumber, upkeepStateEvents, r.addr, pg.WithParentCtx(r.ctx), diff --git a/core/services/ocr2/plugins/ocr2keeper/evm20/registry_test.go b/core/services/ocr2/plugins/ocr2keeper/evm20/registry_test.go index 348b5a47c0f..8662bfd0475 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm20/registry_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm20/registry_test.go @@ -189,7 +189,7 @@ func TestPollLogs(t *testing.T) { if test.LatestBlock != nil { mp.On("LatestBlock", mock.Anything). - Return(test.LatestBlock.OutputBlock, test.LatestBlock.OutputErr) + Return(logpoller.LogPollerBlock{BlockNumber: test.LatestBlock.OutputBlock}, test.LatestBlock.OutputErr) } if test.LogsWithSigs != nil { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go index 2d524e6f6cd..d97156ed180 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go @@ -79,12 +79,13 @@ func (bs *BlockSubscriber) getBlockRange(ctx context.Context) ([]uint64, error) if err != nil { return nil, err } - bs.lggr.Infof("latest block from log poller is %d", h) + latestBlockNumber := h.BlockNumber + bs.lggr.Infof("latest block from log poller is %d", latestBlockNumber) var blocks []uint64 for i := bs.blockSize - 1; i >= 0; i-- { - if h-i > 0 { - blocks = append(blocks, uint64(h-i)) + if latestBlockNumber-i > 0 { + blocks = append(blocks, uint64(latestBlockNumber-i)) } } return blocks, nil diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go index 618ea83d4e9..004b5fac6cc 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go @@ -97,7 +97,7 @@ func TestBlockSubscriber_GetBlockRange(t *testing.T) { for _, tc := range tests { t.Run(tc.Name, func(t *testing.T) { lp := new(mocks.LogPoller) - lp.On("LatestBlock", mock.Anything).Return(tc.LatestBlock, tc.LatestBlockErr) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: tc.LatestBlock}, tc.LatestBlockErr) bs := NewBlockSubscriber(hb, lp, finality, lggr) bs.blockHistorySize = historySize bs.blockSize = blockSize @@ -278,7 +278,7 @@ func TestBlockSubscriber_Start(t *testing.T) { hb := commonmocks.NewHeadBroadcaster[*evmtypes.Head, common.Hash](t) hb.On("Subscribe", mock.Anything).Return(&evmtypes.Head{Number: 42}, func() {}) lp := new(mocks.LogPoller) - lp.On("LatestBlock", mock.Anything).Return(int64(100), nil) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: 100}, nil) blocks := []uint64{97, 98, 99, 100} pollerBlocks := []logpoller.LogPollerBlock{ { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time.go index 9fc35dd84be..814ed29d900 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time.go @@ -34,10 +34,11 @@ func (r *blockTimeResolver) BlockTime(ctx context.Context, blockSampleSize int64 if err != nil { return 0, fmt.Errorf("failed to get latest block from poller: %w", err) } - if latest <= blockSampleSize { + latestBlockNumber := latest.BlockNumber + if latestBlockNumber <= blockSampleSize { return defaultBlockTime, nil } - start, end := latest-blockSampleSize, latest + start, end := latestBlockNumber-blockSampleSize, latestBlockNumber startTime, endTime, err := r.getSampleTimestamps(ctx, uint64(start), uint64(end)) if err != nil { return 0, err diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time_test.go index 0ad9990e185..7009cfaa9b2 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time_test.go @@ -69,7 +69,7 @@ func TestBlockTimeResolver_BlockTime(t *testing.T) { lp := new(lpmocks.LogPoller) resolver := newBlockTimeResolver(lp) - lp.On("LatestBlock", mock.Anything).Return(tc.latestBlock, tc.latestBlockErr) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: tc.latestBlock}, tc.latestBlockErr) lp.On("GetBlocksRange", mock.Anything, mock.Anything).Return(tc.blocksRange, tc.blocksRangeErr) blockTime, err := resolver.BlockTime(ctx, tc.blockSampleSize) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go index 811468746e3..dad35420398 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go @@ -317,7 +317,7 @@ func TestIntegration_LogEventProvider_RateLimit(t *testing.T) { var minimumBlockCount int64 = 500 latestBlock, _ := lp.LatestBlock() - assert.GreaterOrEqual(t, latestBlock, minimumBlockCount, "to ensure the integrety of the test, the minimum block count before the test should be %d but got %d", minimumBlockCount, latestBlock) + assert.GreaterOrEqual(t, latestBlock.BlockNumber, minimumBlockCount, "to ensure the integrety of the test, the minimum block count before the test should be %d but got %d", minimumBlockCount, latestBlock) } require.NoError(t, logProvider.ReadLogs(ctx, ids...)) @@ -564,7 +564,7 @@ func waitLogPoller(ctx context.Context, t *testing.T, backend *backends.Simulate for { latestPolled, lberr := lp.LatestBlock(pg.WithParentCtx(ctx)) require.NoError(t, lberr) - if latestPolled >= latestBlock { + if latestPolled.BlockNumber >= latestBlock { break } lp.PollAndSaveLogs(ctx, latestBlock) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go index 729bf4ade5f..349db2902b6 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go @@ -161,11 +161,11 @@ func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers if err != nil { return nil, fmt.Errorf("%w: %s", ErrHeadNotAvailable, err) } - start := latest - p.opts.LookbackBlocks + start := latest.BlockNumber - p.opts.LookbackBlocks if start <= 0 { start = 1 } - logs := p.buffer.dequeueRange(start, latest, AllowedLogsPerUpkeep, MaxPayloads) + logs := p.buffer.dequeueRange(start, latest.BlockNumber, AllowedLogsPerUpkeep, MaxPayloads) // p.lggr.Debugw("got latest logs from buffer", "latest", latest, "diff", diff, "logs", len(logs)) @@ -199,12 +199,12 @@ func (p *logEventProvider) ReadLogs(pctx context.Context, ids ...*big.Int) error if err != nil { return fmt.Errorf("%w: %s", ErrHeadNotAvailable, err) } - if latest == 0 { + if latest.BlockNumber == 0 { return fmt.Errorf("%w: %s", ErrHeadNotAvailable, "latest block is 0") } - filters := p.getFilters(latest, ids...) + filters := p.getFilters(latest.BlockNumber, ids...) - err = p.readLogs(ctx, latest, filters) + err = p.readLogs(ctx, latest.BlockNumber, filters) p.updateFiltersLastPoll(filters) // p.lggr.Debugw("read logs for entries", "latestBlock", latest, "entries", len(entries), "err", err) if err != nil { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go index ab816adb1b3..69a4872351d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go @@ -128,7 +128,7 @@ func (p *logEventProvider) register(ctx context.Context, lpFilter logpoller.Filt // already registered in DB before, no need to backfill return nil } - backfillBlock := latest - int64(LogBackfillBuffer) + backfillBlock := latest.BlockNumber - int64(LogBackfillBuffer) if backfillBlock < 1 { // New chain, backfill from start backfillBlock = 1 diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go index 4b1ff06f316..03395cb5b5f 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/core" @@ -109,7 +110,7 @@ func TestLogEventProvider_LifeCycle(t *testing.T) { lp := new(mocks.LogPoller) lp.On("RegisterFilter", mock.Anything).Return(nil) lp.On("UnregisterFilter", mock.Anything).Return(nil) - lp.On("LatestBlock", mock.Anything).Return(int64(0), nil) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{}, nil) hasFitlerTimes := 1 if tc.unregister { hasFitlerTimes = 2 @@ -149,7 +150,7 @@ func TestEventLogProvider_RefreshActiveUpkeeps(t *testing.T) { mp.On("RegisterFilter", mock.Anything).Return(nil) mp.On("UnregisterFilter", mock.Anything).Return(nil) mp.On("HasFilter", mock.Anything).Return(false) - mp.On("LatestBlock", mock.Anything).Return(int64(0), nil) + mp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{}, nil) mp.On("ReplayAsync", mock.Anything).Return(nil) p := NewLogProvider(logger.TestLogger(t), mp, &mockedPacker{}, NewUpkeepFilterStore(), NewOptions(200)) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_test.go index db22886cbb7..a8e33ba23b7 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_test.go @@ -248,7 +248,7 @@ func TestLogEventProvider_ReadLogs(t *testing.T) { mp.On("ReplayAsync", mock.Anything).Return() mp.On("HasFilter", mock.Anything).Return(false) mp.On("UnregisterFilter", mock.Anything, mock.Anything).Return(nil) - mp.On("LatestBlock", mock.Anything).Return(int64(1), nil) + mp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: int64(1)}, nil) mp.On("LogsWithSigs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{ { BlockNumber: 1, diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index b74160ae91c..d6e7ad51d13 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -208,7 +208,7 @@ func (r *logRecoverer) getLogTriggerCheckData(ctx context.Context, proposal ocr2 return nil, err } - start, offsetBlock := r.getRecoveryWindow(latest) + start, offsetBlock := r.getRecoveryWindow(latest.BlockNumber) if proposal.Trigger.LogTriggerExtension == nil { return nil, errors.New("missing log trigger extension") } @@ -297,7 +297,7 @@ func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers. allLogsCounter := 0 logsCount := map[string]int{} - r.sortPending(uint64(latestBlock)) + r.sortPending(uint64(latestBlock.BlockNumber)) var results, pending []ocr2keepers.UpkeepPayload for _, payload := range r.pending { @@ -330,7 +330,7 @@ func (r *logRecoverer) recover(ctx context.Context) error { return fmt.Errorf("%w: %s", ErrHeadNotAvailable, err) } - start, offsetBlock := r.getRecoveryWindow(latest) + start, offsetBlock := r.getRecoveryWindow(latest.BlockNumber) if offsetBlock < 0 { // too soon to recover, we don't have enough blocks return nil @@ -611,7 +611,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { return fmt.Errorf("failed to get states: %w", err) } lggr := r.lggr.With("where", "clean") - start, _ := r.getRecoveryWindow(latestBlock) + start, _ := r.getRecoveryWindow(latestBlock.BlockNumber) r.lock.Lock() defer r.lock.Unlock() var removed int diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go index 2fdf04f76c7..c882a22bc1a 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go @@ -32,7 +32,7 @@ func TestLogRecoverer_GetRecoverables(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() lp := &lpmocks.LogPoller{} - lp.On("LatestBlock", mock.Anything).Return(int64(100), nil) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: 100}, nil) r := NewLogRecoverer(logger.TestLogger(t), lp, nil, nil, nil, nil, NewOptions(200)) tests := []struct { @@ -182,7 +182,7 @@ func TestLogRecoverer_Clean(t *testing.T) { start, _ := r.getRecoveryWindow(0) block24h := int64(math.Abs(float64(start))) - lp.On("LatestBlock", mock.Anything).Return(block24h+oldLogsOffset, nil) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: block24h + oldLogsOffset}, nil) statesReader.On("SelectByWorkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.states, nil) r.lock.Lock() @@ -423,7 +423,7 @@ func TestLogRecoverer_Recover(t *testing.T) { recoverer, filterStore, lp, statesReader := setupTestRecoverer(t, time.Millisecond*50, lookbackBlocks) filterStore.AddActiveUpkeeps(tc.active...) - lp.On("LatestBlock", mock.Anything).Return(tc.latestBlock, tc.latestBlockErr) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: tc.latestBlock}, tc.latestBlockErr) lp.On("LogsWithSigs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.logs, tc.logsErr) statesReader.On("SelectByWorkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.states, tc.statesErr) @@ -1206,8 +1206,9 @@ type mockLogPoller struct { func (p *mockLogPoller) LogsWithSigs(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]logpoller.Log, error) { return p.LogsWithSigsFn(start, end, eventSigs, address, qopts...) } -func (p *mockLogPoller) LatestBlock(qopts ...pg.QOpt) (int64, error) { - return p.LatestBlockFn(qopts...) +func (p *mockLogPoller) LatestBlock(qopts ...pg.QOpt) (logpoller.LogPollerBlock, error) { + block, err := p.LatestBlockFn(qopts...) + return logpoller.LogPollerBlock{BlockNumber: block}, err } type mockClient struct { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go index 5d180c05b80..0ca20477f20 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go @@ -362,7 +362,7 @@ func (r *EvmRegistry) refreshLogTriggerUpkeepsBatch(logTriggerIDs []*big.Int) er func (r *EvmRegistry) pollUpkeepStateLogs() error { var latest int64 - var end int64 + var end logpoller.LogPollerBlock var err error if end, err = r.poller.LatestBlock(pg.WithParentCtx(r.ctx)); err != nil { @@ -371,18 +371,18 @@ func (r *EvmRegistry) pollUpkeepStateLogs() error { r.mu.Lock() latest = r.lastPollBlock - r.lastPollBlock = end + r.lastPollBlock = end.BlockNumber r.mu.Unlock() // if start and end are the same, no polling needs to be done - if latest == 0 || latest == end { + if latest == 0 || latest == end.BlockNumber { return nil } var logs []logpoller.Log if logs, err = r.poller.LogsWithSigs( - end-logEventLookback, - end, + end.BlockNumber-logEventLookback, + end.BlockNumber, upkeepStateEvents, r.addr, pg.WithParentCtx(r.ctx), diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_test.go index 0cd5ecd2592..4be0ccce4e9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_test.go @@ -145,7 +145,7 @@ func TestPollLogs(t *testing.T) { if test.LatestBlock != nil { mp.On("LatestBlock", mock.Anything). - Return(test.LatestBlock.OutputBlock, test.LatestBlock.OutputErr) + Return(logpoller.LogPollerBlock{BlockNumber: test.LatestBlock.OutputBlock}, test.LatestBlock.OutputErr) } if test.LogsWithSigs != nil { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go index 5fd320df8ed..8f84ca1495c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go @@ -141,8 +141,8 @@ func (c *EventProvider) GetLatestEvents(ctx context.Context) ([]ocr2keepers.Tran // always check the last lookback number of blocks and rebroadcast // this allows the plugin to make decisions based on event confirmations logs, err := c.logPoller.LogsWithSigs( - end-c.lookbackBlocks, - end, + end.BlockNumber-c.lookbackBlocks, + end.BlockNumber, []common.Hash{ iregistry21.IKeeperRegistryMasterUpkeepPerformed{}.Topic(), iregistry21.IKeeperRegistryMasterStaleUpkeepReport{}.Topic(), @@ -156,7 +156,7 @@ func (c *EventProvider) GetLatestEvents(ctx context.Context) ([]ocr2keepers.Tran return nil, fmt.Errorf("%w: failed to collect logs from log poller", err) } - return c.processLogs(end, logs...) + return c.processLogs(end.BlockNumber, logs...) } // processLogs will parse the unseen logs and return the corresponding transmit events. diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider_test.go index 72f3b63088d..58e95bc423e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider_test.go @@ -89,7 +89,7 @@ func TestTransmitEventProvider_Sanity(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - lp.On("LatestBlock", mock.Anything).Return(tc.latestBlock, nil) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: tc.latestBlock}, nil) lp.On("LogsWithSigs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.logs, nil) res, err := provider.GetLatestEvents(ctx) diff --git a/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator.go b/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator.go index e51b68f415d..1b58a017322 100644 --- a/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator.go +++ b/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator.go @@ -227,7 +227,7 @@ func (c *coordinator) CurrentChainHeight(ctx context.Context) (uint64, error) { if err != nil { return 0, err } - return uint64(head), nil + return uint64(head.BlockNumber), nil } // ReportIsOnchain returns true iff a report for the given OCR epoch/round is diff --git a/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator_test.go b/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator_test.go index 26d0f2996a9..dc489b4958a 100644 --- a/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator_test.go +++ b/core/services/ocr2/plugins/ocr2vrf/coordinator/coordinator_test.go @@ -1032,7 +1032,7 @@ func TestCoordinator_ReportBlocks(t *testing.T) { requestedBlocks := []uint64{195, 196} lp := lp_mocks.NewLogPoller(t) lp.On("LatestBlock", mock.Anything). - Return(int64(latestHeadNumber), nil) + Return(logpoller.LogPollerBlock{BlockNumber: int64(latestHeadNumber)}, nil) lp.On("GetBlocksRange", mock.Anything, append(requestedBlocks, uint64(latestHeadNumber-lookbackBlocks+1), uint64(latestHeadNumber)), mock.Anything). Return(nil, errors.New("GetBlocks error")) @@ -1720,7 +1720,7 @@ func getLogPoller( lp := lp_mocks.NewLogPoller(t) if needsLatestBlock { lp.On("LatestBlock", mock.Anything). - Return(int64(latestHeadNumber), nil) + Return(logpoller.LogPollerBlock{BlockNumber: int64(latestHeadNumber)}, nil) } var logPollerBlocks []logpoller.LogPollerBlock diff --git a/core/services/relay/evm/config_poller.go b/core/services/relay/evm/config_poller.go index 1cf2318b296..daccf400ea7 100644 --- a/core/services/relay/evm/config_poller.go +++ b/core/services/relay/evm/config_poller.go @@ -212,7 +212,7 @@ func (cp *configPoller) LatestBlockHeight(ctx context.Context) (blockHeight uint } return 0, err } - return uint64(latest), nil + return uint64(latest.BlockNumber), nil } func (cp *configPoller) isConfigStoreAvailable() bool { diff --git a/core/services/relay/evm/functions/config_poller.go b/core/services/relay/evm/functions/config_poller.go index f068f13cc77..7a59d499898 100644 --- a/core/services/relay/evm/functions/config_poller.go +++ b/core/services/relay/evm/functions/config_poller.go @@ -181,7 +181,7 @@ func (cp *configPoller) LatestBlockHeight(ctx context.Context) (blockHeight uint } return 0, err } - return uint64(latest), nil + return uint64(latest.BlockNumber), nil } // called from LogPollerWrapper in a separate goroutine diff --git a/core/services/relay/evm/functions/logpoller_wrapper.go b/core/services/relay/evm/functions/logpoller_wrapper.go index 777717d01c7..d355bd6569b 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper.go +++ b/core/services/relay/evm/functions/logpoller_wrapper.go @@ -77,7 +77,7 @@ func (l *logPollerWrapper) Start(context.Context) error { l.lggr.Errorw("LogPollerWrapper: LatestBlock() failed, starting from 0", "error", err) } else { l.lggr.Debugw("LogPollerWrapper: LatestBlock() got starting block", "block", nextBlock) - l.nextBlock = nextBlock - l.blockOffset + l.nextBlock = nextBlock.BlockNumber - l.blockOffset } l.closeWait.Add(1) go l.checkForRouteUpdates() @@ -123,9 +123,10 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR l.mu.Unlock() return nil, nil, err } - latest -= l.blockOffset - if latest >= nextBlock { - l.nextBlock = latest + 1 + latestBlockNumber := latest.BlockNumber + latestBlockNumber -= l.blockOffset + if latestBlockNumber >= nextBlock { + l.nextBlock = latestBlockNumber + 1 } l.mu.Unlock() @@ -136,18 +137,18 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR l.lggr.Debug("LatestEvents: no non-zero coordinators to check") return resultsReq, resultsResp, errors.New("no non-zero coordinators to check") } - if latest < nextBlock { + if latestBlockNumber < nextBlock { l.lggr.Debugw("LatestEvents: no new blocks to check", "latest", latest, "nextBlock", nextBlock) return resultsReq, resultsResp, nil } for _, coordinator := range coordinators { - requestLogs, err := l.logPoller.Logs(nextBlock, latest, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), coordinator) + requestLogs, err := l.logPoller.Logs(nextBlock, latestBlockNumber, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), coordinator) if err != nil { l.lggr.Errorw("LatestEvents: fetching request logs from LogPoller failed", "latest", latest, "nextBlock", nextBlock) return nil, nil, err } - responseLogs, err := l.logPoller.Logs(nextBlock, latest, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), coordinator) + responseLogs, err := l.logPoller.Logs(nextBlock, latestBlockNumber, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), coordinator) if err != nil { l.lggr.Errorw("LatestEvents: fetching response logs from LogPoller failed", "latest", latest, "nextBlock", nextBlock) return nil, nil, err diff --git a/core/services/relay/evm/functions/logpoller_wrapper_test.go b/core/services/relay/evm/functions/logpoller_wrapper_test.go index 224aa51a5da..c91c3c49aad 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper_test.go +++ b/core/services/relay/evm/functions/logpoller_wrapper_test.go @@ -60,7 +60,7 @@ func setUp(t *testing.T, updateFrequencySec uint32) (*lpmocks.LogPoller, types.L lpWrapper, err := functions.NewLogPollerWrapper(gethcommon.Address{}, config, client, lp, lggr) require.NoError(t, err) - lp.On("LatestBlock").Return(int64(100), nil) + lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) return lp, lpWrapper, client } diff --git a/core/services/relay/evm/mercury/config_poller.go b/core/services/relay/evm/mercury/config_poller.go index 2f16157bfac..8964a283049 100644 --- a/core/services/relay/evm/mercury/config_poller.go +++ b/core/services/relay/evm/mercury/config_poller.go @@ -188,7 +188,7 @@ func (cp *ConfigPoller) LatestBlockHeight(ctx context.Context) (blockHeight uint } return 0, err } - return uint64(latest), nil + return uint64(latest.BlockNumber), nil } func (cp *ConfigPoller) startLogSubscription() {