Skip to content

Commit

Permalink
Merge branch 'BCI-3055-Add-new-TXM-method-to-allow-products-to-query-…
Browse files Browse the repository at this point in the history
…transaction-status' into zk-overflow-short-term
  • Loading branch information
FelixFan1992 committed May 28, 2024
2 parents 1dc3d29 + 26c8028 commit c7c8377
Show file tree
Hide file tree
Showing 15 changed files with 405 additions and 35 deletions.
5 changes: 5 additions & 0 deletions .changeset/funny-snails-shake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Added API for products to query a transaction's status in the TXM #added
9 changes: 9 additions & 0 deletions common/client/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package client

// Provides error classification to external components in a chain agnostic way
// Only exposes the error types that could be set in the transaction error field
type TxError interface {
error
IsFatal() bool
IsTerminallyStuck() bool
}
6 changes: 3 additions & 3 deletions common/client/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
InsufficientFunds // Tx was rejected due to insufficient funds.
ExceedsMaxFee // Attempt's fee was higher than the node's limit and got rejected.
FeeOutOfValidRange // This error is returned when we use a fee price suggested from an RPC, but the network rejects the attempt due to an invalid range(mostly used by L2 chains). Retry by requesting a new suggested fee price.
OutOfCounters // The error returned when a transaction is too complex to be proven by zk circuits. This error is mainly returned by zk chains.
TerminallyStuck // The error returned when a transaction is or could get terminally stuck in the mempool without any chance of inclusion.
sendTxReturnCodeLen // tracks the number of errors. Must always be last
)

Expand Down Expand Up @@ -50,8 +50,8 @@ func (c SendTxReturnCode) String() string {
return "ExceedsMaxFee"
case FeeOutOfValidRange:
return "FeeOutOfValidRange"
case OutOfCounters:
return "OutOfCounters"
case TerminallyStuck:
return "TerminallyStuck"
default:
return fmt.Sprintf("SendTxReturnCode(%d)", c)
}
Expand Down
2 changes: 1 addition & 1 deletion common/client/multi_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) {
ExpectedTxResult: "not enough keccak counters to continue the execution",
ExpectedCriticalErr: "",
ResultsByCode: sendTxErrors{
OutOfCounters: {errors.New("not enough keccak counters to continue the execution")},
TerminallyStuck: {errors.New("not enough keccak counters to continue the execution")},
},
},
}
Expand Down
30 changes: 30 additions & 0 deletions common/txmgr/mocks/tx_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions common/txmgr/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,14 @@ const (
TxConfirmed = txmgrtypes.TxState("confirmed")
TxConfirmedMissingReceipt = txmgrtypes.TxState("confirmed_missing_receipt")
)

// TransactionStatus are the status we expect every TXM to support and that can be returned by StatusForUUID.
type TransactionStatus int

const (
Unknown TransactionStatus = iota
Unconfirmed
Finalized
Failed
Fatal
)
68 changes: 64 additions & 4 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink/v2/common/client"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/common/headtracker"
iutils "github.com/smartcontractkit/chainlink/v2/common/internal/utils"
Expand All @@ -29,6 +30,8 @@ import (
// ResumeCallback is assumed to be idempotent
type ResumeCallback func(ctx context.Context, id uuid.UUID, result interface{}, err error) error

type NewTxError func(err error) client.TxError

// TxManager is the main component of the transaction manager.
// It is also the interface to external callers.
//
Expand Down Expand Up @@ -62,6 +65,7 @@ type TxManager[
FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (nullv4.Time, error)
FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (nullv4.Int, error)
CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error)
GetTransactionStatus(ctx context.Context, transactionID uuid.UUID) (state TransactionStatus, err error)
}

