Skip to content

Commit

Permalink
[BCI-3573] - Remove dependence on FinalityDepth in EVM TXM code (#13794)
Browse files Browse the repository at this point in the history
* removing finality within common/txmgr

* remove finality from common/txmgr comment

* latestFinalizedBlockNum support within evm/txmgr txstore

* refactor tests after evm/txmgr txstore changes

* mocks for both common and core/evm

* remove comments that are still referencing to finalityDepth within evm/txmgr

* add changeset

* error if no LatestFinalizedHead was found

* fix mocks version

* fix mocks version

* mock versioning

* mock version

* mock version

* mock version

* mock version

* inject head tracker trimmed API

* fix tests

* mocks

* remove unnecesary tests as finalized head is guaranteed

* ht in shell local

* remove evm dep from confirmer and use generics instead

* make markOldTxesMissingReceiptAsErrored condition inclusive

* use already initialized head tracker within shell local

* fix mocking, fix unit test

* fix a potential bug

* fix bug

* refactor

* address comments

* fix lint

* rename

* have log back

* update comments

* remove nil check

* minor

* fix test

* grammar

* rephrase

---------

Co-authored-by: Joe Huang <[email protected]>
  • Loading branch information
Farber98 and huangzhen1997 authored Aug 9, 2024
1 parent bd648bd commit c330def
Show file tree
Hide file tree
Showing 13 changed files with 191 additions and 124 deletions.
5 changes: 5 additions & 0 deletions .changeset/chatty-spiders-double.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

remove dependency on FinalityDepth in EVM TXM code. #internal
66 changes: 47 additions & 19 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ var (
}, []string{"chainID"})
)

type confirmerHeadTracker[HEAD types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] interface {
LatestAndFinalizedBlock(ctx context.Context) (latest, finalized HEAD, err error)
}

// Confirmer is a broad service which performs four different tasks in sequence on every new longest chain
// Step 1: Mark that all currently pending transaction attempts were broadcast before this block
// Step 2: Check pending transactions for receipts
Expand Down Expand Up @@ -133,14 +137,15 @@ type Confirmer[
ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
enabledAddresses []ADDR

mb *mailbox.Mailbox[HEAD]
stopCh services.StopChan
wg sync.WaitGroup
initSync sync.Mutex
isStarted bool

mb *mailbox.Mailbox[HEAD]
stopCh services.StopChan
wg sync.WaitGroup
initSync sync.Mutex
isStarted bool
nConsecutiveBlocksChainTooShort int
isReceiptNil func(R) bool

headTracker confirmerHeadTracker[HEAD, BLOCK_HASH]
}

func NewConfirmer[
Expand All @@ -164,6 +169,7 @@ func NewConfirmer[
lggr logger.Logger,
isReceiptNil func(R) bool,
stuckTxDetector txmgrtypes.StuckTxDetector[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
headTracker confirmerHeadTracker[HEAD, BLOCK_HASH],
) *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
lggr = logger.Named(lggr, "Confirmer")
return &Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
Expand All @@ -181,6 +187,7 @@ func NewConfirmer[
mb: mailbox.NewSingle[HEAD](),
isReceiptNil: isReceiptNil,
stuckTxDetector: stuckTxDetector,
headTracker: headTracker,
}
}

Expand Down Expand Up @@ -297,7 +304,20 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro
return fmt.Errorf("CheckConfirmedMissingReceipt failed: %w", err)
}

