diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go index 87b46c9785b..9766d988767 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go @@ -50,12 +50,13 @@ type BlockSubscriber struct { latestBlock atomic.Pointer[ocr2keepers.BlockKey] blockHistorySize int64 blockSize int64 + finalityDepth uint32 lggr logger.Logger } var _ ocr2keepers.BlockSubscriber = &BlockSubscriber{} -func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr logger.Logger) *BlockSubscriber { +func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, finalityDepth uint32, lggr logger.Logger) *BlockSubscriber { return &BlockSubscriber{ threadCtrl: utils.NewThreadControl(), hb: hb, @@ -65,6 +66,7 @@ func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr blocks: map[int64]string{}, blockHistorySize: blockHistorySize, blockSize: lookbackDepth, + finalityDepth: finalityDepth, latestBlock: atomic.Pointer[ocr2keepers.BlockKey]{}, lggr: lggr.Named("BlockSubscriber"), } @@ -226,10 +228,13 @@ func (bs *BlockSubscriber) processHead(h *evmtypes.Head) { // head parent is a linked list with EVM finality depth // when re-org happens, new heads will have pointers to the new blocks i := int64(0) - for cp := h; ; cp = cp.Parent { - if cp == nil || bs.blocks[cp.Number] == cp.Hash.Hex() { - break - } + for cp := h; cp != nil; cp = cp.Parent { + // we don't stop when a matching (block number/hash) entry is seen in the map because parent linked list may be + // cut short during a re-org if head broadcaster backfill is not complete. This can cause some re-orged blocks + // left in the map. for example, re-org happens for block 98, 99, 100. next head 101 from broadcaster has parent list + // of 100, so block 100 and 101 are updated. when next head 102 arrives, it has full parent history of finality depth. + // if we stop when we see a block number/hash match, we won't look back and correct block 98 and 99. + // hence, we make a compromise here and check previous max(finality depth, blockSize) blocks and update the map. existingHash, ok := bs.blocks[cp.Number] if !ok { bs.lggr.Debugf("filling block %d with new hash %s", cp.Number, cp.Hash.Hex()) @@ -238,7 +243,7 @@ func (bs *BlockSubscriber) processHead(h *evmtypes.Head) { } bs.blocks[cp.Number] = cp.Hash.Hex() i++ - if i > bs.blockSize { + if i > int64(bs.finalityDepth) || i > bs.blockSize { break } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go index 23fcf3f6695..618ea83d4e9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber_test.go @@ -22,13 +22,14 @@ import ( const historySize = 4 const blockSize = int64(4) +const finality = uint32(4) func TestBlockSubscriber_Subscribe(t *testing.T) { lggr := logger.TestLogger(t) var hb types.HeadBroadcaster var lp logpoller.LogPoller - bs := NewBlockSubscriber(hb, lp, lggr) + bs := NewBlockSubscriber(hb, lp, finality, lggr) bs.blockHistorySize = historySize bs.blockSize = blockSize subId, _, err := bs.Subscribe() @@ -47,7 +48,7 @@ func TestBlockSubscriber_Unsubscribe(t *testing.T) { var hb types.HeadBroadcaster var lp logpoller.LogPoller - bs := NewBlockSubscriber(hb, lp, lggr) + bs := NewBlockSubscriber(hb, lp, finality, lggr) bs.blockHistorySize = historySize bs.blockSize = blockSize subId, _, err := bs.Subscribe() @@ -65,7 +66,7 @@ func TestBlockSubscriber_Unsubscribe_Failure(t *testing.T) { var hb types.HeadBroadcaster var lp logpoller.LogPoller - bs := NewBlockSubscriber(hb, lp, lggr) + bs := NewBlockSubscriber(hb, lp, finality, lggr) bs.blockHistorySize = historySize bs.blockSize = blockSize err := bs.Unsubscribe(2) @@ -97,7 +98,7 @@ func TestBlockSubscriber_GetBlockRange(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { lp := new(mocks.LogPoller) lp.On("LatestBlock", mock.Anything).Return(tc.LatestBlock, tc.LatestBlockErr) - bs := NewBlockSubscriber(hb, lp, lggr) + bs := NewBlockSubscriber(hb, lp, finality, lggr) bs.blockHistorySize = historySize bs.blockSize = blockSize blocks, err := bs.getBlockRange(testutils.Context(t)) @@ -155,7 +156,7 @@ func TestBlockSubscriber_InitializeBlocks(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { lp := new(mocks.LogPoller) lp.On("GetBlocksRange", mock.Anything, tc.Blocks, mock.Anything).Return(tc.PollerBlocks, tc.Error) - bs := NewBlockSubscriber(hb, lp, lggr) + bs := NewBlockSubscriber(hb, lp, finality, lggr) bs.blockHistorySize = historySize bs.blockSize = blockSize err := bs.initializeBlocks(testutils.Context(t), tc.Blocks) @@ -213,7 +214,7 @@ func TestBlockSubscriber_BuildHistory(t *testing.T) { for _, tc := range tests { t.Run(tc.Name, func(t *testing.T) { - bs := NewBlockSubscriber(hb, lp, lggr) + bs := NewBlockSubscriber(hb, lp, finality, lggr) bs.blockHistorySize = historySize bs.blockSize = blockSize bs.blocks = tc.Blocks @@ -258,7 +259,7 @@ func TestBlockSubscriber_Cleanup(t *testing.T) { for _, tc := range tests { t.Run(tc.Name, func(t *testing.T) { - bs := NewBlockSubscriber(hb, lp, lggr) + bs := NewBlockSubscriber(hb, lp, finality, lggr) bs.blockHistorySize = historySize bs.blockSize = blockSize bs.blocks = tc.Blocks @@ -300,7 +301,7 @@ func TestBlockSubscriber_Start(t *testing.T) { lp.On("GetBlocksRange", mock.Anything, blocks, mock.Anything).Return(pollerBlocks, nil) - bs := NewBlockSubscriber(hb, lp, lggr) + bs := NewBlockSubscriber(hb, lp, finality, lggr) bs.blockHistorySize = historySize bs.blockSize = blockSize err := bs.Start(context.Background()) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go index b9b04fabe43..d89ae8589ac 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go @@ -99,20 +99,21 @@ func (r *EvmRegistry) getBlockHash(blockNumber *big.Int) (common.Hash, error) { // verifyCheckBlock checks that the check block and hash are valid, returns the pipeline execution state and retryable func (r *EvmRegistry) verifyCheckBlock(_ context.Context, checkBlock, upkeepId *big.Int, checkHash common.Hash) (state encoding.PipelineExecutionState, retryable bool) { - var h string - var ok bool // verify check block number and hash are valid - h, ok = r.bs.queryBlocksMap(checkBlock.Int64()) - if !ok { - r.lggr.Warnf("check block %s does not exist in block subscriber for upkeepId %s, querying eth client", checkBlock, upkeepId) - b, err := r.getBlockHash(checkBlock) - if err != nil { - r.lggr.Warnf("failed to query block %s: %s", checkBlock, err.Error()) - return encoding.RpcFlakyFailure, true - } - h = b.Hex() + h, ok := r.bs.queryBlocksMap(checkBlock.Int64()) + // if this block number/hash combo exists in block subscriber, this check block and hash still exist on chain and are valid + // the block hash in block subscriber might be slightly outdated, if it doesn't match then we fetch the latest from RPC. + if ok && h == checkHash.Hex() { + r.lggr.Debugf("check block hash %s exists on chain at block number %d for upkeepId %s", checkHash.Hex(), checkBlock, upkeepId) + return encoding.NoPipelineError, false + } + r.lggr.Warnf("check block %s does not exist in block subscriber or hash does not match for upkeepId %s. this may be caused by block subscriber outdated due to re-org, querying eth client to confirm", checkBlock, upkeepId) + b, err := r.getBlockHash(checkBlock) + if err != nil { + r.lggr.Warnf("failed to query block %s: %s", checkBlock, err.Error()) + return encoding.RpcFlakyFailure, true } - if checkHash.Hex() != h { + if checkHash.Hex() != b.Hex() { r.lggr.Warnf("check block %s hash do not match. %s from block subscriber vs %s from trigger for upkeepId %s", checkBlock, h, checkHash.Hex(), upkeepId) return encoding.CheckBlockInvalid, false } @@ -126,11 +127,15 @@ func (r *EvmRegistry) verifyLogExists(upkeepId *big.Int, p ocr2keepers.UpkeepPay // if log block number is populated, check log block number and block hash if logBlockNumber != 0 { h, ok := r.bs.queryBlocksMap(logBlockNumber) + // if this block number/hash combo exists in block subscriber, this block and tx still exists on chain and is valid + // the block hash in block subscriber might be slightly outdated, if it doesn't match then we fetch the latest from RPC. if ok && h == logBlockHash.Hex() { r.lggr.Debugf("tx hash %s exists on chain at block number %d, block hash %s for upkeepId %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), logBlockHash.Hex(), logBlockNumber, upkeepId) return encoding.UpkeepFailureReasonNone, encoding.NoPipelineError, false } - r.lggr.Debugf("log block %d does not exist in block subscriber for upkeepId %s, querying eth client", logBlockNumber, upkeepId) + // if this block does not exist in the block subscriber, the block which this log lived on was probably re-orged + // hence, check eth client for this log's tx hash to confirm + r.lggr.Debugf("log block %d does not exist in block subscriber or block hash does not match for upkeepId %s. this may be caused by block subscriber outdated due to re-org, querying eth client to confirm", logBlockNumber, upkeepId) } else { r.lggr.Debugf("log block not provided, querying eth client for tx hash %s for upkeepId %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline_test.go index 152c912f70f..cdb5e50b5b1 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline_test.go @@ -144,6 +144,15 @@ func TestRegistry_VerifyCheckBlock(t *testing.T) { Trigger: ocr2keepers.NewTrigger(500, common.HexToHash("0x5bff03de234fe771ac0d685f9ee0fb0b757ea02ec9e6f10e8e2ee806db1b6b83")), WorkID: "work", }, + poller: &mockLogPoller{ + GetBlocksRangeFn: func(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]logpoller.LogPollerBlock, error) { + return []logpoller.LogPollerBlock{ + { + BlockHash: common.HexToHash("0xcba5cf9e2bb32373c76015384e1098912d9510a72481c78057fcb088209167de"), + }, + }, nil + }, + }, blocks: map[int64]string{ 500: "0xa518faeadcc423338c62572da84dda35fe44b34f521ce88f6081b703b250cca4", }, @@ -388,6 +397,7 @@ func TestRegistry_CheckUpkeeps(t *testing.T) { err error ethCalls map[string]bool receipts map[string]*types.Receipt + poller logpoller.LogPoller ethCallErrors map[string]error }{ { @@ -463,6 +473,15 @@ func TestRegistry_CheckUpkeeps(t *testing.T) { uid1.String(): true, }, receipts: map[string]*types.Receipt{}, + poller: &mockLogPoller{ + GetBlocksRangeFn: func(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]logpoller.LogPollerBlock, error) { + return []logpoller.LogPollerBlock{ + { + BlockHash: common.HexToHash("0xcba5cf9e2bb32373c76015384e1098912d9510a72481c78057fcb088209167de"), + }, + }, nil + }, + }, ethCallErrors: map[string]error{ uid1.String(): fmt.Errorf("error"), }, @@ -477,8 +496,9 @@ func TestRegistry_CheckUpkeeps(t *testing.T) { } bs.latestBlock.Store(&tc.latestBlock) e := &EvmRegistry{ - lggr: lggr, - bs: bs, + lggr: lggr, + bs: bs, + poller: tc.poller, } client := new(evmClientMocks.Client) for _, i := range tc.inputs { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_test.go index 115336745b5..b6b123de485 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_test.go @@ -501,7 +501,7 @@ func TestRegistry_refreshLogTriggerUpkeeps(t *testing.T) { var hb types3.HeadBroadcaster var lp logpoller.LogPoller - bs := NewBlockSubscriber(hb, lp, lggr) + bs := NewBlockSubscriber(hb, lp, 1000, lggr) registry := &EvmRegistry{ addr: common.BigToAddress(big.NewInt(1)), diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/services.go b/core/services/ocr2/plugins/ocr2keeper/evm21/services.go index 91479b5e619..79a9da0c68c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/services.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/services.go @@ -78,7 +78,7 @@ func New(addr common.Address, client evm.Chain, mc *models.MercuryCredentials, k logProvider, logRecoverer := logprovider.New(lggr, client.LogPoller(), client.Client(), utilsABI, services.upkeepState, finalityDepth) services.logProvider = logProvider services.logRecoverer = logRecoverer - services.blockSub = NewBlockSubscriber(client.HeadBroadcaster(), client.LogPoller(), lggr) + services.blockSub = NewBlockSubscriber(client.HeadBroadcaster(), client.LogPoller(), finalityDepth, lggr) services.keyring = NewOnchainKeyringV3Wrapper(keyring)