Skip to content

Commit

Permalink
add finality depth contraint in block subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
FelixFan1992 committed Sep 19, 2023
1 parent 2d5e9c1 commit 6585b86
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 29 deletions.
17 changes: 11 additions & 6 deletions core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ type BlockSubscriber struct {
latestBlock atomic.Pointer[ocr2keepers.BlockKey]
blockHistorySize int64
blockSize int64
finalityDepth uint32
lggr logger.Logger
}

var _ ocr2keepers.BlockSubscriber = &BlockSubscriber{}

func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr logger.Logger) *BlockSubscriber {
func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, finalityDepth uint32, lggr logger.Logger) *BlockSubscriber {
return &BlockSubscriber{
threadCtrl: utils.NewThreadControl(),
hb: hb,
Expand All @@ -65,6 +66,7 @@ func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr
blocks: map[int64]string{},
blockHistorySize: blockHistorySize,
blockSize: lookbackDepth,
finalityDepth: finalityDepth,
latestBlock: atomic.Pointer[ocr2keepers.BlockKey]{},
lggr: lggr.Named("BlockSubscriber"),
}
Expand Down Expand Up @@ -226,10 +228,13 @@ func (bs *BlockSubscriber) processHead(h *evmtypes.Head) {
// head parent is a linked list with EVM finality depth
// when re-org happens, new heads will have pointers to the new blocks
i := int64(0)
for cp := h; ; cp = cp.Parent {
if cp == nil || bs.blocks[cp.Number] == cp.Hash.Hex() {
break
}
for cp := h; cp != nil; cp = cp.Parent {
// we don't stop when a matching (block number/hash) entry is seen in the map because parent linked list may be
// cut short during a re-org if head broadcaster backfill is not complete. This can cause some re-orged blocks
// left in the map. for example, re-org happens for block 98, 99, 100. next head 101 from broadcaster has parent list
// of 100, so block 100 and 101 are updated. when next head 102 arrives, it has full parent history of finality depth.
// if we stop when we see a block number/hash match, we won't look back and correct block 98 and 99.
// hence, we make a compromise here and check previous max(finality depth, blockSize) blocks and update the map.
existingHash, ok := bs.blocks[cp.Number]
if !ok {
bs.lggr.Debugf("filling block %d with new hash %s", cp.Number, cp.Hash.Hex())
Expand All @@ -238,7 +243,7 @@ func (bs *BlockSubscriber) processHead(h *evmtypes.Head) {
}
bs.blocks[cp.Number] = cp.Hash.Hex()
i++
if i > bs.blockSize {
if i > int64(bs.finalityDepth) || i > bs.blockSize {
break
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import (

const historySize = 4
const blockSize = int64(4)
const finality = uint32(4)

func TestBlockSubscriber_Subscribe(t *testing.T) {
lggr := logger.TestLogger(t)
var hb types.HeadBroadcaster
var lp logpoller.LogPoller

bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
subId, _, err := bs.Subscribe()
Expand All @@ -47,7 +48,7 @@ func TestBlockSubscriber_Unsubscribe(t *testing.T) {
var hb types.HeadBroadcaster
var lp logpoller.LogPoller

bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
subId, _, err := bs.Subscribe()
Expand All @@ -65,7 +66,7 @@ func TestBlockSubscriber_Unsubscribe_Failure(t *testing.T) {
var hb types.HeadBroadcaster
var lp logpoller.LogPoller

bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
err := bs.Unsubscribe(2)
Expand Down Expand Up @@ -97,7 +98,7 @@ func TestBlockSubscriber_GetBlockRange(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
lp := new(mocks.LogPoller)
lp.On("LatestBlock", mock.Anything).Return(tc.LatestBlock, tc.LatestBlockErr)
bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
blocks, err := bs.getBlockRange(testutils.Context(t))
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestBlockSubscriber_InitializeBlocks(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
lp := new(mocks.LogPoller)
lp.On("GetBlocksRange", mock.Anything, tc.Blocks, mock.Anything).Return(tc.PollerBlocks, tc.Error)
bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
err := bs.initializeBlocks(testutils.Context(t), tc.Blocks)
Expand Down Expand Up @@ -213,7 +214,7 @@ func TestBlockSubscriber_BuildHistory(t *testing.T) {

for _, tc := range tests {
t.Run(tc.Name, func(t *testing.T) {
bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
bs.blocks = tc.Blocks
Expand Down Expand Up @@ -258,7 +259,7 @@ func TestBlockSubscriber_Cleanup(t *testing.T) {

for _, tc := range tests {
t.Run(tc.Name, func(t *testing.T) {
bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
bs.blocks = tc.Blocks
Expand Down Expand Up @@ -300,7 +301,7 @@ func TestBlockSubscriber_Start(t *testing.T) {

lp.On("GetBlocksRange", mock.Anything, blocks, mock.Anything).Return(pollerBlocks, nil)

bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
err := bs.Start(context.Background())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,21 @@ func (r *EvmRegistry) getBlockHash(blockNumber *big.Int) (common.Hash, error) {

// verifyCheckBlock checks that the check block and hash are valid, returns the pipeline execution state and retryable
func (r *EvmRegistry) verifyCheckBlock(_ context.Context, checkBlock, upkeepId *big.Int, checkHash common.Hash) (state encoding.PipelineExecutionState, retryable bool) {
var h string
var ok bool
// verify check block number and hash are valid
h, ok = r.bs.queryBlocksMap(checkBlock.Int64())
if !ok {
r.lggr.Warnf("check block %s does not exist in block subscriber for upkeepId %s, querying eth client", checkBlock, upkeepId)
b, err := r.getBlockHash(checkBlock)
if err != nil {
r.lggr.Warnf("failed to query block %s: %s", checkBlock, err.Error())
return encoding.RpcFlakyFailure, true
}
h = b.Hex()
h, ok := r.bs.queryBlocksMap(checkBlock.Int64())
// if this block number/hash combo exists in block subscriber, this check block and hash still exist on chain and are valid
// the block hash in block subscriber might be slightly outdated, if it doesn't match then we fetch the latest from RPC.
if ok && h == checkHash.Hex() {
r.lggr.Debugf("check block hash %s exists on chain at block number %d for upkeepId %s", checkHash.Hex(), checkBlock, upkeepId)
return encoding.NoPipelineError, false
}
r.lggr.Warnf("check block %s does not exist in block subscriber or hash does not match for upkeepId %s. this may be caused by block subscriber outdated due to re-org, querying eth client to confirm", checkBlock, upkeepId)
b, err := r.getBlockHash(checkBlock)
if err != nil {
r.lggr.Warnf("failed to query block %s: %s", checkBlock, err.Error())
return encoding.RpcFlakyFailure, true
}
if checkHash.Hex() != h {
if checkHash.Hex() != b.Hex() {
r.lggr.Warnf("check block %s hash do not match. %s from block subscriber vs %s from trigger for upkeepId %s", checkBlock, h, checkHash.Hex(), upkeepId)
return encoding.CheckBlockInvalid, false
}
Expand All @@ -126,11 +127,15 @@ func (r *EvmRegistry) verifyLogExists(upkeepId *big.Int, p ocr2keepers.UpkeepPay
// if log block number is populated, check log block number and block hash
if logBlockNumber != 0 {
h, ok := r.bs.queryBlocksMap(logBlockNumber)
// if this block number/hash combo exists in block subscriber, this block and tx still exists on chain and is valid
// the block hash in block subscriber might be slightly outdated, if it doesn't match then we fetch the latest from RPC.
if ok && h == logBlockHash.Hex() {
r.lggr.Debugf("tx hash %s exists on chain at block number %d, block hash %s for upkeepId %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), logBlockHash.Hex(), logBlockNumber, upkeepId)
return encoding.UpkeepFailureReasonNone, encoding.NoPipelineError, false
}
r.lggr.Debugf("log block %d does not exist in block subscriber for upkeepId %s, querying eth client", logBlockNumber, upkeepId)
// if this block does not exist in the block subscriber, the block which this log lived on was probably re-orged
// hence, check eth client for this log's tx hash to confirm
r.lggr.Debugf("log block %d does not exist in block subscriber or block hash does not match for upkeepId %s. this may be caused by block subscriber outdated due to re-org, querying eth client to confirm", logBlockNumber, upkeepId)
} else {
r.lggr.Debugf("log block not provided, querying eth client for tx hash %s for upkeepId %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ func TestRegistry_VerifyCheckBlock(t *testing.T) {
Trigger: ocr2keepers.NewTrigger(500, common.HexToHash("0x5bff03de234fe771ac0d685f9ee0fb0b757ea02ec9e6f10e8e2ee806db1b6b83")),
WorkID: "work",
},
poller: &mockLogPoller{
GetBlocksRangeFn: func(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]logpoller.LogPollerBlock, error) {
return []logpoller.LogPollerBlock{
{
BlockHash: common.HexToHash("0xcba5cf9e2bb32373c76015384e1098912d9510a72481c78057fcb088209167de"),
},
}, nil
},
},
blocks: map[int64]string{
500: "0xa518faeadcc423338c62572da84dda35fe44b34f521ce88f6081b703b250cca4",
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func TestRegistry_refreshLogTriggerUpkeeps(t *testing.T) {
var hb types3.HeadBroadcaster
var lp logpoller.LogPoller

bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, 1000, lggr)

registry := &EvmRegistry{
addr: common.BigToAddress(big.NewInt(1)),
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/ocr2keeper/evm21/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func New(addr common.Address, client evm.Chain, mc *models.MercuryCredentials, k
logProvider, logRecoverer := logprovider.New(lggr, client.LogPoller(), client.Client(), utilsABI, services.upkeepState, finalityDepth)
services.logProvider = logProvider
services.logRecoverer = logRecoverer
services.blockSub = NewBlockSubscriber(client.HeadBroadcaster(), client.LogPoller(), lggr)
services.blockSub = NewBlockSubscriber(client.HeadBroadcaster(), client.LogPoller(), finalityDepth, lggr)

services.keyring = NewOnchainKeyringV3Wrapper(keyring)

Expand Down

0 comments on commit 6585b86

Please sign in to comment.