if err := ec.CheckForReceipts(ctx, head.BlockNumber()); err != nil {
_, latestFinalizedHead, err := ec.headTracker.LatestAndFinalizedBlock(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve latest finalized head: %w", err)
}

if !latestFinalizedHead.IsValid() {
return fmt.Errorf("latest finalized head is not valid")
}

if latestFinalizedHead.BlockNumber() > head.BlockNumber() {
ec.lggr.Debugw("processHead received old block", "latestFinalizedHead", latestFinalizedHead.BlockNumber(), "headNum", head.BlockNumber(), "time", time.Since(mark), "id", "confirmer")
}

if err := ec.CheckForReceipts(ctx, head.BlockNumber(), latestFinalizedHead.BlockNumber()); err != nil {
return fmt.Errorf("CheckForReceipts failed: %w", err)
}

Expand All @@ -318,7 +338,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro
ec.lggr.Debugw("Finished RebroadcastWhereNecessary", "headNum", head.BlockNumber(), "time", time.Since(mark), "id", "confirmer")
mark = time.Now()

if err := ec.EnsureConfirmedTransactionsInLongestChain(ctx, head); err != nil {
if err := ec.EnsureConfirmedTransactionsInLongestChain(ctx, head, latestFinalizedHead.BlockNumber()); err != nil {
return fmt.Errorf("EnsureConfirmedTransactionsInLongestChain failed: %w", err)
}

Expand Down Expand Up @@ -395,8 +415,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che
return
}

// CheckForReceipts finds attempts that are still pending and checks to see if a receipt is present for the given block number
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckForReceipts(ctx context.Context, blockNum int64) error {
// CheckForReceipts finds attempts that are still pending and checks to see if a receipt is present for the given block number.
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckForReceipts(ctx context.Context, blockNum int64, latestFinalizedBlockNum int64) error {
attempts, err := ec.txStore.FindTxAttemptsRequiringReceiptFetch(ctx, ec.chainID)
if err != nil {
return fmt.Errorf("FindTxAttemptsRequiringReceiptFetch failed: %w", err)
Expand Down Expand Up @@ -443,7 +463,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che
return fmt.Errorf("unable to mark txes as 'confirmed_missing_receipt': %w", err)
}

if err := ec.txStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, ec.chainConfig.FinalityDepth(), ec.chainID); err != nil {
if err := ec.txStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, latestFinalizedBlockNum, ec.chainID); err != nil {
return fmt.Errorf("unable to confirm buried unconfirmed txes': %w", err)
}
return nil
Expand Down Expand Up @@ -1004,22 +1024,30 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
}
}