type reset struct {
Expand Down Expand Up @@ -93,10 +97,12 @@ type Txm[
checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
pruneQueueAndCreateLock sync.Mutex

chHeads chan HEAD
trigger chan ADDR
reset chan reset
resumeCallback ResumeCallback
chHeads chan HEAD
latestFinalizedBlockNum int64
finalizedBlockNumMu sync.RWMutex
trigger chan ADDR
reset chan reset
resumeCallback ResumeCallback

chStop services.StopChan
chSubbed chan struct{}
Expand All @@ -109,6 +115,7 @@ type Txm[
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
newTxError NewTxError
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) {
Expand Down Expand Up @@ -141,6 +148,7 @@ func NewTxm[
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
newTxErrorFunc NewTxError,
) *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
b := Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
logger: logger.Sugared(lggr),
Expand All @@ -161,6 +169,7 @@ func NewTxm[
confirmer: confirmer,
resender: resender,
tracker: tracker,
newTxError: newTxErrorFunc,
}

if txCfg.ResendAfterThreshold() <= 0 {
Expand Down Expand Up @@ -411,6 +420,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
case head := <-b.chHeads:
b.confirmer.mb.Deliver(head)
b.tracker.mb.Deliver(head.BlockNumber())
// Set latest finalized block number
b.finalizedBlockNumMu.Lock()
if head.LatestFinalizedHead() != nil && head.LatestFinalizedHead().BlockNumber() != 0 {
b.latestFinalizedBlockNum = head.LatestFinalizedHead().BlockNumber()
}
b.finalizedBlockNumMu.Unlock()
case reset := <-b.reset:
// This check prevents the weird edge-case where you can select
// into this block after chStop has already been closed and the
Expand Down Expand Up @@ -625,6 +640,47 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTrans
return b.txStore.CountTransactionsByState(ctx, state, b.chainID)
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTransactionStatus(ctx context.Context, transactionID uuid.UUID) (status TransactionStatus, err error) {
// Loads attempts and receipts
tx, err := b.txStore.FindTxWithIdempotencyKey(ctx, transactionID.String(), b.chainID)
if err != nil {
return status, fmt.Errorf("failed to find transaction with IdempotencyKey %s: %w", transactionID.String(), err)
}
// This check is required since a no-rows error returns nil err
if tx == nil {
return status, fmt.Errorf("failed to find transaction with IdempotencyKey %s", transactionID.String())
}
switch tx.State {
case TxUnconfirmed, TxConfirmedMissingReceipt:
return Unconfirmed, nil
case TxConfirmed:
var receipt txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH]
// Find tx receipt if one exists
for _, attempt := range tx.TxAttempts {
if len(attempt.Receipts) > 0 {
// Tx will only have one receipt
receipt = attempt.Receipts[0]
break
}
}
b.finalizedBlockNumMu.RLock()
defer b.finalizedBlockNumMu.RUnlock()
if receipt != nil && b.latestFinalizedBlockNum != 0 && receipt.GetBlockNumber().Cmp(big.NewInt(b.latestFinalizedBlockNum)) <= 0 {
return Finalized, nil
}
return Unconfirmed, nil
case TxFatalError:
txErr := b.newTxError(tx.GetError())
if txErr != nil && txErr.IsTerminallyStuck() {
return Fatal, tx.GetError()
}
return Failed, tx.GetError()
default:
// Unstarted and InProgress are classified as unknown since they are not supported by the ChainWriter interface
return Unknown, nil
}
}

type NullTxManager[
CHAIN_ID types.ID,
HEAD types.Head[BLOCK_HASH],
Expand Down Expand Up @@ -708,6 +764,10 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Cou
return count, errors.New(n.ErrMsg)
}

func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetTransactionStatus(ctx context.Context, transactionID uuid.UUID) (status TransactionStatus, err error) {
return
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneQueueAndCreateTxn(
ctx context.Context,
txRequest txmgrtypes.TxRequest[ADDR, TX_HASH],
Expand Down
1 change: 1 addition & 0 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type TxStore[
FindTxesWithMetaFieldByReceiptBlockNum(ctx context.Context, metaField string, blockNum int64, chainID *big.Int) (tx []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
// Find transactions loaded with transaction attempts and receipts by transaction IDs and states
FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []int64, states []TxState, chainID *big.Int) (tx []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindTxWithIdempotencyKey(ctx context.Context, idempotencyKey string, chainID CHAIN_ID) (tx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
}

// TransactionStore contains the persistence layer methods needed to manage Txs and TxAttempts
Expand Down
31 changes: 25 additions & 6 deletions core/chains/evm/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (
TransactionAlreadyMined
Fatal
ServiceUnavailable
OutOfCounters
TerminallyStuck
)

type ClientErrors map[int]*regexp.Regexp
Expand Down Expand Up @@ -246,10 +246,17 @@ var zkSync = ClientErrors{
}

var zkEvm = ClientErrors{
OutOfCounters: regexp.MustCompile(`(?:: |^)not enough .* counters to continue the execution$`),
TerminallyStuck: regexp.MustCompile(`(?:: |^)not enough .* counters to continue the execution$`),
}

var clients = []ClientErrors{parity, geth, arbitrum, metis, substrate, avalanche, nethermind, harmony, besu, erigon, klaytn, celo, zkSync, zkEvm}
const TerminallyStuckMsg = "transaction terminally stuck"

// Tx.Error messages that are set internally so they are not chain or client specific
var internal = ClientErrors{
TerminallyStuck: regexp.MustCompile(TerminallyStuckMsg),
}

var clients = []ClientErrors{parity, geth, arbitrum, metis, substrate, avalanche, nethermind, harmony, besu, erigon, klaytn, celo, zkSync, zkEvm, internal}

// ClientErrorRegexes returns a map of compiled regexes for each error type
func ClientErrorRegexes(errsRegex config.ClientErrors) *ClientErrors {
Expand Down Expand Up @@ -353,9 +360,17 @@ func (s *SendError) IsServiceUnavailable(configErrors *ClientErrors) bool {
return s.is(ServiceUnavailable, configErrors)
}

// IsOutOfCounters is a zk chain specific error returned if the transaction is too complex to prove on zk circuits
func (s *SendError) IsOutOfCounters(configErrors *ClientErrors) bool {
return s.is(OutOfCounters, configErrors)
// IsTerminallyStuck indicates if a transaction was stuck without any chance of inclusion
func (s *SendError) IsTerminallyStuckConfigError(configErrors *ClientErrors) bool {
return s.is(TerminallyStuck, configErrors)
}

func (s *SendError) IsFatal() bool {
return s.Fatal(nil)
}

func (s *SendError) IsTerminallyStuck() bool {
return s.IsTerminallyStuckConfigError(nil)
}

// IsTimeout indicates if the error was caused by an exceeded context deadline
Expand Down Expand Up @@ -399,6 +414,10 @@ func NewSendError(e error) *SendError {
return &SendError{err: pkgerrors.WithStack(e), fatal: fatal}
}

func NewTxError(e error) commonclient.TxError {
return NewSendError(e)
}

// Geth/parity returns these errors if the transaction failed in such a way that:
// 1. It will never be included into a block as a result of this send
// 2. Resending the transaction at a different gas price will never change the outcome
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/client/tx_simulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestSimulateTx_Default(t *testing.T) {
Data: []byte("0x00"),
}
sendErr := client.SimulateTransaction(ctx, ethClient, logger.TestSugared(t), "", msg)
require.Equal(t, true, sendErr.IsOutOfCounters(nil))
require.Equal(t, true, sendErr.IsTerminallyStuck())
})

t.Run("returns without error if simulation returns non-OOC error", func(t *testing.T) {
Expand Down Expand Up @@ -108,6 +108,6 @@ func TestSimulateTx_Default(t *testing.T) {
Data: []byte("0x00"),
}
sendErr := client.SimulateTransaction(ctx, ethClient, logger.TestSugared(t), "", msg)
require.Equal(t, false, sendErr.IsOutOfCounters(nil))
require.Equal(t, false, sendErr.IsTerminallyStuck())
})
}
3 changes: 2 additions & 1 deletion core/chains/evm/txmgr/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/common/txmgr"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/forwarders"
Expand Down Expand Up @@ -77,7 +78,7 @@ func NewEvmTxm(
resender *Resender,
tracker *Tracker,
) *Txm {
return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, broadcaster, confirmer, resender, tracker)
return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, broadcaster, confirmer, resender, tracker, client.NewTxError)
}

// NewEvmResender creates a new concrete EvmResender
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3219,7 +3219,7 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, dbTx)
require.Equal(t, txmgrcommon.TxFatalError, dbTx.State)
require.Equal(t, "transaction terminally stuck", dbTx.Error.String)
require.Equal(t, client.TerminallyStuckMsg, dbTx.Error.String)
})
}

