Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add finality depth contraint in block subscriber #10686

Merged
merged 8 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ type BlockSubscriber struct {
latestBlock atomic.Pointer[ocr2keepers.BlockKey]
blockHistorySize int64
blockSize int64
finalityDepth uint32
lggr logger.Logger
}

var _ ocr2keepers.BlockSubscriber = &BlockSubscriber{}

func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr logger.Logger) *BlockSubscriber {
func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, finalityDepth uint32, lggr logger.Logger) *BlockSubscriber {
return &BlockSubscriber{
threadCtrl: utils.NewThreadControl(),
hb: hb,
Expand All @@ -65,6 +66,7 @@ func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr
blocks: map[int64]string{},
blockHistorySize: blockHistorySize,
blockSize: lookbackDepth,
finalityDepth: finalityDepth,
latestBlock: atomic.Pointer[ocr2keepers.BlockKey]{},
lggr: lggr.Named("BlockSubscriber"),
}
Expand Down Expand Up @@ -226,10 +228,13 @@ func (bs *BlockSubscriber) processHead(h *evmtypes.Head) {
// head parent is a linked list with EVM finality depth
// when re-org happens, new heads will have pointers to the new blocks
i := int64(0)
for cp := h; ; cp = cp.Parent {
if cp == nil || bs.blocks[cp.Number] == cp.Hash.Hex() {
break
}
for cp := h; cp != nil; cp = cp.Parent {
// we don't stop when a matching (block number/hash) entry is seen in the map because parent linked list may be
// cut short during a re-org if head broadcaster backfill is not complete. This can cause some re-orged blocks
// left in the map. for example, re-org happens for block 98, 99, 100. next head 101 from broadcaster has parent list
// of 100, so block 100 and 101 are updated. when next head 102 arrives, it has full parent history of finality depth.
// if we stop when we see a block number/hash match, we won't look back and correct block 98 and 99.
// hence, we make a compromise here and check previous max(finality depth, blockSize) blocks and update the map.
existingHash, ok := bs.blocks[cp.Number]
if !ok {
bs.lggr.Debugf("filling block %d with new hash %s", cp.Number, cp.Hash.Hex())
Expand All @@ -238,7 +243,7 @@ func (bs *BlockSubscriber) processHead(h *evmtypes.Head) {
}
bs.blocks[cp.Number] = cp.Hash.Hex()
i++
if i > bs.blockSize {
if i > int64(bs.finalityDepth) || i > bs.blockSize {
break
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import (

const historySize = 4
const blockSize = int64(4)
const finality = uint32(4)

func TestBlockSubscriber_Subscribe(t *testing.T) {
lggr := logger.TestLogger(t)
var hb types.HeadBroadcaster
var lp logpoller.LogPoller

bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
subId, _, err := bs.Subscribe()
Expand All @@ -47,7 +48,7 @@ func TestBlockSubscriber_Unsubscribe(t *testing.T) {
var hb types.HeadBroadcaster
var lp logpoller.LogPoller

bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
subId, _, err := bs.Subscribe()
Expand All @@ -65,7 +66,7 @@ func TestBlockSubscriber_Unsubscribe_Failure(t *testing.T) {
var hb types.HeadBroadcaster
var lp logpoller.LogPoller

bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
err := bs.Unsubscribe(2)
Expand Down Expand Up @@ -97,7 +98,7 @@ func TestBlockSubscriber_GetBlockRange(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
lp := new(mocks.LogPoller)
lp.On("LatestBlock", mock.Anything).Return(tc.LatestBlock, tc.LatestBlockErr)
bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
blocks, err := bs.getBlockRange(testutils.Context(t))
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestBlockSubscriber_InitializeBlocks(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
lp := new(mocks.LogPoller)
lp.On("GetBlocksRange", mock.Anything, tc.Blocks, mock.Anything).Return(tc.PollerBlocks, tc.Error)
bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
err := bs.initializeBlocks(testutils.Context(t), tc.Blocks)
Expand Down Expand Up @@ -213,7 +214,7 @@ func TestBlockSubscriber_BuildHistory(t *testing.T) {

for _, tc := range tests {
t.Run(tc.Name, func(t *testing.T) {
bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
bs.blocks = tc.Blocks
Expand Down Expand Up @@ -258,7 +259,7 @@ func TestBlockSubscriber_Cleanup(t *testing.T) {

for _, tc := range tests {
t.Run(tc.Name, func(t *testing.T) {
bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
bs.blocks = tc.Blocks
Expand Down Expand Up @@ -300,7 +301,7 @@ func TestBlockSubscriber_Start(t *testing.T) {

lp.On("GetBlocksRange", mock.Anything, blocks, mock.Anything).Return(pollerBlocks, nil)

bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, finality, lggr)
bs.blockHistorySize = historySize
bs.blockSize = blockSize
err := bs.Start(context.Background())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,21 @@ func (r *EvmRegistry) getBlockHash(blockNumber *big.Int) (common.Hash, error) {

// verifyCheckBlock checks that the check block and hash are valid, returns the pipeline execution state and retryable
func (r *EvmRegistry) verifyCheckBlock(_ context.Context, checkBlock, upkeepId *big.Int, checkHash common.Hash) (state encoding.PipelineExecutionState, retryable bool) {
var h string
var ok bool
// verify check block number and hash are valid
h, ok = r.bs.queryBlocksMap(checkBlock.Int64())
if !ok {
r.lggr.Warnf("check block %s does not exist in block subscriber for upkeepId %s, querying eth client", checkBlock, upkeepId)
b, err := r.getBlockHash(checkBlock)
if err != nil {
r.lggr.Warnf("failed to query block %s: %s", checkBlock, err.Error())
return encoding.RpcFlakyFailure, true
}
h = b.Hex()
h, ok := r.bs.queryBlocksMap(checkBlock.Int64())
// if this block number/hash combo exists in block subscriber, this check block and hash still exist on chain and are valid
FelixFan1992 marked this conversation as resolved.
Show resolved Hide resolved
// the block hash in block subscriber might be slightly outdated, if it doesn't match then we fetch the latest from RPC.
if ok && h == checkHash.Hex() {
r.lggr.Debugf("check block hash %s exists on chain at block number %d for upkeepId %s", checkHash.Hex(), checkBlock, upkeepId)
return encoding.NoPipelineError, false
}
r.lggr.Warnf("check block %s does not exist in block subscriber or hash does not match for upkeepId %s. this may be caused by block subscriber outdated due to re-org, querying eth client to confirm", checkBlock, upkeepId)
b, err := r.getBlockHash(checkBlock)
if err != nil {
r.lggr.Warnf("failed to query block %s: %s", checkBlock, err.Error())
return encoding.RpcFlakyFailure, true
}
if checkHash.Hex() != h {
if checkHash.Hex() != b.Hex() {
r.lggr.Warnf("check block %s hash do not match. %s from block subscriber vs %s from trigger for upkeepId %s", checkBlock, h, checkHash.Hex(), upkeepId)
return encoding.CheckBlockInvalid, false
}
Expand All @@ -126,11 +127,15 @@ func (r *EvmRegistry) verifyLogExists(upkeepId *big.Int, p ocr2keepers.UpkeepPay
// if log block number is populated, check log block number and block hash
if logBlockNumber != 0 {
h, ok := r.bs.queryBlocksMap(logBlockNumber)
// if this block number/hash combo exists in block subscriber, this block and tx still exists on chain and is valid
infiloop2 marked this conversation as resolved.
Show resolved Hide resolved
FelixFan1992 marked this conversation as resolved.
Show resolved Hide resolved
// the block hash in block subscriber might be slightly outdated, if it doesn't match then we fetch the latest from RPC.
if ok && h == logBlockHash.Hex() {
r.lggr.Debugf("tx hash %s exists on chain at block number %d, block hash %s for upkeepId %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), logBlockHash.Hex(), logBlockNumber, upkeepId)
return encoding.UpkeepFailureReasonNone, encoding.NoPipelineError, false
}
r.lggr.Debugf("log block %d does not exist in block subscriber for upkeepId %s, querying eth client", logBlockNumber, upkeepId)
// if this block does not exist in the block subscriber, the block which this log lived on was probably re-orged
// hence, check eth client for this log's tx hash to confirm
r.lggr.Debugf("log block %d does not exist in block subscriber or block hash does not match for upkeepId %s. this may be caused by block subscriber outdated due to re-org, querying eth client to confirm", logBlockNumber, upkeepId)
} else {
r.lggr.Debugf("log block not provided, querying eth client for tx hash %s for upkeepId %s", hexutil.Encode(p.Trigger.LogTriggerExtension.TxHash[:]), upkeepId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ func TestRegistry_VerifyCheckBlock(t *testing.T) {
Trigger: ocr2keepers.NewTrigger(500, common.HexToHash("0x5bff03de234fe771ac0d685f9ee0fb0b757ea02ec9e6f10e8e2ee806db1b6b83")),
WorkID: "work",
},
poller: &mockLogPoller{
GetBlocksRangeFn: func(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]logpoller.LogPollerBlock, error) {
return []logpoller.LogPollerBlock{
{
BlockHash: common.HexToHash("0xcba5cf9e2bb32373c76015384e1098912d9510a72481c78057fcb088209167de"),
},
}, nil
},
},
blocks: map[int64]string{
500: "0xa518faeadcc423338c62572da84dda35fe44b34f521ce88f6081b703b250cca4",
},
Expand Down Expand Up @@ -388,6 +397,7 @@ func TestRegistry_CheckUpkeeps(t *testing.T) {
err error
ethCalls map[string]bool
receipts map[string]*types.Receipt
poller logpoller.LogPoller
ethCallErrors map[string]error
}{
{
Expand Down Expand Up @@ -463,6 +473,15 @@ func TestRegistry_CheckUpkeeps(t *testing.T) {
uid1.String(): true,
},
receipts: map[string]*types.Receipt{},
poller: &mockLogPoller{
GetBlocksRangeFn: func(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]logpoller.LogPollerBlock, error) {
return []logpoller.LogPollerBlock{
{
BlockHash: common.HexToHash("0xcba5cf9e2bb32373c76015384e1098912d9510a72481c78057fcb088209167de"),
},
}, nil
},
},
ethCallErrors: map[string]error{
uid1.String(): fmt.Errorf("error"),
},
Expand All @@ -477,8 +496,9 @@ func TestRegistry_CheckUpkeeps(t *testing.T) {
}
bs.latestBlock.Store(&tc.latestBlock)
e := &EvmRegistry{
lggr: lggr,
bs: bs,
lggr: lggr,
bs: bs,
poller: tc.poller,
}
client := new(evmClientMocks.Client)
for _, i := range tc.inputs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func TestRegistry_refreshLogTriggerUpkeeps(t *testing.T) {
var hb types3.HeadBroadcaster
var lp logpoller.LogPoller

bs := NewBlockSubscriber(hb, lp, lggr)
bs := NewBlockSubscriber(hb, lp, 1000, lggr)

registry := &EvmRegistry{
addr: common.BigToAddress(big.NewInt(1)),
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/ocr2keeper/evm21/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func New(addr common.Address, client evm.Chain, mc *models.MercuryCredentials, k
logProvider, logRecoverer := logprovider.New(lggr, client.LogPoller(), client.Client(), utilsABI, services.upkeepState, finalityDepth)
services.logProvider = logProvider
services.logRecoverer = logRecoverer
services.blockSub = NewBlockSubscriber(client.HeadBroadcaster(), client.LogPoller(), lggr)
services.blockSub = NewBlockSubscriber(client.HeadBroadcaster(), client.LogPoller(), finalityDepth, lggr)

services.keyring = NewOnchainKeyringV3Wrapper(keyring)

Expand Down
Loading