diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go index db50b322147..bcac214d9f2 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry_check_pipeline.go @@ -19,6 +19,9 @@ const ( // checkBlockTooOldRange is the number of blocks that can be behind the latest block before // we return a CheckBlockTooOld error checkBlockTooOldRange = 128 + // block range till which a checkBlock is considered fresh where certain RPC issues are treated + // as retryable as they might have flaky race conditions + freshCheckBlockRange = 10 ) type checkResult struct { @@ -31,6 +34,9 @@ func (r *EvmRegistry) CheckUpkeeps(ctx context.Context, keys ...ocr2keepers.Upke for i := range keys { if keys[i].Trigger.BlockNumber == 0 { // check block was not populated, use latest latest := r.bs.latestBlock.Load() + if latest == nil { + return nil, fmt.Errorf("no latest block available") + } copy(keys[i].Trigger.BlockHash[:], latest.Hash[:]) keys[i].Trigger.BlockNumber = latest.Number r.lggr.Debugf("Check upkeep key had no trigger block number, using latest block %v", keys[i].Trigger.BlockNumber) @@ -139,23 +145,43 @@ func (r *EvmRegistry) verifyLogExists(upkeepId *big.Int, p ocr2keepers.UpkeepPay } else { r.lggr.Debugf("log block not provided, querying eth client for tx hash %s for upkeepId %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId) } + + latestBlock := r.bs.latestBlock.Load() + freshCheckBlock := latestBlock != nil && latestBlock.Number-p.Trigger.BlockNumber <= freshCheckBlockRange // query eth client as a fallback bn, bh, err := core.GetTxBlock(r.ctx, r.client, p.Trigger.LogTriggerExtension.TxHash) if err != nil { + r.lggr.Warnf("failed to query tx hash %s for upkeepId %s: %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId, err.Error()) // primitive way of checking errors if strings.Contains(err.Error(), "missing required field") || strings.Contains(err.Error(), "not found") { - return encoding.UpkeepFailureReasonTxHashNoLongerExists, encoding.NoPipelineError, false + // Tx hash was not found on chain, in case of freshCheckBlock it might be a flaky race condition + // otherwise return ineligible upkeep response + if freshCheckBlock { + return encoding.UpkeepFailureReasonNone, encoding.RpcFlakyFailure, true + } else { + return encoding.UpkeepFailureReasonTxHashNoLongerExists, encoding.NoPipelineError, false + } } - r.lggr.Warnf("failed to query tx hash %s for upkeepId %s: %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId, err.Error()) + // All other errors are considered as flaky RPC errors return encoding.UpkeepFailureReasonNone, encoding.RpcFlakyFailure, true } if bn == nil { - r.lggr.Warnf("tx hash %s does not exist on chain for upkeepId %s.", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId) - return encoding.UpkeepFailureReasonTxHashNoLongerExists, encoding.NoPipelineError, false + // Empty receipt was returned, which indicates that the tx hash no longer exists on chain + // in case of freshCheckBlock it might be a flaky race condition + if freshCheckBlock { + return encoding.UpkeepFailureReasonNone, encoding.RpcFlakyFailure, true + } else { + r.lggr.Warnf("tx hash %s does not exist on chain for upkeepId %s.", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId) + return encoding.UpkeepFailureReasonTxHashNoLongerExists, encoding.NoPipelineError, false + } } if bh.Hex() != logBlockHash.Hex() { - r.lggr.Warnf("tx hash %s reorged from expected blockhash %s to %s for upkeepId %s.", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), logBlockHash.Hex(), bh.Hex(), upkeepId) - return encoding.UpkeepFailureReasonTxHashReorged, encoding.NoPipelineError, false + if freshCheckBlock { + return encoding.UpkeepFailureReasonNone, encoding.RpcFlakyFailure, true + } else { + r.lggr.Warnf("tx hash %s reorged from expected blockhash %s to %s for upkeepId %s.", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), logBlockHash.Hex(), bh.Hex(), upkeepId) + return encoding.UpkeepFailureReasonTxHashReorged, encoding.NoPipelineError, false + } } r.lggr.Debugf("tx hash %s exists on chain for upkeepId %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId) return encoding.UpkeepFailureReasonNone, encoding.NoPipelineError, false @@ -243,12 +269,16 @@ func (r *EvmRegistry) checkUpkeeps(ctx context.Context, payloads []ocr2keepers.U index := indices[i] if req.Error != nil { latestBlock := r.bs.latestBlock.Load() + latestBlockNumber := int64(0) + if latestBlock != nil { + latestBlockNumber = int64(latestBlock.Number) + } checkBlock, _, _ := r.getBlockAndUpkeepId(payloads[index].UpkeepID, payloads[index].Trigger) // Exploratory: remove reliance on primitive way of checking errors blockNotFound := (strings.Contains(req.Error.Error(), "header not found") || strings.Contains(req.Error.Error(), "missing trie node")) - if blockNotFound && int64(latestBlock.Number)-checkBlock.Int64() > checkBlockTooOldRange { + if blockNotFound && latestBlockNumber-checkBlock.Int64() > checkBlockTooOldRange { // Check block not found in RPC and it is too old, non-retryable error - r.lggr.Warnf("block not found error encountered in check result for upkeepId %s, check block %d, latest block %d: %s", results[index].UpkeepID.String(), checkBlock.Int64(), int64(latestBlock.Number), req.Error) + r.lggr.Warnf("block not found error encountered in check result for upkeepId %s, check block %d, latest block %d: %s", results[index].UpkeepID.String(), checkBlock.Int64(), latestBlockNumber, req.Error) results[index].Retryable = false results[index].PipelineExecutionState = uint8(encoding.CheckBlockTooOld) } else {