// EnsureConfirmedTransactionsInLongestChain finds all confirmed txes up to the depth
// EnsureConfirmedTransactionsInLongestChain finds all confirmed txes up to the earliest head
// of the given chain and ensures that every one has a receipt with a block hash that is
// in the given chain.
//
// If any of the confirmed transactions does not have a receipt in the chain, it has been
// re-org'd out and will be rebroadcast.
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head types.Head[BLOCK_HASH]) error {
if head.ChainLength() < ec.chainConfig.FinalityDepth() {
logArgs := []interface{}{
"chainLength", head.ChainLength(), "finalityDepth", ec.chainConfig.FinalityDepth(),
}
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) EnsureConfirmedTransactionsInLongestChain(ctx context.Context, head types.Head[BLOCK_HASH], latestFinalizedHeadNumber int64) error {
logArgs := []interface{}{
"chainLength", head.ChainLength(), "latestFinalizedHead number", latestFinalizedHeadNumber,
}

if head.BlockNumber() < latestFinalizedHeadNumber {
errMsg := "current head is shorter than latest finalized head"
ec.lggr.Errorw(errMsg, append(logArgs, "head block number", head.BlockNumber())...)
return errors.New(errMsg)
}

calculatedFinalityDepth := uint32(head.BlockNumber() - latestFinalizedHeadNumber)
if head.ChainLength() < calculatedFinalityDepth {
if ec.nConsecutiveBlocksChainTooShort > logAfterNConsecutiveBlocksChainTooShort {
warnMsg := "Chain length supplied for re-org detection was shorter than FinalityDepth. Re-org protection is not working properly. This could indicate a problem with the remote RPC endpoint, a compatibility issue with a particular blockchain, a bug with this particular blockchain, heads table being truncated too early, remote node out of sync, or something else. If this happens a lot please raise a bug with the Chainlink team including a log output sample and details of the chain and RPC endpoint you are using."
warnMsg := "Chain length supplied for re-org detection was shorter than the depth from the latest head to the finalized head. Re-org protection is not working properly. This could indicate a problem with the remote RPC endpoint, a compatibility issue with a particular blockchain, a bug with this particular blockchain, heads table being truncated too early, remote node out of sync, or something else. If this happens a lot please raise a bug with the Chainlink team including a log output sample and details of the chain and RPC endpoint you are using."
ec.lggr.Warnw(warnMsg, append(logArgs, "nConsecutiveBlocksChainTooShort", ec.nConsecutiveBlocksChainTooShort)...)
} else {
logMsg := "Chain length supplied for re-org detection was shorter than FinalityDepth"
logMsg := "Chain length supplied for re-org detection was shorter than the depth from the latest head to the finalized head"
ec.lggr.Debugw(logMsg, append(logArgs, "nConsecutiveBlocksChainTooShort", ec.nConsecutiveBlocksChainTooShort)...)
}
ec.nConsecutiveBlocksChainTooShort++
Expand Down
22 changes: 11 additions & 11 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type TransactionStore[
HasInProgressTransaction(ctx context.Context, account ADDR, chainID CHAIN_ID) (exists bool, err error)
LoadTxAttempts(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
MarkAllConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) (err error)
MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error
MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, latestFinalizedBlockNum int64, chainID CHAIN_ID) error
PreloadTxes(ctx context.Context, attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
SaveConfirmedMissingReceiptAttempt(ctx context.Context, timeout time.Duration, attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error
SaveInProgressAttempt(ctx context.Context, attempt *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error
Expand Down
13 changes: 9 additions & 4 deletions core/chains/evm/txmgr/builder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package txmgr

import (
"context"
"math/big"
"time"

Expand All @@ -13,12 +14,15 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/chaintype"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/forwarders"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/keystore"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
)

type latestAndFinalizedBlockHeadTracker interface {
LatestAndFinalizedBlock(ctx context.Context) (latest, finalized *evmtypes.Head, err error)
}

// NewTxm constructs the necessary dependencies for the EvmTxm (broadcaster, confirmer, etc) and returns a new EvmTxManager
func NewTxm(
ds sqlutil.DataSource,
Expand All @@ -33,7 +37,7 @@ func NewTxm(
logPoller logpoller.LogPoller,
keyStore keystore.Eth,
estimator gas.EvmFeeEstimator,
headTracker httypes.HeadTracker,
headTracker latestAndFinalizedBlockHeadTracker,
) (txm TxManager,
err error,
) {
Expand All @@ -55,7 +59,7 @@ func NewTxm(
evmBroadcaster := NewEvmBroadcaster(txStore, txmClient, txmCfg, feeCfg, txConfig, listenerConfig, keyStore, txAttemptBuilder, lggr, checker, chainConfig.NonceAutoSync(), chainConfig.ChainType())
evmTracker := NewEvmTracker(txStore, keyStore, chainID, lggr)
stuckTxDetector := NewStuckTxDetector(lggr, client.ConfiguredChainID(), chainConfig.ChainType(), fCfg.PriceMax(), txConfig.AutoPurge(), estimator, txStore, client)
evmConfirmer := NewEvmConfirmer(txStore, txmClient, txmCfg, feeCfg, txConfig, dbConfig, keyStore, txAttemptBuilder, lggr, stuckTxDetector)
evmConfirmer := NewEvmConfirmer(txStore, txmClient, txmCfg, feeCfg, txConfig, dbConfig, keyStore, txAttemptBuilder, lggr, stuckTxDetector, headTracker)
evmFinalizer := NewEvmFinalizer(lggr, client.ConfiguredChainID(), chainConfig.RPCDefaultBatchSize(), txStore, client, headTracker)
var evmResender *Resender
if txConfig.ResendAfterThreshold() > 0 {
Expand Down Expand Up @@ -116,8 +120,9 @@ func NewEvmConfirmer(
txAttemptBuilder TxAttemptBuilder,
lggr logger.Logger,
stuckTxDetector StuckTxDetector,
headTracker latestAndFinalizedBlockHeadTracker,
) *Confirmer {
return txmgr.NewConfirmer(txStore, client, chainConfig, feeConfig, txConfig, dbConfig, keystore, txAttemptBuilder, lggr, func(r *evmtypes.Receipt) bool { return r == nil }, stuckTxDetector)
return txmgr.NewConfirmer(txStore, client, chainConfig, feeConfig, txConfig, dbConfig, keystore, txAttemptBuilder, lggr, func(r *evmtypes.Receipt) bool { return r == nil }, stuckTxDetector, headTracker)
}

// NewEvmTracker instantiates a new EVM tracker for abandoned transactions
Expand Down
Loading

0 comments on commit c330def

Please sign in to comment.