Skip to content

Commit

Permalink
Updated to align transaction status method to ChainWriter expectations
Browse files Browse the repository at this point in the history
  • Loading branch information
amit-momin committed May 28, 2024
1 parent 6bc39ad commit 9f43854
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 38 deletions.
12 changes: 6 additions & 6 deletions common/txmgr/mocks/tx_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions common/txmgr/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,14 @@ const (
TxConfirmed = txmgrtypes.TxState("confirmed")
TxConfirmedMissingReceipt = txmgrtypes.TxState("confirmed_missing_receipt")
)

// TransactionStatus are the status we expect every TXM to support and that can be returned by StatusForUUID.
type TransactionStatus int

const (
Unknown TransactionStatus = iota
Unconfirmed
Finalized
Failed
Fatal
)
53 changes: 45 additions & 8 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type TxManager[
FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (nullv4.Time, error)
FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (nullv4.Int, error)
CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error)
GetTransactionStatus(ctx context.Context, transactionID uuid.UUID) (state txmgrtypes.TxState, err error)
GetTransactionStatus(ctx context.Context, transactionID uuid.UUID) (state TransactionStatus, err error)
}

type reset struct {
Expand Down Expand Up @@ -97,10 +97,12 @@ type Txm[
checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
pruneQueueAndCreateLock sync.Mutex

chHeads chan HEAD
trigger chan ADDR
reset chan reset
resumeCallback ResumeCallback
chHeads chan HEAD
latestFinalizedBlockNum int64
finalizedBlockNumMu sync.RWMutex
trigger chan ADDR
reset chan reset
resumeCallback ResumeCallback

chStop services.StopChan
chSubbed chan struct{}
Expand Down Expand Up @@ -418,6 +420,12 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
case head := <-b.chHeads:
b.confirmer.mb.Deliver(head)
b.tracker.mb.Deliver(head.BlockNumber())
// Set latest finalized block number
b.finalizedBlockNumMu.Lock()
if head.LatestFinalizedHead() != nil && head.LatestFinalizedHead().BlockNumber() != 0 {
b.latestFinalizedBlockNum = head.LatestFinalizedHead().BlockNumber()
}
b.finalizedBlockNumMu.Unlock()
case reset := <-b.reset:
// This check prevents the weird edge-case where you can select
// into this block after chStop has already been closed and the
Expand Down Expand Up @@ -632,7 +640,8 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTrans
return b.txStore.CountTransactionsByState(ctx, state, b.chainID)
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTransactionStatus(ctx context.Context, transactionID uuid.UUID) (status txmgrtypes.TxState, err error) {
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTransactionStatus(ctx context.Context, transactionID uuid.UUID) (status TransactionStatus, err error) {
// Loads attempts and receipts
tx, err := b.txStore.FindTxWithIdempotencyKey(ctx, transactionID.String(), b.chainID)
if err != nil {
return status, fmt.Errorf("failed to find transaction with IdempotencyKey %s: %w", transactionID.String(), err)
Expand All @@ -641,7 +650,35 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTransac
if tx == nil {
return status, fmt.Errorf("failed to find transaction with IdempotencyKey %s", transactionID.String())
}
return tx.State, tx.GetError()
switch tx.State {
case TxUnconfirmed, TxConfirmedMissingReceipt:
return Unconfirmed, nil
case TxConfirmed:
var receipt txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH]
// Find tx receipt if one exists
for _, attempt := range tx.TxAttempts {
if len(attempt.Receipts) > 0 {
// Tx will only have one receipt
receipt = attempt.Receipts[0]
break
}
}
b.finalizedBlockNumMu.RLock()
defer b.finalizedBlockNumMu.RUnlock()
if receipt != nil && b.latestFinalizedBlockNum != 0 && receipt.GetBlockNumber().Cmp(big.NewInt(b.latestFinalizedBlockNum)) <= 0 {
return Finalized, nil
}
return Unconfirmed, nil
case TxFatalError:
txErr := b.newTxError(tx.GetError())
if txErr != nil && txErr.IsTerminallyStuck() {
return Fatal, tx.GetError()
}
return Failed, tx.GetError()
default:
// Unstarted and InProgress are classified as unknown since they are not supported by the ChainWriter interface
return Unknown, nil
}
}

type NullTxManager[
Expand Down Expand Up @@ -727,7 +764,7 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Cou
return count, errors.New(n.ErrMsg)
}

func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetTransactionStatus(ctx context.Context, transactionID uuid.UUID) (state txmgrtypes.TxState, err error) {
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetTransactionStatus(ctx context.Context, transactionID uuid.UUID) (status TransactionStatus, err error) {
return
}

Expand Down
28 changes: 19 additions & 9 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,16 +1061,26 @@ func (o *evmTxStore) FindTxWithIdempotencyKey(ctx context.Context, idempotencyKe
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
defer cancel()
var dbEtx DbEthTx
err = o.q.GetContext(ctx, &dbEtx, `SELECT * FROM evm.txes WHERE idempotency_key = $1 and evm_chain_id = $2`, idempotencyKey, chainID.String())
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
err = o.Transact(ctx, true, func(orm *evmTxStore) error {
var dbEtx DbEthTx
err = o.q.GetContext(ctx, &dbEtx, `SELECT * FROM evm.txes WHERE idempotency_key = $1 and evm_chain_id = $2`, idempotencyKey, chainID.String())
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil
}
return pkgerrors.Wrap(err, "FindTxWithIdempotencyKey failed to load evm.txes")
}
return nil, pkgerrors.Wrap(err, "FindTxWithIdempotencyKey failed to load evm.txes")
}
etx = new(Tx)
dbEtx.ToTx(etx)
etx = new(Tx)
dbEtx.ToTx(etx)
etxArr := []*Tx{etx}
if err = orm.LoadTxesAttempts(ctx, etxArr); err != nil {
return fmt.Errorf("FindTxWithIdempotencyKey failed to load evm.tx_attempts: %w", err)
}
if err = orm.loadEthTxesAttemptsReceipts(ctx, etxArr); err != nil {
return fmt.Errorf("FindTxWithIdempotencyKey failed to load evm.receipts: %w", err)
}
return nil
})
return
}

Expand Down
83 changes: 68 additions & 15 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,18 +599,34 @@ func TestTxm_TxStatusByIdempotencyKey(t *testing.T) {
cfg := evmtest.NewChainScopedConfig(t, gcfg)

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
ethClient.On("PendingNonceAt", mock.Anything, mock.Anything).Return(uint64(0), nil).Maybe()
feeEstimator := gasmocks.NewEvmFeeEstimator(t)
feeEstimator.On("Start", mock.Anything).Return(nil).Once()
feeEstimator.On("OnNewLongestChain", mock.Anything, mock.Anything).Once()
txm, err := makeTestEvmTxm(t, db, ethClient, feeEstimator, cfg.EVM(), cfg.EVM().GasEstimator(), cfg.EVM().Transactions(), gcfg.Database(), gcfg.Database().Listener(), ethKeyStore)
require.NoError(t, err)
err = txm.Start(ctx)
require.NoError(t, err)

head := &evmtypes.Head{
Hash: utils.NewHash(),
Number: 100,
Parent: &evmtypes.Head{
Hash: utils.NewHash(),
Number: 99,
IsFinalized: true,
},
}
txm.OnNewLongestChain(ctx, head)

t.Run("returns error if transaction not found", func(t *testing.T) {
idempotencyKey := uuid.New()
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.Error(t, err, fmt.Sprintf("failed to find transaction with IdempotencyKey: %s", idempotencyKey))
require.Equal(t, txmgrtypes.TxState(""), state)
require.Equal(t, txmgrcommon.Unknown, state)
})

t.Run("returns unstarted state", func(t *testing.T) {
t.Run("returns unknown for unstarted state", func(t *testing.T) {
idempotencyKey := uuid.New()
idempotencyKeyStr := idempotencyKey.String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
Expand All @@ -625,10 +641,10 @@ func TestTxm_TxStatusByIdempotencyKey(t *testing.T) {
require.NoError(t, err)
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.NoError(t, err)
require.Equal(t, txmgrcommon.TxUnstarted, state)
require.Equal(t, txmgrcommon.Unknown, state)
})

t.Run("returns in-progress state", func(t *testing.T) {
t.Run("returns unknown for in-progress state", func(t *testing.T) {
idempotencyKey := uuid.New()
idempotencyKeyStr := idempotencyKey.String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
Expand All @@ -645,10 +661,10 @@ func TestTxm_TxStatusByIdempotencyKey(t *testing.T) {
require.NoError(t, err)
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.NoError(t, err)
require.Equal(t, txmgrcommon.TxInProgress, state)
require.Equal(t, txmgrcommon.Unknown, state)
})

t.Run("returns unconfirmed state", func(t *testing.T) {
t.Run("returns unconfirmed for unconfirmed state", func(t *testing.T) {
idempotencyKey := uuid.New()
idempotencyKeyStr := idempotencyKey.String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
Expand All @@ -668,10 +684,10 @@ func TestTxm_TxStatusByIdempotencyKey(t *testing.T) {
require.NoError(t, err)
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.NoError(t, err)
require.Equal(t, txmgrcommon.TxUnconfirmed, state)
require.Equal(t, txmgrcommon.Unconfirmed, state)
})

t.Run("returns confirmed state", func(t *testing.T) {
t.Run("returns unconfirmed for confirmed state newer than finalized block", func(t *testing.T) {
idempotencyKey := uuid.New()
idempotencyKeyStr := idempotencyKey.String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
Expand All @@ -689,12 +705,49 @@ func TestTxm_TxStatusByIdempotencyKey(t *testing.T) {
}
err := txStore.InsertTx(ctx, tx)
require.NoError(t, err)
tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKeyStr, testutils.FixtureChainID)
require.NoError(t, err)
attempt := cltest.NewLegacyEthTxAttempt(t, tx.ID)
err = txStore.InsertTxAttempt(ctx, &attempt)
require.NoError(t, err)
// Insert receipt for unfinalized block num
mustInsertEthReceipt(t, txStore, head.Number, head.Hash, attempt.Hash)
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.NoError(t, err)
require.Equal(t, txmgrcommon.Unconfirmed, state)
})

t.Run("returns finalized for confirmed state older than finalized block", func(t *testing.T) {
idempotencyKey := uuid.New()
idempotencyKeyStr := idempotencyKey.String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
nonce := evmtypes.Nonce(0)
broadcast := time.Now()
tx := &txmgr.Tx{
Sequence: &nonce,
IdempotencyKey: &idempotencyKeyStr,
FromAddress: fromAddress,
EncodedPayload: []byte{1, 2, 3},
FeeLimit: feeLimit,
State: txmgrcommon.TxConfirmed,
BroadcastAt: &broadcast,
InitialBroadcastAt: &broadcast,
}
err := txStore.InsertTx(ctx, tx)
require.NoError(t, err)
tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKeyStr, testutils.FixtureChainID)
require.NoError(t, err)
attempt := cltest.NewLegacyEthTxAttempt(t, tx.ID)
err = txStore.InsertTxAttempt(ctx, &attempt)
require.NoError(t, err)
// Insert receipt for finalized block num
mustInsertEthReceipt(t, txStore, head.Parent.Number, head.Parent.Hash, attempt.Hash)
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.NoError(t, err)
require.Equal(t, txmgrcommon.TxConfirmed, state)
require.Equal(t, txmgrcommon.Finalized, state)
})

t.Run("returns confirmed missing receipt state", func(t *testing.T) {
t.Run("returns unconfirmed for confirmed missing receipt state", func(t *testing.T) {
idempotencyKey := uuid.New()
idempotencyKeyStr := idempotencyKey.String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
Expand All @@ -714,10 +767,10 @@ func TestTxm_TxStatusByIdempotencyKey(t *testing.T) {
require.NoError(t, err)
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.NoError(t, err)
require.Equal(t, txmgrcommon.TxConfirmedMissingReceipt, state)
require.Equal(t, txmgrcommon.Unconfirmed, state)
})

t.Run("returns fatal error state with terminally stuck error", func(t *testing.T) {
t.Run("returns fatal for fatal error state with terminally stuck error", func(t *testing.T) {
idempotencyKey := uuid.New()
idempotencyKeyStr := idempotencyKey.String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
Expand All @@ -737,11 +790,11 @@ func TestTxm_TxStatusByIdempotencyKey(t *testing.T) {
err := txStore.InsertTx(ctx, tx)
require.NoError(t, err)
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.Equal(t, txmgrcommon.TxFatalError, state)
require.Equal(t, txmgrcommon.Fatal, state)
require.Error(t, err, client.TerminallyStuckMsg)
})

t.Run("returns fatal error state with other error", func(t *testing.T) {
t.Run("returns failed for fatal error state with other error", func(t *testing.T) {
idempotencyKey := uuid.New()
idempotencyKeyStr := idempotencyKey.String()
_, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore)
Expand All @@ -757,7 +810,7 @@ func TestTxm_TxStatusByIdempotencyKey(t *testing.T) {
err := txStore.InsertTx(ctx, tx)
require.NoError(t, err)
state, err := txm.GetTransactionStatus(ctx, idempotencyKey)
require.Equal(t, txmgrcommon.TxFatalError, state)
require.Equal(t, txmgrcommon.Failed, state)
require.Error(t, err, errorMsg)
})
}
Expand Down

0 comments on commit 9f43854

Please sign in to comment.