Skip to content

Commit

Permalink
Handle racy edge cases in tx hash verification
Browse files Browse the repository at this point in the history
  • Loading branch information
infiloop2 committed Sep 24, 2023
1 parent 57dca4d commit 79b956d
Showing 1 changed file with 38 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ const (
// checkBlockTooOldRange is the number of blocks that can be behind the latest block before
// we return a CheckBlockTooOld error
checkBlockTooOldRange = 128
// freshCheckBlockRange is the number of blocks behind the latest block within which a check block
// is considered fresh. For a fresh check block, certain RPC issues are treated as retryable
// because they might be caused by flaky race conditions
freshCheckBlockRange = 10
)

type checkResult struct {
Expand All @@ -31,6 +34,9 @@ func (r *EvmRegistry) CheckUpkeeps(ctx context.Context, keys ...ocr2keepers.Upke
for i := range keys {
if keys[i].Trigger.BlockNumber == 0 { // check block was not populated, use latest
latest := r.bs.latestBlock.Load()
if latest == nil {
return nil, fmt.Errorf("no latest block available")
}
copy(keys[i].Trigger.BlockHash[:], latest.Hash[:])
keys[i].Trigger.BlockNumber = latest.Number
r.lggr.Debugf("Check upkeep key had no trigger block number, using latest block %v", keys[i].Trigger.BlockNumber)
Expand Down Expand Up @@ -139,23 +145,43 @@ func (r *EvmRegistry) verifyLogExists(upkeepId *big.Int, p ocr2keepers.UpkeepPay
} else {
r.lggr.Debugf("log block not provided, querying eth client for tx hash %s for upkeepId %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId)
}

latestBlock := r.bs.latestBlock.Load()
freshCheckBlock := latestBlock != nil && latestBlock.Number-p.Trigger.BlockNumber <= freshCheckBlockRange
// query eth client as a fallback
bn, bh, err := core.GetTxBlock(r.ctx, r.client, p.Trigger.LogTriggerExtension.TxHash)
if err != nil {
r.lggr.Warnf("failed to query tx hash %s for upkeepId %s: %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId, err.Error())
// primitive way of checking errors
if strings.Contains(err.Error(), "missing required field") || strings.Contains(err.Error(), "not found") {
return encoding.UpkeepFailureReasonTxHashNoLongerExists, encoding.NoPipelineError, false
// Tx hash was not found on chain. For a fresh check block this might be a flaky race condition,
// so it is treated as retryable; otherwise return an ineligible upkeep response
if freshCheckBlock {
return encoding.UpkeepFailureReasonNone, encoding.RpcFlakyFailure, true
} else {
return encoding.UpkeepFailureReasonTxHashNoLongerExists, encoding.NoPipelineError, false
}
}
r.lggr.Warnf("failed to query tx hash %s for upkeepId %s: %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId, err.Error())
// All other errors are considered as flaky RPC errors
return encoding.UpkeepFailureReasonNone, encoding.RpcFlakyFailure, true
}
if bn == nil {
r.lggr.Warnf("tx hash %s does not exist on chain for upkeepId %s.", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId)
return encoding.UpkeepFailureReasonTxHashNoLongerExists, encoding.NoPipelineError, false
// An empty receipt was returned, which indicates that the tx hash no longer exists on chain.
// For a fresh check block this might be a flaky race condition
if freshCheckBlock {
return encoding.UpkeepFailureReasonNone, encoding.RpcFlakyFailure, true
} else {
r.lggr.Warnf("tx hash %s does not exist on chain for upkeepId %s.", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId)
return encoding.UpkeepFailureReasonTxHashNoLongerExists, encoding.NoPipelineError, false
}
}
if bh.Hex() != logBlockHash.Hex() {
r.lggr.Warnf("tx hash %s reorged from expected blockhash %s to %s for upkeepId %s.", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), logBlockHash.Hex(), bh.Hex(), upkeepId)
return encoding.UpkeepFailureReasonTxHashReorged, encoding.NoPipelineError, false
if freshCheckBlock {
return encoding.UpkeepFailureReasonNone, encoding.RpcFlakyFailure, true
} else {
r.lggr.Warnf("tx hash %s reorged from expected blockhash %s to %s for upkeepId %s.", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), logBlockHash.Hex(), bh.Hex(), upkeepId)
return encoding.UpkeepFailureReasonTxHashReorged, encoding.NoPipelineError, false
}
}
r.lggr.Debugf("tx hash %s exists on chain for upkeepId %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId)
return encoding.UpkeepFailureReasonNone, encoding.NoPipelineError, false
Expand Down Expand Up @@ -243,12 +269,16 @@ func (r *EvmRegistry) checkUpkeeps(ctx context.Context, payloads []ocr2keepers.U
index := indices[i]
if req.Error != nil {
latestBlock := r.bs.latestBlock.Load()
latestBlockNumber := int64(0)
if latestBlock != nil {
latestBlockNumber = int64(latestBlock.Number)
}
checkBlock, _, _ := r.getBlockAndUpkeepId(payloads[index].UpkeepID, payloads[index].Trigger)
// Exploratory: remove reliance on primitive way of checking errors
blockNotFound := (strings.Contains(req.Error.Error(), "header not found") || strings.Contains(req.Error.Error(), "missing trie node"))
if blockNotFound && int64(latestBlock.Number)-checkBlock.Int64() > checkBlockTooOldRange {
if blockNotFound && latestBlockNumber-checkBlock.Int64() > checkBlockTooOldRange {
// Check block not found in RPC and it is too old, non-retryable error
r.lggr.Warnf("block not found error encountered in check result for upkeepId %s, check block %d, latest block %d: %s", results[index].UpkeepID.String(), checkBlock.Int64(), int64(latestBlock.Number), req.Error)
r.lggr.Warnf("block not found error encountered in check result for upkeepId %s, check block %d, latest block %d: %s", results[index].UpkeepID.String(), checkBlock.Int64(), latestBlockNumber, req.Error)
results[index].Retryable = false
results[index].PipelineExecutionState = uint8(encoding.CheckBlockTooOld)
} else {
Expand Down

0 comments on commit 79b956d

Please sign in to comment.