diff --git a/common/client/mock_rpc_test.go b/common/client/mock_rpc_test.go index d87a02d47c1..abe876e4fcd 100644 --- a/common/client/mock_rpc_test.go +++ b/common/client/mock_rpc_test.go @@ -366,6 +366,34 @@ func (_m *mockRPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS return r0, r1 } +// FinalizedBlock provides a mock function with given fields: ctx +func (_m *mockRPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD]) FinalizedBlock(ctx context.Context) (HEAD, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for FinalizedBlock") + } + + var r0 HEAD + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (HEAD, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) HEAD); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(HEAD) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // LINKBalance provides a mock function with given fields: ctx, accountAddress, linkAddress func (_m *mockRPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD]) LINKBalance(ctx context.Context, accountAddress ADDR, linkAddress ADDR) (*assets.Link, error) { ret := _m.Called(ctx, accountAddress, linkAddress) diff --git a/common/client/multi_node.go b/common/client/multi_node.go index 7d55784e68f..93e27ada447 100644 --- a/common/client/multi_node.go +++ b/common/client/multi_node.go @@ -633,3 +633,11 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP } return n.RPC().TransactionReceipt(ctx, txHash) } + +func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) FinalizedBlock(ctx context.Context) (head HEAD, wee error) { + n, err := c.selectNode() + if err != nil { + return head, err + } + return n.RPC().FinalizedBlock(ctx) +} diff --git a/common/client/types.go b/common/client/types.go index 32d4da98b50..4f11292eb0d 100644 --- a/common/client/types.go +++ b/common/client/types.go @@ -5,6 +5,7 @@ import ( "math/big" "github.com/smartcontractkit/chainlink-common/pkg/assets" + feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" "github.com/smartcontractkit/chainlink/v2/common/types" ) @@ -113,6 +114,7 @@ type clientAPI[ BlockByNumber(ctx context.Context, number *big.Int) (HEAD, error) BlockByHash(ctx context.Context, hash BLOCK_HASH) (HEAD, error) LatestBlockHeight(context.Context) (*big.Int, error) + FinalizedBlock(ctx context.Context) (HEAD, error) // Events FilterEvents(ctx context.Context, query EVENT_OPS) ([]EVENT, error) diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index 2b63cfdc00d..80d0ab4e3e3 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -912,12 +912,38 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han } } -// EnsureConfirmedTransactionsInLongestChain finds all confirmed txes up to the depth -// of the given chain and ensures that every one has a receipt with a block hash that is -// in the given chain. -// -// If any of the confirmed transactions does not have a receipt in the chain, it has been -// re-org'd out and will be rebroadcast. +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findFinalizedHeadInChain(ctx context.Context, head types.Head[BLOCK_HASH]) (types.Head[BLOCK_HASH], error) { + if ec.chainConfig.FinalityTagEnabled() { + finalizedHash, finalizedBlockNumber, err := ec.client.FinalizedBlockHash(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get finalized block hash: %w", err) + } + + for head != nil { + if head.BlockNumber() == finalizedBlockNumber.Int64() && finalizedHash == head.BlockHash() { + return head, nil + } + head = head.GetParent() + } + + return nil, nil + } + + finalizedBlock := head.BlockNumber() - int64(ec.chainConfig.FinalityDepth()) + for head != nil { + if head.BlockNumber() == finalizedBlock { + return head, nil + } + head = head.GetParent() + } + + return nil, nil +} + +// EnsureConfirmedTransactionsInLongestChain finds all confirmed txes tries to find them in the provided chain or fetches +// it from an RPC. +// If any of the confirmed transactions does not have a receipt in the chain, it has been re-org'd out and will be rebroadcast. +// If finalized receipt was found marks transaction and the receipt as finalized. func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head types.Head[BLOCK_HASH]) error { if head.ChainLength() < ec.chainConfig.FinalityDepth() { logArgs := []interface{}{ @@ -935,19 +961,20 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Ens ec.nConsecutiveBlocksChainTooShort = 0 } - // fetch all confirmed transactions. - confirmedTxs, err := ec.txStore.FindConfirmedTransactions(ctx, ec.chainID) + missingTxs, err := ec.markFinalizedUsingLocalChain(ctx, head, ec.chainConfig.RPCDefaultBatchSize()) if err != nil { - return fmt.Errorf("FindConfirmedTransactions failed: %w", err) + return fmt.Errorf("failed to find tx missing from the local chain: %w", err) } - finalizedIDs, err := ec.markFinalized(ctx, head, confirmedTxs) + // Tx might be missing from the chain because it was included into block that is deeper than HistoryDepth. + // Mark finalized txes and returns IDs of transactions whose ID's we failed to find + missingTxIDs, err := ec.fetchAndSaveFinalizedReceipts(ctx, missingTxs) if err != nil { return err } - for _, tx := range confirmedTxs { - if _, ok := finalizedIDs[tx.ID]; ok || hasReceiptInLongestChain(*tx, head) || hasReceiptHigherThanHead(*tx, head) { + for _, tx := range missingTxs { + if _, ok := missingTxIDs[tx.ID]; !ok { continue } @@ -960,7 +987,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Ens // It is safe to process separate keys concurrently // NOTE: This design will block one key if another takes a really long time to execute var wg sync.WaitGroup - errors := []error{} + var errs []error var errMu sync.Mutex wg.Add(len(ec.enabledAddresses)) for _, address := range ec.enabledAddresses { @@ -968,7 +995,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Ens defer wg.Done() if err := ec.handleAnyInProgressAttempts(ctx, fromAddress, head.BlockNumber()); err != nil { errMu.Lock() - errors = append(errors, err) + errs = append(errs, err) errMu.Unlock() ec.lggr.Errorw("Error in handleAnyInProgressAttempts", "err", err, "fromAddress", fromAddress) } @@ -977,27 +1004,79 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Ens wg.Wait() - return multierr.Combine(errors...) + return multierr.Combine(errs...) } -func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFinalized(ctx context.Context, head types.Head[BLOCK_HASH], - txs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (map[int64]TX_HASH, error) { - finalizedTxs := make(map[int64]TX_HASH, len(txs)) +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFinalizedUsingLocalChain(ctx context.Context, + head types.Head[BLOCK_HASH], batchSize uint32) ( + []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + // fetch all confirmed transactions. + confirmedTxs, err := ec.txStore.FindConfirmedTransactions(ctx, ec.chainID) + if err != nil { + return nil, fmt.Errorf("FindConfirmedTransactions failed: %w", err) + } + + finalizedBHead, err := ec.findFinalizedHeadInChain(ctx, head) + if err != nil { + return nil, fmt.Errorf("failed to find finalized block in chain: %w", err) + } + + txsMissingInChain := make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], 0, len(confirmedTxs)) + finalizedAttempts := make([]int64, 0, batchSize) + for _, tx := range confirmedTxs { + // transaction is finalized + if finalizedAttempt := findAttemptWithReceiptInChain(*tx, finalizedBHead); finalizedAttempt != nil { + finalizedAttempts = append(finalizedAttempts, finalizedAttempt.ID) + + if uint32(len(finalizedAttempts)) >= batchSize { + err := ec.txStore.MarkFinalized(ctx, finalizedAttempts) + if err != nil { + return nil, fmt.Errorf("failed to mark attempts as finalized") + } + finalizedAttempts = finalizedAttempts[:0] + } + continue + } + + // tx is present in chain + if confirmedReceipt := findAttemptWithReceiptInChain(*tx, head); confirmedReceipt != nil { + continue + } + + // for some reason one of the receipts has block number from the future - it's safer to wait + if hasReceiptHigherThanHead(*tx, head) { + continue + } + + txsMissingInChain = append(txsMissingInChain, tx) + } - if len(txs) == 0 { - ec.lggr.Debug("no transactions to mark finalized") - return finalizedTxs, nil + if len(finalizedAttempts) >= 0 { + err := ec.txStore.MarkFinalized(ctx, finalizedAttempts) + if err != nil { + return nil, fmt.Errorf("failed to mark attempts as finalized") + } } + return txsMissingInChain, nil +} + +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fetchAndSaveFinalizedReceipts( + ctx context.Context, + txs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) (map[int64]struct{}, error) { + + missingTxs := make(map[int64]struct{}, len(txs)) + attemptsBatch := make([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], 0, ec.chainConfig.RPCDefaultBatchSize()) for _, tx := range txs { + missingTxs[tx.ID] = struct{}{} for _, attempt := range tx.TxAttempts { - // TODO: BCI-2605 skip attempts that were broadcasted after the finalized block attemptsBatch = append(attemptsBatch, attempt) // save one for block fetching if uint32(len(attemptsBatch)+1) >= ec.chainConfig.RPCDefaultBatchSize() { - err := ec.markFinalizedBatch(ctx, attemptsBatch, finalizedTxs) + err := ec.fetchAndSaveFinalizedReceiptsBatch(ctx, attemptsBatch, missingTxs) if err != nil { return nil, fmt.Errorf("failed to mark txs finalized: %w", err) } @@ -1007,19 +1086,19 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) mar } if len(attemptsBatch) > 0 { - err := ec.markFinalizedBatch(ctx, attemptsBatch, finalizedTxs) + err := ec.fetchAndSaveFinalizedReceiptsBatch(ctx, attemptsBatch, missingTxs) if err != nil { return nil, fmt.Errorf("failed to mark txs finalized: %w", err) } } - return finalizedTxs, nil + return missingTxs, nil } -func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markFinalizedBatch( +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fetchAndSaveFinalizedReceiptsBatch( ctx context.Context, attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], - finalizedTxs map[int64]TX_HASH, + missingTxs map[int64]struct{}, ) error { rpcFinalizedBlock, receipts, txErrs, err := ec.client.BatchGetReceiptsWithFinalizedBlock(ctx, attempts, @@ -1032,7 +1111,6 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) mar const errMsg = "invariant violation expected number of attempts to match number of resulting receipts/errors" ec.lggr.With("receipts", receipts, "attempts", attempts, "txErrs", txErrs).Criticalw(errMsg) return errors.New(errMsg) - } finalizedReceipts := make([]R, 0, len(attempts)) @@ -1048,18 +1126,14 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) mar continue } + delete(missingTxs, attempt.TxID) if receipt.GetBlockNumber().Cmp(rpcFinalizedBlock) >= 0 { - l.With("batch_request_finalized_block", rpcFinalizedBlock).Debug("attempt is not finalized") + l.With("rpc_finalized_block", rpcFinalizedBlock).Debug("attempt is not finalized") continue } - finalizedReceipts = append(finalizedReceipts, receipt) - if anotherTxHash, ok := finalizedTxs[attempt.TxID]; ok { - const errMsg = "found two finalized attempts for single tx" - l.With("another_tx_hash", anotherTxHash).Critical(errMsg) - return errors.New(errMsg) - } - finalizedTxs[attempts[i].TxID] = attempts[i].Hash + finalizedReceipts = append(finalizedReceipts, receipts[i]) + missingTxs[attempts[i].TxID] = struct{}{} } err = ec.txStore.SaveFinalizedReceipts(ctx, finalizedReceipts, ec.chainID) @@ -1089,26 +1163,26 @@ func hasReceiptHigherThanHead[ return false } -func hasReceiptInLongestChain[ +func findAttemptWithReceiptInChain[ CHAIN_ID types.ID, ADDR types.Hashable, TX_HASH, BLOCK_HASH types.Hashable, SEQ types.Sequence, FEE feetypes.Fee, -](etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head types.Head[BLOCK_HASH]) bool { - for { +](etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head types.Head[BLOCK_HASH]) *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + for head != nil { for _, attempt := range etx.TxAttempts { for _, receipt := range attempt.Receipts { if receipt.GetBlockHash().String() == head.BlockHash().String() && receipt.GetBlockNumber().Int64() == head.BlockNumber() { - return true + return &attempt } } } - if head.GetParent() == nil { - return false - } + head = head.GetParent() } + + return nil } func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markForRebroadcast(etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head types.Head[BLOCK_HASH]) error { diff --git a/common/txmgr/types/client.go b/common/txmgr/types/client.go index 652e21f9b0a..32fad092663 100644 --- a/common/txmgr/types/client.go +++ b/common/txmgr/types/client.go @@ -37,6 +37,7 @@ type TxmClient[ attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], useFinalityTag bool, finalityDepth uint32) ( finalizedBlock *big.Int, txReceipt []R, txErr []error, funcErr error) + FinalizedBlockHash(ctx context.Context) (BLOCK_HASH, *big.Int, error) } // TransactionClient contains the methods for building, simulating, broadcasting transactions diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index eb65ff94f6c..e62d9d2db3a 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -872,6 +872,24 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkAllConf return r0 } +// MarkFinalized provides a mock function with given fields: ctx, attempts +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkFinalized(ctx context.Context, attempts []int64) error { + ret := _m.Called(ctx, attempts) + + if len(ret) == 0 { + panic("no return value specified for MarkFinalized") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok { + r0 = rf(ctx, attempts) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // MarkOldTxesMissingReceiptAsErrored provides a mock function with given fields: ctx, blockNum, finalityDepth, chainID func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error { ret := _m.Called(ctx, blockNum, finalityDepth, chainID) @@ -990,9 +1008,9 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveFetched return r0 } -// SaveFinalizedReceipts provides a mock function with given fields: ctx, r, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveFinalizedReceipts(ctx context.Context, r []R, chainID CHAIN_ID) error { - ret := _m.Called(ctx, r, chainID) +// SaveFinalizedReceipts provides a mock function with given fields: ctx, receipts, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveFinalizedReceipts(ctx context.Context, receipts []R, chainID CHAIN_ID) error { + ret := _m.Called(ctx, receipts, chainID) if len(ret) == 0 { panic("no return value specified for SaveFinalizedReceipts") @@ -1000,7 +1018,7 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveFinaliz var r0 error if rf, ok := ret.Get(0).(func(context.Context, []R, CHAIN_ID) error); ok { - r0 = rf(ctx, r, chainID) + r0 = rf(ctx, receipts, chainID) } else { r0 = ret.Error(0) } diff --git a/common/txmgr/types/mocks/txm_client.go b/common/txmgr/types/mocks/txm_client.go index ce66ca6e93b..970c593fd88 100644 --- a/common/txmgr/types/mocks/txm_client.go +++ b/common/txmgr/types/mocks/txm_client.go @@ -218,6 +218,43 @@ func (_m *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Configure return r0 } +// FinalizedBlockHash provides a mock function with given fields: ctx +func (_m *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FinalizedBlockHash(ctx context.Context) (BLOCK_HASH, *big.Int, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for FinalizedBlockHash") + } + + var r0 BLOCK_HASH + var r1 *big.Int + var r2 error + if rf, ok := ret.Get(0).(func(context.Context) (BLOCK_HASH, *big.Int, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) BLOCK_HASH); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(BLOCK_HASH) + } + + if rf, ok := ret.Get(1).(func(context.Context) *big.Int); ok { + r1 = rf(ctx) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*big.Int) + } + } + + if rf, ok := ret.Get(2).(func(context.Context) error); ok { + r2 = rf(ctx) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // PendingSequenceAt provides a mock function with given fields: ctx, addr func (_m *TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PendingSequenceAt(ctx context.Context, addr ADDR) (SEQ, error) { ret := _m.Called(ctx, addr) diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index cad3df9df94..b1942eb7e16 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -40,7 +40,8 @@ type TxStore[ // Update tx to mark that its callback has been signaled UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error SaveFetchedReceipts(ctx context.Context, receipts []R, chainID CHAIN_ID) (err error) - SaveFinalizedReceipts(ctx context.Context, r []R, chainID CHAIN_ID) (err error) + SaveFinalizedReceipts(ctx context.Context, receipts []R, chainID CHAIN_ID) (err error) + MarkFinalized(ctx context.Context, attempts []int64) error // additional methods for tx store management CheckTxQueueCapacity(ctx context.Context, fromAddress ADDR, maxQueuedTransactions uint64, chainID CHAIN_ID) (err error) diff --git a/core/chains/evm/client/chain_client.go b/core/chains/evm/client/chain_client.go index 5dd70992382..5d01ad3d743 100644 --- a/core/chains/evm/client/chain_client.go +++ b/core/chains/evm/client/chain_client.go @@ -274,3 +274,7 @@ func (c *chainClient) TransactionReceipt(ctx context.Context, txHash common.Hash //return rpc.TransactionReceipt(ctx, txHash) return rpc.TransactionReceiptGeth(ctx, txHash) } + +func (c *chainClient) FinalizedBlock(ctx context.Context) (*evmtypes.Head, error) { + return c.multiNode.FinalizedBlock(ctx) +} diff --git a/core/chains/evm/client/client.go b/core/chains/evm/client/client.go index 61635c59c6b..5d629c90256 100644 --- a/core/chains/evm/client/client.go +++ b/core/chains/evm/client/client.go @@ -63,6 +63,7 @@ type Client interface { HeadByNumber(ctx context.Context, n *big.Int) (*evmtypes.Head, error) HeadByHash(ctx context.Context, n common.Hash) (*evmtypes.Head, error) SubscribeNewHead(ctx context.Context, ch chan<- *evmtypes.Head) (ethereum.Subscription, error) + FinalizedBlock(ctx context.Context) (head *evmtypes.Head, err error) SendTransactionReturnCode(ctx context.Context, tx *types.Transaction, fromAddress common.Address) (commonclient.SendTxReturnCode, error) @@ -280,7 +281,11 @@ func (client *client) LatestBlockHeight(ctx context.Context) (*big.Int, error) { func (client *client) HeadByNumber(ctx context.Context, number *big.Int) (head *evmtypes.Head, err error) { hex := ToBlockNumArg(number) - err = client.pool.CallContext(ctx, &head, "eth_getBlockByNumber", hex, false) + return client.headByNumber(ctx, hex) +} + +func (client *client) headByNumber(ctx context.Context, num string) (head *evmtypes.Head, err error) { + err = client.pool.CallContext(ctx, &head, "eth_getBlockByNumber", num, false) if err != nil { return nil, err } @@ -292,6 +297,10 @@ func (client *client) HeadByNumber(ctx context.Context, number *big.Int) (head * return } +func (client *client) FinalizedBlock(ctx context.Context) (head *evmtypes.Head, err error) { + return client.headByNumber(ctx, rpc.FinalizedBlockNumber.String()) +} + func (client *client) HeadByHash(ctx context.Context, hash common.Hash) (head *evmtypes.Head, err error) { err = client.pool.CallContext(ctx, &head, "eth_getBlockByHash", hash.Hex(), false) if err != nil { diff --git a/core/chains/evm/client/mocks/client.go b/core/chains/evm/client/mocks/client.go index 0b45894cf28..19bbeeb2a52 100644 --- a/core/chains/evm/client/mocks/client.go +++ b/core/chains/evm/client/mocks/client.go @@ -367,6 +367,36 @@ func (_m *Client) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]typ return r0, r1 } +// FinalizedBlock provides a mock function with given fields: ctx +func (_m *Client) FinalizedBlock(ctx context.Context) (*evmtypes.Head, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for FinalizedBlock") + } + + var r0 *evmtypes.Head + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*evmtypes.Head, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *evmtypes.Head); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*evmtypes.Head) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // HeadByHash provides a mock function with given fields: ctx, n func (_m *Client) HeadByHash(ctx context.Context, n common.Hash) (*evmtypes.Head, error) { ret := _m.Called(ctx, n) diff --git a/core/chains/evm/client/null_client.go b/core/chains/evm/client/null_client.go index e3bb1defd0d..8941e82d1e0 100644 --- a/core/chains/evm/client/null_client.go +++ b/core/chains/evm/client/null_client.go @@ -11,6 +11,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/assets" "github.com/smartcontractkit/chainlink-common/pkg/logger" + commonclient "github.com/smartcontractkit/chainlink/v2/common/client" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -67,6 +68,11 @@ func (nc *NullClient) HeadByHash(ctx context.Context, h common.Hash) (*evmtypes. return nil, nil } +func (nc *NullClient) FinalizedBlock(ctx context.Context) (*evmtypes.Head, error) { + nc.lggr.Debug("FinalizedBlock") + return nil, nil +} + type nullSubscription struct { lggr logger.Logger } diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index ce3a67162ed..c7f37178598 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -479,9 +479,17 @@ func (r *rpcClient) HeaderByHash(ctx context.Context, hash common.Hash) (header return } +func (client *rpcClient) FinalizedBlock(ctx context.Context) (head *evmtypes.Head, err error) { + return client.blockByNumber(ctx, rpc.FinalizedBlockNumber.String()) +} + func (r *rpcClient) BlockByNumber(ctx context.Context, number *big.Int) (head *evmtypes.Head, err error) { hex := ToBlockNumArg(number) - err = r.CallContext(ctx, &head, "eth_getBlockByNumber", hex, false) + return r.blockByNumber(ctx, hex) +} + +func (r *rpcClient) blockByNumber(ctx context.Context, number string) (head *evmtypes.Head, err error) { + err = r.CallContext(ctx, &head, "eth_getBlockByNumber", number, false) if err != nil { return nil, err } diff --git a/core/chains/evm/client/simulated_backend_client.go b/core/chains/evm/client/simulated_backend_client.go index bd2e959d9bc..63512d2f70c 100644 --- a/core/chains/evm/client/simulated_backend_client.go +++ b/core/chains/evm/client/simulated_backend_client.go @@ -196,13 +196,17 @@ func (c *SimulatedBackendClient) HeadByNumber(ctx context.Context, n *big.Int) ( } else if header == nil { return nil, ethereum.NotFound } + return c.headerToHead(header), nil +} + +func (c *SimulatedBackendClient) headerToHead(header *types.Header) *evmtypes.Head { return &evmtypes.Head{ EVMChainID: ubig.NewI(c.chainId.Int64()), Hash: header.Hash(), Number: header.Number.Int64(), ParentHash: header.ParentHash, Timestamp: time.Unix(int64(header.Time), 0), - }, nil + } } // HeadByHash returns our own header type. @@ -213,13 +217,7 @@ func (c *SimulatedBackendClient) HeadByHash(ctx context.Context, h common.Hash) } else if header == nil { return nil, ethereum.NotFound } - return &evmtypes.Head{ - EVMChainID: ubig.NewI(c.chainId.Int64()), - Hash: header.Hash(), - Number: header.Number.Int64(), - ParentHash: header.ParentHash, - Timestamp: time.Unix(int64(header.Time), 0), - }, nil + return c.headerToHead(header), nil } // BlockByNumber returns a geth block type. @@ -623,6 +621,11 @@ func (c *SimulatedBackendClient) ethGetHeaderByNumber(ctx context.Context, resul return nil } +func (c *SimulatedBackendClient) FinalizedBlock(ctx context.Context) (head *evmtypes.Head, err error) { + header := c.b.Blockchain().CurrentFinalBlock() + return c.headerToHead(header), nil +} + func toCallMsg(params map[string]interface{}) ethereum.CallMsg { var callMsg ethereum.CallMsg diff --git a/core/chains/evm/txmgr/client.go b/core/chains/evm/txmgr/client.go index 650f97fdb56..797c82b4d0e 100644 --- a/core/chains/evm/txmgr/client.go +++ b/core/chains/evm/txmgr/client.go @@ -150,6 +150,15 @@ func newGetBatchReceiptsReq(attempts []TxAttempt) (reqs []rpc.BatchElem, txRecei return reqs, txReceipts, txErrs } +func (c *evmTxmClient) FinalizedBlockHash(ctx context.Context) (common.Hash, *big.Int, error) { + head, err := c.client.FinalizedBlock(ctx) + if err != nil || head == nil { + return common.Hash{}, big.NewInt(0), err + } + + return head.Hash, big.NewInt(head.BlockNumber()), nil +} + // TODO: test me func (c *evmTxmClient) BatchGetReceiptsWithFinalizedBlock(ctx context.Context, attempts []TxAttempt, useFinalityTag bool, finalityDepth uint32) ( finalizedBlock *big.Int, txReceipt []*evmtypes.Receipt, txErr []error, funcErr error) { diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index 34336ae3cd3..21559b42da0 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -2683,11 +2683,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { require.NoError(t, txStore.DeleteAll(testutils.Context(t))) etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 2, 1, fromAddress) - receipt := mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash) - - ethClient.On("BatchGetReceiptsWithFinalizedBlock", mock.Anything, mock.Anything, - config.EVM().FinalityTagEnabled(), config.EVM().FinalityDepth(), - ).Return(big.NewInt(0), []*evmtypes.Receipt{txmgr.DbReceiptToEvmReceipt(&receipt)}, []error{nil}, nil).Once() + _ = mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash) // Do the thing require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(testutils.Context(t), &head)) @@ -2770,7 +2766,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { // Include one within head height but a different block hash mustInsertEthReceipt(t, txStore, head.Parent.Number, utils.NewHash(), attemptHash) - // none of the receipts are found on markFinalized step + // none of the receipts are found ethClient.On("BatchGetReceiptsWithFinalizedBlock", mock.Anything, mock.Anything, config.EVM().FinalityTagEnabled(), config.EVM().FinalityDepth(), ).Return(big.NewInt(0), []*evmtypes.Receipt{nil}, []error{nil}, nil).Once() @@ -2809,7 +2805,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { // Receipt is within head height but a different block hash mustInsertEthReceipt(t, txStore, head.Parent.Number, utils.NewHash(), attempt3.Hash) - // none of the receipts are found on markFinalized step + // none of the receipts are found ethClient.On("BatchGetReceiptsWithFinalizedBlock", mock.Anything, mock.Anything, config.EVM().FinalityTagEnabled(), config.EVM().FinalityDepth(), ).Return(big.NewInt(0), []*evmtypes.Receipt{nil, nil, nil}, []error{nil, nil, nil}, nil).Once() @@ -2845,11 +2841,6 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { // Add receipt that is higher than head mustInsertEthReceipt(t, txStore, head.Number+1, utils.NewHash(), attempt.Hash) - // none of the receipts are found on markFinalized step - ethClient.On("BatchGetReceiptsWithFinalizedBlock", mock.Anything, mock.Anything, - config.EVM().FinalityTagEnabled(), config.EVM().FinalityDepth(), - ).Return(big.NewInt(0), []*evmtypes.Receipt{nil}, []error{nil}, nil) - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(testutils.Context(t), &head)) etx, err := txStore.FindTxWithAttempts(etx.ID) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index b02ae66ff7b..fb4f9c93551 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -906,14 +906,36 @@ func (o *evmTxStore) saveFetchedReceipts(ctx context.Context, r []*evmtypes.Rece return pkgerrors.Wrap(err, "SaveFetchedReceipts failed to save receipts") } -func (o *evmTxStore) SaveFetchedReceipts(ctx context.Context, r []*evmtypes.Receipt, chainID *big.Int) (err error) { +func (o *evmTxStore) SaveFetchedReceipts(ctx context.Context, r []*evmtypes.Receipt, chainID *big.Int) error { return o.saveFetchedReceipts(ctx, r, chainID, "confirmed", txmgrtypes.TxAttemptBroadcast) } -func (o *evmTxStore) SaveFinalizedReceipts(ctx context.Context, r []*evmtypes.Receipt, chainID *big.Int) (err error) { +func (o *evmTxStore) SaveFinalizedReceipts(ctx context.Context, r []*evmtypes.Receipt, chainID *big.Int) error { return o.saveFetchedReceipts(ctx, r, chainID, "finalized", txmgrtypes.TxAttemptFinalized) } +func (o *evmTxStore) MarkFinalized(ctx context.Context, attempts []int64) error { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + if len(attempts) == 0 { + return nil + } + + stmt := ` + WITH updated_eth_tx_attempts AS ( + UPDATE evm.tx_attempts SET state = 'finalized' WHERE id = ANY ($1) RETURNING evm.tx_attempts.eth_tx_id + ) + UPDATE evm.txes + SET state = 'finalized' + FROM updated_eth_tx_attempts + WHERE updated_eth_tx_attempts.eth_tx_id = evm.txes.id + ` + err := qq.ExecQ(stmt, pq.Array(attempts)) + return pkgerrors.Wrap(err, "MarkFinalized failed to update state") +} + // MarkAllConfirmedMissingReceipt // It is possible that we can fail to get a receipt for all evm.tx_attempts // even though a transaction with this nonce has long since been confirmed (we diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index 7d0e3b28e52..ee943f30c7f 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -500,7 +500,44 @@ func TestORM_FindTxAttemptsRequiringReceiptFetch(t *testing.T) { assert.Equal(t, etx0.TxAttempts[0].ID, attempts[0].ID) } -func TestORM_SaveFetchedReceipts(t *testing.T) { +func TestORM_MarkFinalized(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := newTestChainScopedConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + + blockNum := int64(42) + etx0 := mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 0, blockNum) + newAttempt := cltest.NewLegacyEthTxAttempt(t, etx0.ID) + newAttempt.BroadcastBeforeBlockNum = &blockNum + newAttempt.State = txmgrtypes.TxAttemptBroadcast + // ensure that gas prices are unique + newAttempt.TxFee.Legacy = newAttempt.TxFee.Legacy.Add(assets.NewWei(big.NewInt(10))) + require.NoError(t, txStore.InsertTxAttempt(&newAttempt)) + + etx1 := mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 1, blockNum) + + err := txStore.MarkFinalized(testutils.Context(t), []int64{etx0.TxAttempts[0].ID}) + require.NoError(t, err) + + etx0, err = txStore.FindTxWithAttempts(etx0.ID) + require.NoError(t, err) + require.Len(t, etx0.TxAttempts, 2) + // as we've finalized attempt with lower price - it will be second + require.Equal(t, txmgrtypes.TxAttemptBroadcast, etx0.TxAttempts[0].State) + require.Equal(t, txmgrtypes.TxAttemptFinalized, etx0.TxAttempts[1].State) + require.Equal(t, txmgrcommon.TxFinalized, etx0.State) + + etx1, err = txStore.FindTxWithAttempts(etx1.ID) + require.NoError(t, err) + require.Len(t, etx1.TxAttempts, 1) + require.Equal(t, txmgrcommon.TxConfirmed, etx1.State) +} + +func TestORM_SaveReceipts(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index 17180159cf9..665d1606d7e 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -993,6 +993,24 @@ func (_m *EvmTxStore) MarkAllConfirmedMissingReceipt(ctx context.Context, chainI return r0 } +// MarkFinalized provides a mock function with given fields: ctx, attempts +func (_m *EvmTxStore) MarkFinalized(ctx context.Context, attempts []int64) error { + ret := _m.Called(ctx, attempts) + + if len(ret) == 0 { + panic("no return value specified for MarkFinalized") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok { + r0 = rf(ctx, attempts) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // MarkOldTxesMissingReceiptAsErrored provides a mock function with given fields: ctx, blockNum, finalityDepth, chainID func (_m *EvmTxStore) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID *big.Int) error { ret := _m.Called(ctx, blockNum, finalityDepth, chainID) @@ -1111,9 +1129,9 @@ func (_m *EvmTxStore) SaveFetchedReceipts(ctx context.Context, receipts []*evmty return r0 } -// SaveFinalizedReceipts provides a mock function with given fields: ctx, r, chainID -func (_m *EvmTxStore) SaveFinalizedReceipts(ctx context.Context, r []*evmtypes.Receipt, chainID *big.Int) error { - ret := _m.Called(ctx, r, chainID) +// SaveFinalizedReceipts provides a mock function with given fields: ctx, receipts, chainID +func (_m *EvmTxStore) SaveFinalizedReceipts(ctx context.Context, receipts []*evmtypes.Receipt, chainID *big.Int) error { + ret := _m.Called(ctx, receipts, chainID) if len(ret) == 0 { panic("no return value specified for SaveFinalizedReceipts") @@ -1121,7 +1139,7 @@ func (_m *EvmTxStore) SaveFinalizedReceipts(ctx context.Context, r []*evmtypes.R var r0 error if rf, ok := ret.Get(0).(func(context.Context, []*evmtypes.Receipt, *big.Int) error); ok { - r0 = rf(ctx, r, chainID) + r0 = rf(ctx, receipts, chainID) } else { r0 = ret.Error(0) }