Expand Down
28 changes: 19 additions & 9 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,16 +1061,26 @@ func (o *evmTxStore) FindTxWithIdempotencyKey(ctx context.Context, idempotencyKe
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
defer cancel()
var dbEtx DbEthTx
err = o.q.GetContext(ctx, &dbEtx, `SELECT * FROM evm.txes WHERE idempotency_key = $1 and evm_chain_id = $2`, idempotencyKey, chainID.String())
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
err = o.Transact(ctx, true, func(orm *evmTxStore) error {
var dbEtx DbEthTx
err = o.q.GetContext(ctx, &dbEtx, `SELECT * FROM evm.txes WHERE idempotency_key = $1 and evm_chain_id = $2`, idempotencyKey, chainID.String())
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil
}
return pkgerrors.Wrap(err, "FindTxWithIdempotencyKey failed to load evm.txes")
}
return nil, pkgerrors.Wrap(err, "FindTxWithIdempotencyKey failed to load evm.txes")
}
etx = new(Tx)
dbEtx.ToTx(etx)
etx = new(Tx)
dbEtx.ToTx(etx)
etxArr := []*Tx{etx}
if err = orm.LoadTxesAttempts(ctx, etxArr); err != nil {
return fmt.Errorf("FindTxWithIdempotencyKey failed to load evm.tx_attempts: %w", err)
}
if err = orm.loadEthTxesAttemptsReceipts(ctx, etxArr); err != nil {
return fmt.Errorf("FindTxWithIdempotencyKey failed to load evm.receipts: %w", err)
}
return nil
})
return
}

Expand Down
Loading

0 comments on commit c7c8377

Please sign in to comment.