Skip to content

Commit

Permalink
Update ZK overflow detection logic (#14132)
Browse files Browse the repository at this point in the history
* Updated ZK overflow detection to skip tx with non-broadcast attempts and delayed detection for zkEVM

* Added changeset

* Added assumption violation log

* Updated XLayer to use the same detection logic as zkEVM

* Fixed test
  • Loading branch information
amit-momin authored Aug 16, 2024
1 parent 69a0090 commit 2e314cd
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 14 deletions.
5 changes: 5 additions & 0 deletions .changeset/big-dots-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Updated ZK overflow detection to skip transactions with non-broadcasted attempts. Delayed detection for zkEVM using the MinAttempts config. Updated XLayer to use the same detection logic as zkEVM. #internal
12 changes: 10 additions & 2 deletions core/chains/evm/config/toml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,16 @@ func (c *Chain) ValidateConfig() (err error) {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "Transactions.AutoPurge.DetectionApiUrl", Value: c.Transactions.AutoPurge.DetectionApiUrl.Scheme, Msg: "must be http or https"})
}
}
case chaintype.ChainZkEvm:
// No other configs are needed
case chaintype.ChainZkEvm, chaintype.ChainXLayer:
// MinAttempts is an optional config that can be used to delay the stuck tx detection for zkEVM or XLayer
// If MinAttempts is set, BumpThreshold cannot be 0
if c.Transactions.AutoPurge.MinAttempts != nil && *c.Transactions.AutoPurge.MinAttempts != 0 {
if c.GasEstimator.BumpThreshold == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "GasEstimator.BumpThreshold", Msg: fmt.Sprintf("must be set if Transactions.AutoPurge.MinAttempts is set for %s", chainType)})
} else if *c.GasEstimator.BumpThreshold == 0 {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "GasEstimator.BumpThreshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if Transactions.AutoPurge.MinAttempts is set for %s", chainType)})
}
}
default:
// Bump Threshold is required because the stuck tx heuristic relies on a minimum number of bump attempts to exist
if c.GasEstimator.BumpThreshold == nil {
Expand Down
53 changes: 44 additions & 9 deletions core/chains/evm/txmgr/stuck_tx_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type stuckTxDetectorConfig interface {
}

type stuckTxDetector struct {
lggr logger.Logger
lggr logger.SugaredLogger
chainID *big.Int
chainType chaintype.ChainType
maxPrice *assets.Wei
Expand All @@ -64,7 +64,7 @@ func NewStuckTxDetector(lggr logger.Logger, chainID *big.Int, chainType chaintyp
t.DisableCompression = true
httpClient := &http.Client{Transport: t}
return &stuckTxDetector{
lggr: lggr,
lggr: logger.Sugared(lggr),
chainID: chainID,
chainType: chainType,
maxPrice: maxPrice,
Expand Down Expand Up @@ -128,7 +128,7 @@ func (d *stuckTxDetector) DetectStuckTransactions(ctx context.Context, enabledAd
switch d.chainType {
case chaintype.ChainScroll:
return d.detectStuckTransactionsScroll(ctx, txs)
case chaintype.ChainZkEvm:
case chaintype.ChainZkEvm, chaintype.ChainXLayer:
return d.detectStuckTransactionsZkEVM(ctx, txs)
default:
return d.detectStuckTransactionsHeuristic(ctx, txs, blockNum)
Expand All @@ -153,11 +153,28 @@ func (d *stuckTxDetector) FindUnconfirmedTxWithLowestNonce(ctx context.Context,
}
}

// Build list of potentially stuck tx but exclude any that are already marked for purge
// Build list of potentially stuck tx but exclude any that are already marked for purge or have non-broadcasted attempts
var stuckTxs []Tx
for _, tx := range lowestNonceTxMap {
// Attempts are loaded newest to oldest so one marked for purge will always be first
if len(tx.TxAttempts) > 0 && !tx.TxAttempts[0].IsPurgeAttempt {
if len(tx.TxAttempts) == 0 {
d.lggr.AssumptionViolationw("encountered an unconfirmed transaction without an attempt", "tx", tx)
continue
}
// Check the transaction's attempts in case any are already marked for purge or if any are not broadcasted
// We can only have one non-broadcasted attempt for a transaction at a time
// Skip purge detection until all attempts are broadcasted to avoid conflicts with the purge attempt
var foundPurgeAttempt, foundNonBroadcastAttempt bool
for _, attempt := range tx.TxAttempts {
if attempt.IsPurgeAttempt {
foundPurgeAttempt = true
break
}
if attempt.State != types.TxAttemptBroadcast {
foundNonBroadcastAttempt = true
break
}
}
if !foundPurgeAttempt && !foundNonBroadcastAttempt {
stuckTxs = append(stuckTxs, tx)
}
}
Expand Down Expand Up @@ -322,14 +339,32 @@ func (d *stuckTxDetector) detectStuckTransactionsScroll(ctx context.Context, txs
// Uses eth_getTransactionByHash to detect that a transaction has been discarded due to overflow
// Currently only used by zkEVM but if other chains follow the same behavior in the future
func (d *stuckTxDetector) detectStuckTransactionsZkEVM(ctx context.Context, txs []Tx) ([]Tx, error) {
txReqs := make([]rpc.BatchElem, len(txs))
minAttempts := 0
if d.cfg.MinAttempts() != nil {
minAttempts = int(*d.cfg.MinAttempts())
}
// Check transactions have MinAttempts to ensure it has enough time to return results for getTransactionByHash
// zkEVM has a significant delay between broadcasting a transaction and getting a proper result from the RPC
var filteredTx []Tx
for _, tx := range txs {
if len(tx.TxAttempts) >= minAttempts {
filteredTx = append(filteredTx, tx)
}
}

// No transactions to process
if len(filteredTx) == 0 {
return filteredTx, nil
}

txReqs := make([]rpc.BatchElem, len(filteredTx))
txHashMap := make(map[common.Hash]Tx)
txRes := make([]*map[string]interface{}, len(txs))
txRes := make([]*map[string]interface{}, len(filteredTx))

// Build batch request elems to perform
// Does not need to be separated out into smaller batches
// Max number of transactions to check is equal to the number of enabled addresses which is a relatively small amount
for i, tx := range txs {
for i, tx := range filteredTx {
latestAttemptHash := tx.TxAttempts[0].Hash
var result map[string]interface{}
txReqs[i] = rpc.BatchElem{
Expand Down
56 changes: 55 additions & 1 deletion core/chains/evm/txmgr/stuck_tx_detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,30 @@ func TestStuckTxDetector_FindPotentialStuckTxs(t *testing.T) {
require.NoError(t, err)
require.Len(t, stuckTxs, 0)
})

t.Run("excludes transactions with a in-progress attempt", func(t *testing.T) {
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress)
attempt := cltest.NewLegacyEthTxAttempt(t, etx.ID)
attempt.TxFee.Legacy = assets.NewWeiI(2)
attempt.State = txmgrtypes.TxAttemptInProgress
require.NoError(t, txStore.InsertTxAttempt(ctx, &attempt))
stuckTxs, err := stuckTxDetector.FindUnconfirmedTxWithLowestNonce(ctx, []common.Address{fromAddress})
require.NoError(t, err)
require.Len(t, stuckTxs, 0)
})

t.Run("excludes transactions with an insufficient funds attempt", func(t *testing.T) {
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress)
attempt := cltest.NewLegacyEthTxAttempt(t, etx.ID)
attempt.TxFee.Legacy = assets.NewWeiI(2)
attempt.State = txmgrtypes.TxAttemptInsufficientFunds
require.NoError(t, txStore.InsertTxAttempt(ctx, &attempt))
stuckTxs, err := stuckTxDetector.FindUnconfirmedTxWithLowestNonce(ctx, []common.Address{fromAddress})
require.NoError(t, err)
require.Len(t, stuckTxs, 0)
})
}

func TestStuckTxDetector_DetectStuckTransactionsHeuristic(t *testing.T) {
Expand Down Expand Up @@ -271,8 +295,9 @@ func TestStuckTxDetector_DetectStuckTransactionsZkEVM(t *testing.T) {
enabled: true,
}
blockNum := int64(100)
stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, chaintype.ChainZkEvm, assets.NewWei(assets.NewEth(100).ToInt()), autoPurgeCfg, feeEstimator, txStore, ethClient)

t.Run("returns empty list if no stuck transactions identified", func(t *testing.T) {
stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, chaintype.ChainZkEvm, assets.NewWei(assets.NewEth(100).ToInt()), autoPurgeCfg, feeEstimator, txStore, ethClient)
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
tx := mustInsertUnconfirmedTxWithBroadcastAttempts(t, txStore, 0, fromAddress, 1, blockNum, tenGwei)
attempts := tx.TxAttempts[0]
Expand All @@ -292,6 +317,7 @@ func TestStuckTxDetector_DetectStuckTransactionsZkEVM(t *testing.T) {
})

t.Run("returns stuck transactions discarded by chain", func(t *testing.T) {
stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, chaintype.ChainZkEvm, assets.NewWei(assets.NewEth(100).ToInt()), autoPurgeCfg, feeEstimator, txStore, ethClient)
// Insert tx that will be mocked as stuck
_, fromAddress1 := cltest.MustInsertRandomKey(t, ethKeyStore)
mustInsertUnconfirmedTxWithBroadcastAttempts(t, txStore, 0, fromAddress1, 1, blockNum, tenGwei)
Expand All @@ -316,6 +342,34 @@ func TestStuckTxDetector_DetectStuckTransactionsZkEVM(t *testing.T) {
// Expect only 1 tx to return as stuck due to nil eth_getTransactionByHash response
require.Len(t, txs, 1)
})

t.Run("skips stuck tx detection for transactions that do not have enough attempts", func(t *testing.T) {
autoPurgeCfg.minAttempts = ptr(uint32(2))
stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, chaintype.ChainZkEvm, assets.NewWei(assets.NewEth(100).ToInt()), autoPurgeCfg, feeEstimator, txStore, ethClient)
// Insert tx with enough attempts for detection
_, fromAddress1 := cltest.MustInsertRandomKey(t, ethKeyStore)
etx1 := mustInsertUnconfirmedTxWithBroadcastAttempts(t, txStore, 0, fromAddress1, 1, blockNum, tenGwei)
attempt := cltest.NewLegacyEthTxAttempt(t, etx1.ID)
attempt.TxFee.Legacy = assets.NewWeiI(2)
attempt.State = txmgrtypes.TxAttemptBroadcast
require.NoError(t, txStore.InsertTxAttempt(ctx, &attempt))

// Insert tx that will be skipped for too few attempts
_, fromAddress2 := cltest.MustInsertRandomKey(t, ethKeyStore)
mustInsertUnconfirmedTxWithBroadcastAttempts(t, txStore, 0, fromAddress2, 1, blockNum, tenGwei)

// Return nil response for a tx and a normal response for the other
ethClient.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool {
return len(b) == 1
})).Return(nil).Run(func(args mock.Arguments) {
elems := args.Get(1).([]rpc.BatchElem)
elems[0].Result = nil // Return nil to signal discarded tx
}).Once()

txs, err := stuckTxDetector.DetectStuckTransactions(ctx, []common.Address{fromAddress1, fromAddress2}, blockNum)
require.NoError(t, err)
require.Len(t, txs, 1)
})
}

func TestStuckTxDetector_DetectStuckTransactionsScroll(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,8 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
require.NoError(t, err)
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.Equal(t, commontypes.Fatal, state)
require.Error(t, err, evmclient.TerminallyStuckMsg)
require.Error(t, err)
require.Equal(t, evmclient.TerminallyStuckMsg, err.Error())

// Test a terminally stuck client error returns Fatal
nonce = evmtypes.Nonce(1)
Expand All @@ -825,7 +826,8 @@ func TestTxm_GetTransactionStatus(t *testing.T) {
require.NoError(t, err)
state, err = txm.GetTransactionStatus(ctx, idempotencyKey)
require.Equal(t, commontypes.Fatal, state)
require.Error(t, err, evmclient.TerminallyStuckMsg)
require.Error(t, err)
require.Equal(t, terminallyStuckClientError, err.Error())
})

t.Run("returns failed for fatal error state with other error", func(t *testing.T) {
Expand Down

0 comments on commit 2e314cd

Please sign in to comment.