diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index a9e5ebf0aac..1a06336aec7 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -1,6 +1,7 @@ package txmgr import ( + "fmt" "sync" "time" @@ -112,7 +113,7 @@ func newAddressState[ case TxFatalError: as.fatalErroredTxs[tx.ID] = &tx default: - panic("unknown transaction state") + panic(fmt.Sprintf("unknown transaction state: %q", tx.State)) } as.allTxs[tx.ID] = &tx if tx.IdempotencyKey != nil { @@ -141,11 +142,46 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions in the given states are considered. // If no txStates are provided, all transactions are considered. +// Any transaction states that are unknown will cause a panic. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState( txStates []txmgrtypes.TxState, fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]), txIDs ...int64, ) { + as.Lock() + defer as.Unlock() + + // if txStates is empty then apply the filter to only the as.allTransactions map + if len(txStates) == 0 { + as._applyToTxs(as.allTxs, fn, txIDs...) + return + } + + for _, txState := range txStates { + switch txState { + case TxInProgress: + if as.inprogressTx != nil { + fn(as.inprogressTx) + } + case TxUnconfirmed: + as._applyToTxs(as.unconfirmedTxs, fn, txIDs...) + case TxConfirmedMissingReceipt: + as._applyToTxs(as.confirmedMissingReceiptTxs, fn, txIDs...) + case TxConfirmed: + as._applyToTxs(as.confirmedTxs, fn, txIDs...) + case TxFatalError: + as._applyToTxs(as.fatalErroredTxs, fn, txIDs...) + case TxUnstarted: + nfn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + if tx.State == TxUnstarted { + fn(tx) + } + } + as._applyToTxs(as.allTxs, nfn, txIDs...) + default: + panic(fmt.Sprintf("unknown transaction state: %q", txState)) + } + } } // findTxAttempts returns all attempts for the given transactions that match the given filters. @@ -162,6 +198,15 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx return nil } +// hasTx returns true if the transaction with the given ID is in the address state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) hasTx(txID int64) bool { + as.RLock() + defer as.RUnlock() + + _, ok := as.allTxs[txID] + return ok +} + // findTxs returns all transactions that match the given filters. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions are considered. @@ -171,15 +216,145 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, txIDs ...int64, ) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { - return nil + as.RLock() + defer as.RUnlock() + + // if txStates is empty then apply the filter to only the as.allTransactions map + if len(txStates) == 0 { + return as._findTxs(as.allTxs, filter, txIDs...) + } + + var txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + for _, txState := range txStates { + switch txState { + case TxInProgress: + if as.inprogressTx != nil && filter(as.inprogressTx) { + txs = append(txs, *as.inprogressTx) + } + case TxUnconfirmed: + txs = append(txs, as._findTxs(as.unconfirmedTxs, filter, txIDs...)...) + case TxConfirmedMissingReceipt: + txs = append(txs, as._findTxs(as.confirmedMissingReceiptTxs, filter, txIDs...)...) + case TxConfirmed: + txs = append(txs, as._findTxs(as.confirmedTxs, filter, txIDs...)...) + case TxFatalError: + txs = append(txs, as._findTxs(as.fatalErroredTxs, filter, txIDs...)...) + case TxUnstarted: + fn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.State == TxUnstarted && filter(tx) + } + txs = append(txs, as._findTxs(as.allTxs, fn, txIDs...)...) + default: + panic(fmt.Sprintf("unknown transaction state: %q", txState)) + } + } + + return txs } // pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) { } +// reapConfirmedTxs removes confirmed transactions that are older than the given time threshold and have receipts older than the given block number threshold. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) reapConfirmedTxs(minBlockNumberToKeep int64, timeThreshold time.Time) { + as.Lock() + defer as.Unlock() + + for _, tx := range as.confirmedTxs { + if len(tx.TxAttempts) == 0 { + continue + } + if tx.CreatedAt.After(timeThreshold) { + continue + } + + for i := 0; i < len(tx.TxAttempts); i++ { + if len(tx.TxAttempts[i].Receipts) == 0 { + continue + } + if tx.TxAttempts[i].Receipts[0].GetBlockNumber() == nil || tx.TxAttempts[i].Receipts[0].GetBlockNumber().Int64() >= minBlockNumberToKeep { + continue + } + as._deleteTx(tx.ID) + } + } +} + +// reapFatalErroredTxs removes fatal errored transactions that are older than the given time threshold. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) reapFatalErroredTxs(timeThreshold time.Time) { + as.Lock() + defer as.Unlock() + + for _, tx := range as.fatalErroredTxs { + if tx.CreatedAt.After(timeThreshold) { + continue + } + as._deleteTx(tx.ID) + } +} + // deleteTxs removes the transactions with the given IDs from the address state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txIDs ...int64) { + as.Lock() + defer as.Unlock() + + as._deleteTxs(txIDs...) +} + +// deleteAllTxAttempts removes all attempts for the given transactions. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteAllTxAttempts(txID int64) { + as.Lock() + defer as.Unlock() + + as._deleteAllTxAttempts(txID) +} + +// deleteTxAttempt removes the attempt with a given ID from the transaction with the given ID. +// It removes the attempts from the hash lookup map and from the transaction. +// If an attempt is not found in the hash lookup map, it is ignored. +// If a transaction is not found in the allTxs map, it is ignored. +// No error is returned if the transaction or attempt is not found. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxAttempt(txID, txAttemptID int64) { + as.Lock() + defer as.Unlock() + + tx, ok := as.allTxs[txID] + if !ok || tx == nil { + return + } + + for i := 0; i < len(tx.TxAttempts); i++ { + txAttempt := tx.TxAttempts[i] + if txAttempt.ID == txAttemptID { + // remove the attempt from the hash lookup map + delete(as.attemptHashToTxAttempt, txAttempt.Hash) + // remove the attempt from the transaction + tx.TxAttempts = append(tx.TxAttempts[:i], tx.TxAttempts[i+1:]...) + break + } + } +} + +// addTxAttempt adds the given attempt to the transaction which matches its TxID. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxAttempt(txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + as.Lock() + defer as.Unlock() + + tx, ok := as.allTxs[txAttempt.TxID] + if !ok || tx == nil { + return fmt.Errorf("no transaction with ID %d", txAttempt.TxID) + } + + // add the attempt to the transaction + if tx.TxAttempts == nil { + tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + } + tx.TxAttempts = append(tx.TxAttempts, txAttempt) + // add the attempt to the hash lookup map + as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt + + return nil } // peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. @@ -188,8 +363,8 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNe } // peekInProgressTx returns the in-progress transaction without removing it from the in-progress state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { - return nil, nil +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + return nil } // addTxToUnstarted adds the given transaction to the unstarted queue. @@ -228,23 +403,234 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUn } // moveTxToFatalError moves a transaction to the fatal error state. +// It returns an error if there is no transaction with the given ID. +// It returns an error if the transaction is not in an expected state. +// Unknown transaction states will cause a panic this includes Unconfirmed and Confirmed transactions. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTxToFatalError( txID int64, txError null.String, ) error { + as.Lock() + defer as.Unlock() + + tx := as.allTxs[txID] + if tx == nil { + return fmt.Errorf("move_tx_to_fatal_error: no transaction with ID %d", txID) + } + originalState := tx.State + + // Move the transaction to the fatal error state + as._moveTxToFatalError(tx, txError) + + // Remove the transaction from its original state + switch originalState { + case TxUnstarted: + _ = as.unstartedTxs.RemoveTxByID(txID) + case TxInProgress: + as.inprogressTx = nil + case TxConfirmedMissingReceipt: + delete(as.confirmedMissingReceiptTxs, tx.ID) + case TxFatalError: + // Already in fatal error state + return nil + default: + panic(fmt.Sprintf("unknown transaction state: %q", tx.State)) + } + return nil } // moveUnconfirmedToConfirmedMissingReceipt moves the unconfirmed transaction to the confirmed missing receipt state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnconfirmedToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { +// If there is no unconfirmed transaction with the given ID, an error is returned. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnconfirmedToConfirmedMissingReceipt(txID int64) error { + as.Lock() + defer as.Unlock() + + tx, ok := as.unconfirmedTxs[txID] + if !ok || tx == nil { + return fmt.Errorf("move_unconfirmed_to_confirmed_missing_receipt: no unconfirmed transaction with ID %d", txID) + } + as._moveUnconfirmedToConfirmedMissingReceipt(tx) + return nil } -// moveInProgressToConfirmedMissingReceipt moves the in-progress transaction to the confirmed missing receipt state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToConfirmedMissingReceipt(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { +// moveTxAttemptToBroadcast moves the transaction attempt to the broadcast state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveTxAttemptToBroadcast( + txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + broadcastAt time.Time, +) error { + as.Lock() + defer as.Unlock() + + tx, ok := as.allTxs[txAttempt.TxID] + if !ok || tx == nil { + return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: no transaction with ID %d", txAttempt.TxID) + } + + var found bool + for i := 0; i < len(tx.TxAttempts); i++ { + if tx.TxAttempts[i].ID == txAttempt.ID { + tx.TxAttempts[i].State = txmgrtypes.TxAttemptBroadcast + as.attemptHashToTxAttempt[txAttempt.Hash] = &tx.TxAttempts[i] + found = true + break + } + } + if !found { + return fmt.Errorf("move_in_progress_to_confirmed_missing_receipt: transaction with ID %d has no attempt with ID: %q", txAttempt.TxID, txAttempt.ID) + } + if tx.BroadcastAt != nil && tx.BroadcastAt.Before(broadcastAt) { + tx.BroadcastAt = &broadcastAt + } + as._moveUnconfirmedToConfirmedMissingReceipt(tx) + return nil } // moveConfirmedToUnconfirmed moves the confirmed transaction to the unconfirmed state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveConfirmedToUnconfirmed(txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + as.Lock() + defer as.Unlock() + + if txAttempt.State != txmgrtypes.TxAttemptBroadcast { + return fmt.Errorf("attempt must be in broadcast state") + } + + tx, ok := as.confirmedTxs[txAttempt.TxID] + if !ok || tx == nil { + return fmt.Errorf("no confirmed transaction with ID %d", txAttempt.TxID) + } + if len(tx.TxAttempts) == 0 { + return fmt.Errorf("no attempts for transaction with ID %d", txAttempt.TxID) + } + tx.State = TxUnconfirmed + + // Delete the receipt from the attempt + for i := 0; i < len(tx.TxAttempts); i++ { + if tx.TxAttempts[i].ID == txAttempt.ID { + tx.TxAttempts[i].Receipts = nil + tx.TxAttempts[i].State = txmgrtypes.TxAttemptInProgress + tx.TxAttempts[i].BroadcastBeforeBlockNum = nil + break + } + } + + as.unconfirmedTxs[tx.ID] = tx + delete(as.confirmedTxs, tx.ID) + return nil } + +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _applyToTxs( + txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]), + txIDs ...int64, +) { + // if txIDs is not empty then only apply the filter to those transactions + if len(txIDs) > 0 { + for _, txID := range txIDs { + tx := txIDsToTx[txID] + if tx != nil { + fn(tx) + } + } + return + } + + // if txIDs is empty then apply the filter to all transactions + for _, tx := range txIDsToTx { + fn(tx) + } +} + +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _findTxs( + txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, + txIDs ...int64, +) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + var txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + // if txIDs is not empty then only apply the filter to those transactions + if len(txIDs) > 0 { + for _, txID := range txIDs { + tx := txIDsToTx[txID] + if tx != nil && filter(tx) { + txs = append(txs, *tx) + } + } + return txs + } + + // if txIDs is empty then apply the filter to all transactions + for _, tx := range txIDsToTx { + if filter(tx) { + txs = append(txs, *tx) + } + } + + return txs +} + +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteAllTxAttempts( + txID int64, +) { + tx, ok := as.allTxs[txID] + if !ok { + return + } + + for i := 0; i < len(tx.TxAttempts); i++ { + txAttempt := tx.TxAttempts[i] + delete(as.attemptHashToTxAttempt, txAttempt.Hash) + } + tx.TxAttempts = nil +} + +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txIDs ...int64) { + for _, txID := range txIDs { + as._deleteTx(txID) + } +} + +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTx(txID int64) { + tx, ok := as.allTxs[txID] + if !ok { + return + } + + for i := 0; i < len(tx.TxAttempts); i++ { + txAttemptHash := tx.TxAttempts[i].Hash + delete(as.attemptHashToTxAttempt, txAttemptHash) + } + if tx.IdempotencyKey != nil { + delete(as.idempotencyKeyToTx, *tx.IdempotencyKey) + } + if as.inprogressTx != nil && as.inprogressTx.ID == txID { + as.inprogressTx = nil + } + as.unstartedTxs.RemoveTxByID(txID) + delete(as.unconfirmedTxs, txID) + delete(as.confirmedMissingReceiptTxs, txID) + delete(as.confirmedTxs, txID) + delete(as.fatalErroredTxs, txID) + delete(as.allTxs, txID) +} + +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _moveTxToFatalError( + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + txError null.String, +) { + tx.State = TxFatalError + tx.Sequence = nil + tx.BroadcastAt = nil + tx.InitialBroadcastAt = nil + tx.Error = txError + as.fatalErroredTxs[tx.ID] = tx +} + +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _moveUnconfirmedToConfirmedMissingReceipt( + tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], +) { + tx.State = TxConfirmedMissingReceipt + as.confirmedMissingReceiptTxs[tx.ID] = tx + delete(as.unconfirmedTxs, tx.ID) +} diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index bd4e9a2f3a6..7b1f270529d 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/big" + "sync" "time" "github.com/google/uuid" @@ -47,7 +48,8 @@ type inMemoryStore[ keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + addressStatesLock sync.RWMutex + addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] } // NewInMemoryStore returns a new inMemoryStore @@ -79,11 +81,21 @@ func NewInMemoryStore[ ms.maxUnstarted = 10000 } + addressesToTxs := map[ADDR][]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + // populate all enabled addresses + enabledAddresses, err := keyStore.EnabledAddressesForChain(ctx, chainID) + if err != nil { + return nil, fmt.Errorf("new_in_memory_store: %w", err) + } + for _, addr := range enabledAddresses { + addressesToTxs[addr] = []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + } + txs, err := persistentTxStore.GetAllTransactions(ctx, chainID) if err != nil { return nil, fmt.Errorf("address_state: initialization: %w", err) } - addressesToTxs := map[ADDR][]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + for _, tx := range txs { at, exists := addressesToTxs[tx.FromAddress] if !exists { @@ -178,11 +190,74 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR oldAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) error { + if oldAttempt.State != txmgrtypes.TxAttemptInProgress || replacementAttempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("expected attempts to be in_progress") + } + if oldAttempt.ID == 0 { + return fmt.Errorf("expected oldAttempt to have an ID") + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + for _, vas := range ms.addressStates { + if vas.hasTx(oldAttempt.TxID) { + as = vas + break + } + } + if as == nil { + return fmt.Errorf("save_replacement_in_progress_attempt: %w: %q", ErrTxnNotFound, oldAttempt.TxID) + } + + // Persist to persistent storage + if err := ms.persistentTxStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, replacementAttempt); err != nil { + return fmt.Errorf("save_replacement_in_progress_attempt: %w", err) + } + + // Update in memory store + // delete the old attempt + as.deleteTxAttempt(oldAttempt.TxID, oldAttempt.ID) + // add the new attempt + if err := as.addTxAttempt(*replacementAttempt); err != nil { + return fmt.Errorf("save_replacement_in_progress_attempt: failed to add a replacement transaction attempt: %w", err) + } + return nil } // UpdateTxFatalError updates a transaction to fatal_error. func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxFatalError(ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + if tx.State != TxInProgress && tx.State != TxUnstarted { + return fmt.Errorf("update_tx_fatal_error: can only transition to fatal_error from in_progress, transaction is currently %s", tx.State) + } + if !tx.Error.Valid { + return fmt.Errorf("update_tx_fatal_error: expected error field to be set") + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[tx.FromAddress] + if !ok { + return fmt.Errorf("update_tx_fatal_error: %w", ErrAddressNotFound) + } + if !as.hasTx(tx.ID) { + return fmt.Errorf("update_tx_fatal_error: %w: %q", ErrTxnNotFound, tx.ID) + } + + // Persist to persistent storage + if err := ms.persistentTxStore.UpdateTxFatalError(ctx, tx); err != nil { + return fmt.Errorf("update_tx_fatal_error: %w", err) + } + + // Update in memory store + // remove all tx attempts for the transaction + as.deleteAllTxAttempts(tx.ID) + // move the transaction to fatal_error state + if err := as.moveTxToFatalError(tx.ID, tx.Error); err != nil { + return fmt.Errorf("update_tx_fatal_error: %w", err) + } + return nil } @@ -264,6 +339,23 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Prune } func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error { + if ms.chainID.String() != chainID.String() { + panic("invalid chain ID") + } + + // Persist to persistent storage + if err := ms.persistentTxStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID); err != nil { + return err + } + + // Update in memory store + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + as.reapConfirmedTxs(minBlockNumberToKeep, timeThreshold) + as.reapFatalErroredTxs(timeThreshold) + } + return nil } func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(_ context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) { @@ -271,6 +363,35 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Count } func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInProgressAttempt(ctx context.Context, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("DeleteInProgressAttempt: expected attempt state to be in_progress") + } + if attempt.ID == 0 { + return fmt.Errorf("DeleteInProgressAttempt: expected attempt to have an id") + } + + // Check if fromaddress enabled + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + for _, vas := range ms.addressStates { + if vas.hasTx(attempt.TxID) { + as = vas + break + } + } + if as == nil { + return fmt.Errorf("delete_in_progress_attempt: %w: %q", ErrTxnNotFound, attempt.TxID) + } + + // Persist to persistent storage + if err := ms.persistentTxStore.DeleteInProgressAttempt(ctx, attempt); err != nil { + return fmt.Errorf("delete_in_progress_attempt: %w", err) + } + + // Update in memory store + as.deleteTxAttempt(attempt.TxID, attempt.ID) + return nil } @@ -319,7 +440,31 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Prelo return nil } func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveConfirmedMissingReceiptAttempt(ctx context.Context, timeout time.Duration, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], broadcastAt time.Time) error { - return nil + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("expected state to be in_progress") + } + + var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + for _, vas := range ms.addressStates { + if vas.hasTx(attempt.TxID) { + as = vas + break + } + } + if as == nil { + return fmt.Errorf("save_confirmed_missing_receipt_attempt: %w: %q", ErrTxnNotFound, attempt.TxID) + } + + // Persist to persistent storage + if err := ms.persistentTxStore.SaveConfirmedMissingReceiptAttempt(ctx, timeout, attempt, broadcastAt); err != nil { + return err + } + + // Update in memory store + return as.moveTxAttemptToBroadcast(*attempt, broadcastAt) } func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil @@ -331,7 +476,20 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveS return nil } func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxForRebroadcast(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], etxAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { - return nil + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[etx.FromAddress] + if !ok { + return nil + } + + // Persist to persistent storage + if err := ms.persistentTxStore.UpdateTxForRebroadcast(ctx, etx, etxAttempt); err != nil { + return err + } + + // Update in memory store + return as.moveConfirmedToUnconfirmed(etxAttempt) } func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (bool, error) { return false, nil @@ -341,10 +499,137 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT return nil, nil } func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkAllConfirmedMissingReceipt(ctx context.Context, chainID CHAIN_ID) error { - return nil + if ms.chainID.String() != chainID.String() { + panic("invalid chain ID") + } + + // Persist to persistent storage + if err := ms.persistentTxStore.MarkAllConfirmedMissingReceipt(ctx, chainID); err != nil { + return err + } + + // Update in memory store + var errs error + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + // Get the max confirmed sequence + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { return true } + states := []txmgrtypes.TxState{TxConfirmed} + txs := as.findTxs(states, filter) + var maxConfirmedSequence SEQ + for _, tx := range txs { + if tx.Sequence == nil { + continue + } + if (*tx.Sequence).Int64() > maxConfirmedSequence.Int64() { + maxConfirmedSequence = *tx.Sequence + } + } + + // Mark all unconfirmed txs with a sequence less than the max confirmed sequence as confirmed_missing_receipt + filter = func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.Sequence == nil { + return false + } + + return (*tx.Sequence).Int64() < maxConfirmedSequence.Int64() + } + states = []txmgrtypes.TxState{TxUnconfirmed} + txs = as.findTxs(states, filter) + for _, tx := range txs { + if err := as.moveUnconfirmedToConfirmedMissingReceipt(tx.ID); err != nil { + err = fmt.Errorf("mark_all_confirmed_missing_receipt: address: %s: %w", as.fromAddress, err) + errs = errors.Join(errs, err) + } + } + } + + return errs } func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID CHAIN_ID) error { - return nil + if ms.chainID.String() != chainID.String() { + panic(fmt.Sprintf(ErrInvalidChainID.Error()+": %s", chainID.String())) + } + + // Persist to persistent storage + if err := ms.persistentTxStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, finalityDepth, chainID); err != nil { + return err + } + + // Update in memory store + type result struct { + ID int64 + Sequence SEQ + FromAddress ADDR + MaxBroadcastBeforeBlockNum int64 + TxHashes []TX_HASH + } + var results []result + cutoff := blockNum - int64(finalityDepth) + if cutoff <= 0 { + return nil + } + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if len(tx.TxAttempts) == 0 { + return false + } + var maxBroadcastBeforeBlockNum int64 + for i := 0; i < len(tx.TxAttempts); i++ { + txAttempt := tx.TxAttempts[i] + if txAttempt.BroadcastBeforeBlockNum == nil { + continue + } + if *txAttempt.BroadcastBeforeBlockNum > maxBroadcastBeforeBlockNum { + maxBroadcastBeforeBlockNum = *txAttempt.BroadcastBeforeBlockNum + } + } + return maxBroadcastBeforeBlockNum < cutoff + } + var errs error + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + states := []txmgrtypes.TxState{TxConfirmedMissingReceipt} + txs := as.findTxs(states, filter) + for _, tx := range txs { + if err := as.moveTxToFatalError(tx.ID, null.StringFrom(ErrCouldNotGetReceipt.Error())); err != nil { + err = fmt.Errorf("mark_old_txes_missing_receipt_as_errored: address: %s: %w", as.fromAddress, err) + errs = errors.Join(errs, err) + continue + } + hashes := make([]TX_HASH, len(tx.TxAttempts)) + maxBroadcastBeforeBlockNum := int64(0) + for i, attempt := range tx.TxAttempts { + hashes[i] = attempt.Hash + if attempt.BroadcastBeforeBlockNum != nil { + if *attempt.BroadcastBeforeBlockNum > maxBroadcastBeforeBlockNum { + maxBroadcastBeforeBlockNum = *attempt.BroadcastBeforeBlockNum + } + } + } + rr := result{ + ID: tx.ID, + Sequence: *tx.Sequence, + FromAddress: tx.FromAddress, + MaxBroadcastBeforeBlockNum: maxBroadcastBeforeBlockNum, + TxHashes: hashes, + } + results = append(results, rr) + } + } + + for _, r := range results { + ms.lggr.Criticalw(fmt.Sprintf("eth_tx with ID %v expired without ever getting a receipt for any of our attempts. "+ + "Current block height is %v, transaction was broadcast before block height %v. This transaction may not have not been sent and will be marked as fatally errored. "+ + "This can happen if there is another instance of chainlink running that is using the same private key, or if "+ + "an external wallet has been used to send a transaction from account %s with nonce %v."+ + " Please note that Chainlink requires exclusive ownership of it's private keys and sharing keys across multiple"+ + " chainlink instances, or using the chainlink keys with an external wallet is NOT SUPPORTED and WILL lead to missed transactions", + r.ID, blockNum, r.MaxBroadcastBeforeBlockNum, r.FromAddress, r.Sequence), "ethTxID", r.ID, "sequence", r.Sequence, "fromAddress", r.FromAddress, "txHashes", r.TxHashes) + } + + return errs } func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deepCopyTx( diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index a102ee1c996..42667103e11 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -1,14 +1,671 @@ package txmgr_test import ( + "math/big" "testing" + "time" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gopkg.in/guregu/null.v4" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + + evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) +func TestInMemoryStore_SaveConfirmedMissingReceiptAttempt(t *testing.T) { + t.Parallel() + + t.Run("updates attempt to 'broadcast' and transaction to 'confirm_missing_receipt'", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertUnconfirmedEthTxWithAttemptState(t, persistentStore, 1, fromAddress, txmgrtypes.TxAttemptInProgress) + now := time.Now() + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + defaultDuration := 5 * time.Second + err = inMemoryStore.SaveConfirmedMissingReceiptAttempt(ctx, defaultDuration, &inTx.TxAttempts[0], now) + require.NoError(t, err) + + expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID) + require.NoError(t, err) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + + assertTxEqual(t, expTx, actTx) + assert.Equal(t, txmgrtypes.TxAttemptBroadcast, actTx.TxAttempts[0].State) + assert.Equal(t, commontxmgr.TxConfirmedMissingReceipt, actTx.State) + }) +} + +func TestInMemoryStore_UpdateTxFatalError(t *testing.T) { + t.Parallel() + + t.Run("successfully update transaction", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 13, fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + inTx.Error = null.StringFrom("no more toilet paper") + err = inMemoryStore.UpdateTxFatalError(ctx, &inTx) + require.NoError(t, err) + + expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID) + require.NoError(t, err) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + + assertTxEqual(t, expTx, actTx) + assert.Equal(t, commontxmgr.TxFatalError, actTx.State) + }) +} + +func TestInMemoryStore_SaveReplacementInProgressAttempt(t *testing.T) { + t.Parallel() + + t.Run("successfully replace tx attempt", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 123, fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + oldAttempt := inTx.TxAttempts[0] + newAttempt := cltest.NewDynamicFeeEthTxAttempt(t, inTx.ID) + err = inMemoryStore.SaveReplacementInProgressAttempt( + ctx, + oldAttempt, + &newAttempt, + ) + require.NoError(t, err) + + expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID) + require.NoError(t, err) + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + assertTxEqual(t, expTx, actTx) + assert.Equal(t, txmgrtypes.TxAttemptInProgress, actTx.TxAttempts[0].State) + assert.Equal(t, newAttempt.Hash, actTx.TxAttempts[0].Hash) + assert.NotEqual(t, oldAttempt.ID, actTx.TxAttempts[0].ID) + }) + + t.Run("error parity for in-memory vs persistent store", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 124, fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + oldAttempt := inTx.TxAttempts[0] + newAttempt := cltest.NewDynamicFeeEthTxAttempt(t, inTx.ID) + + t.Run("error when old attempt is not in progress", func(t *testing.T) { + oldAttempt.State = txmgrtypes.TxAttemptBroadcast + expErr := persistentStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt) + actErr := inMemoryStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt) + assert.Equal(t, expErr, actErr) + oldAttempt.State = txmgrtypes.TxAttemptInProgress + }) + + t.Run("error when new attempt is not in progress", func(t *testing.T) { + newAttempt.State = txmgrtypes.TxAttemptBroadcast + expErr := persistentStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt) + actErr := inMemoryStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt) + assert.Equal(t, expErr, actErr) + newAttempt.State = txmgrtypes.TxAttemptInProgress + }) + + t.Run("error when old attempt id is 0", func(t *testing.T) { + originalID := oldAttempt.ID + oldAttempt.ID = 0 + expErr := persistentStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt) + actErr := inMemoryStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt) + assert.Equal(t, expErr, actErr) + oldAttempt.ID = originalID + }) + }) +} + +func TestInMemoryStore_DeleteInProgressAttempt(t *testing.T) { + t.Parallel() + + t.Run("successfully replace tx attempt", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 1, fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + oldAttempt := inTx.TxAttempts[0] + err = inMemoryStore.DeleteInProgressAttempt(ctx, oldAttempt) + require.NoError(t, err) + + expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID) + require.NoError(t, err) + assert.Equal(t, 0, len(expTx.TxAttempts)) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + assertTxEqual(t, expTx, actTx) + }) + + t.Run("error parity for in-memory vs persistent store", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 124, fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + oldAttempt := inTx.TxAttempts[0] + t.Run("error when attempt is not in progress", func(t *testing.T) { + oldAttempt.State = txmgrtypes.TxAttemptBroadcast + expErr := persistentStore.DeleteInProgressAttempt(ctx, oldAttempt) + actErr := inMemoryStore.DeleteInProgressAttempt(ctx, oldAttempt) + assert.Equal(t, expErr, actErr) + oldAttempt.State = txmgrtypes.TxAttemptInProgress + }) + + t.Run("error when attempt has 0 id", func(t *testing.T) { + originalID := oldAttempt.ID + oldAttempt.ID = 0 + expErr := persistentStore.DeleteInProgressAttempt(ctx, oldAttempt) + actErr := inMemoryStore.DeleteInProgressAttempt(ctx, oldAttempt) + assert.Equal(t, expErr, actErr) + oldAttempt.ID = originalID + }) + }) +} + +func TestInMemoryStore_ReapTxHistory(t *testing.T) { + t.Parallel() + + t.Run("reap all confirmed txs", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx_0 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, persistentStore, 7, 1, fromAddress) + r_0 := mustInsertEthReceipt(t, persistentStore, 1, utils.NewHash(), inTx_0.TxAttempts[0].Hash) + inTx_0.TxAttempts[0].Receipts = append(inTx_0.TxAttempts[0].Receipts, evmtxmgr.DbReceiptToEvmReceipt(&r_0)) + inTx_1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, persistentStore, 8, 2, fromAddress) + r_1 := mustInsertEthReceipt(t, persistentStore, 2, utils.NewHash(), inTx_1.TxAttempts[0].Hash) + inTx_1.TxAttempts[0].Receipts = append(inTx_1.TxAttempts[0].Receipts, evmtxmgr.DbReceiptToEvmReceipt(&r_1)) + inTx_2 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, persistentStore, 9, 3, fromAddress) + r_2 := mustInsertEthReceipt(t, persistentStore, 3, utils.NewHash(), inTx_2.TxAttempts[0].Hash) + inTx_2.TxAttempts[0].Receipts = append(inTx_2.TxAttempts[0].Receipts, evmtxmgr.DbReceiptToEvmReceipt(&r_2)) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_0)) + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_1)) + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_2)) + + minBlockNumberToKeep := int64(3) + timeThreshold := inTx_2.CreatedAt + expErr := persistentStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID) + actErr := inMemoryStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID) + require.NoError(t, expErr) + require.NoError(t, actErr) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + // Check that the transactions were reaped in persistent store + expTx_0, err := persistentStore.FindTxWithAttempts(ctx, inTx_0.ID) + require.Error(t, err) + require.Equal(t, int64(0), expTx_0.ID) + expTx_1, err := persistentStore.FindTxWithAttempts(ctx, inTx_1.ID) + require.Error(t, err) + require.Equal(t, int64(0), expTx_1.ID) + // Check that the transactions were reaped in in-memory store + actTxs_0 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_0.ID) + require.Equal(t, 0, len(actTxs_0)) + actTxs_1 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_1.ID) + require.Equal(t, 0, len(actTxs_1)) + + // Check that the transaction was not reaped + expTx_2, err := persistentStore.FindTxWithAttempts(ctx, inTx_2.ID) + require.NoError(t, err) + require.Equal(t, inTx_2.ID, expTx_2.ID) + actTxs_2 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_2.ID) + require.Equal(t, 1, len(actTxs_2)) + assertTxEqual(t, expTx_2, actTxs_2[0]) + }) + t.Run("reap all fatal error txs", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx_0 := cltest.NewEthTx(fromAddress) + inTx_0.Error = null.StringFrom("something exploded") + inTx_0.State = commontxmgr.TxFatalError + inTx_0.CreatedAt = time.Unix(1000, 0) + require.NoError(t, persistentStore.InsertTx(ctx, &inTx_0)) + inTx_1 := cltest.NewEthTx(fromAddress) + inTx_1.Error = null.StringFrom("something exploded") + inTx_1.State = commontxmgr.TxFatalError + inTx_1.CreatedAt = time.Unix(2000, 0) + require.NoError(t, persistentStore.InsertTx(ctx, &inTx_1)) + inTx_2 := cltest.NewEthTx(fromAddress) + inTx_2.Error = null.StringFrom("something exploded") + inTx_2.State = commontxmgr.TxFatalError + inTx_2.CreatedAt = time.Unix(3000, 0) + require.NoError(t, persistentStore.InsertTx(ctx, &inTx_2)) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_0)) + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_1)) + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_2)) + + minBlockNumberToKeep := int64(3) + timeThreshold := time.Unix(2500, 0) // Only reap txs created before this time + expErr := persistentStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID) + actErr := inMemoryStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID) + require.NoError(t, expErr) + require.NoError(t, actErr) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + // Check that the transactions were reaped in persistent store + expTx_0, err := persistentStore.FindTxWithAttempts(ctx, inTx_0.ID) + require.Error(t, err) + require.Equal(t, int64(0), expTx_0.ID) + expTx_1, err := persistentStore.FindTxWithAttempts(ctx, inTx_1.ID) + require.Error(t, err) + require.Equal(t, int64(0), expTx_1.ID) + // Check that the transactions were reaped in in-memory store + actTxs_0 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_0.ID) + require.Equal(t, 0, len(actTxs_0)) + actTxs_1 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_1.ID) + require.Equal(t, 0, len(actTxs_1)) + + // Check that the transaction was not reaped + expTx_2, err := persistentStore.FindTxWithAttempts(ctx, inTx_2.ID) + require.NoError(t, err) + require.Equal(t, inTx_2.ID, expTx_2.ID) + actTxs_2 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_2.ID) + require.Equal(t, 1, len(actTxs_2)) + assertTxEqual(t, expTx_2, actTxs_2[0]) + }) +} + +func TestInMemoryStore_MarkOldTxesMissingReceiptAsErrored(t *testing.T) { + t.Parallel() + blockNum := int64(10) + finalityDepth := uint32(2) + + t.Run("successfully mark errored transaction", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, persistentStore, 1, 7, time.Now(), fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + err = inMemoryStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, finalityDepth, chainID) + require.NoError(t, err) + + expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID) + require.NoError(t, err) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + + assertTxEqual(t, expTx, actTx) + assert.Equal(t, txmgrtypes.TxAttemptBroadcast, actTx.TxAttempts[0].State) + assert.Equal(t, commontxmgr.TxFatalError, actTx.State) + }) +} + +func TestInMemoryStore_UpdateTxForRebroadcast(t *testing.T) { + t.Parallel() + + t.Run("delete all receipts for transaction", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertConfirmedEthTxWithReceipt(t, persistentStore, fromAddress, 777, 1) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + txAttempt := inTx.TxAttempts[0] + err = inMemoryStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt) + require.NoError(t, err) + + expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID) + require.NoError(t, err) + require.Len(t, expTx.TxAttempts, 1) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + + assertTxEqual(t, expTx, actTx) + assert.Equal(t, commontxmgr.TxUnconfirmed, actTx.State) + assert.Equal(t, txmgrtypes.TxAttemptInProgress, actTx.TxAttempts[0].State) + assert.Nil(t, actTx.TxAttempts[0].BroadcastBeforeBlockNum) + assert.Equal(t, 0, len(actTx.TxAttempts[0].Receipts)) + }) + + t.Run("error parity for in-memory vs persistent store", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertConfirmedEthTxWithReceipt(t, persistentStore, fromAddress, 777, 1) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + txAttempt := inTx.TxAttempts[0] + + t.Run("error when attempt is not in Broadcast state", func(t *testing.T) { + txAttempt.State = txmgrtypes.TxAttemptInProgress + expErr := persistentStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt) + actErr := inMemoryStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt) + assert.Error(t, expErr) + assert.Error(t, actErr) + txAttempt.State = txmgrtypes.TxAttemptBroadcast + }) + t.Run("error when transaction is not in confirmed state", func(t *testing.T) { + inTx.State = commontxmgr.TxUnconfirmed + expErr := persistentStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt) + actErr := inMemoryStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt) + assert.Error(t, expErr) + assert.Error(t, actErr) + inTx.State = commontxmgr.TxConfirmed + }) + t.Run("wrong fromAddress has no error", func(t *testing.T) { + inTx.FromAddress = common.Address{} + expErr := persistentStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt) + actErr := inMemoryStore.UpdateTxForRebroadcast(ctx, inTx, txAttempt) + assert.Equal(t, expErr, actErr) + assert.Nil(t, actErr) + inTx.FromAddress = fromAddress + }) + }) +} + +func TestInMemoryStore_MarkAllConfirmedMissingReceipt(t *testing.T) { + t.Parallel() + + t.Run("successfully mark all confirmed missing receipt", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // create transaction 0 that is unconfirmed (block 7) + // Insert a transaction into persistent store + blocknum := int64(7) + inTx_0 := cltest.MustInsertUnconfirmedEthTx(t, persistentStore, 0, fromAddress) + inTxAttempt_0 := newBroadcastLegacyEthTxAttempt(t, inTx_0.ID, int64(1)) + inTxAttempt_0.BroadcastBeforeBlockNum = &blocknum + require.NoError(t, persistentStore.InsertTxAttempt(ctx, &inTxAttempt_0)) + assert.Equal(t, commontxmgr.TxUnconfirmed, inTx_0.State) + // Insert the transaction into the in-memory store + inTx_0.TxAttempts = []evmtxmgr.TxAttempt{inTxAttempt_0} + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_0)) + + // create transaction 1 that is confirmed (block 77) + inTx_1 := mustInsertConfirmedEthTxBySaveFetchedReceipts(t, persistentStore, fromAddress, 1, 77, *chainID) + assert.Equal(t, commontxmgr.TxConfirmed, inTx_1.State) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_1)) + + // mark transaction 0 as confirmed missing receipt + err = inMemoryStore.MarkAllConfirmedMissingReceipt(ctx, chainID) + require.NoError(t, err) + + expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx_0.ID) + require.NoError(t, err) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_0.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + assert.Equal(t, commontxmgr.TxConfirmedMissingReceipt, actTx.State) + assert.Equal(t, txmgrtypes.TxAttemptBroadcast, actTx.TxAttempts[0].State) + assertTxEqual(t, expTx, actTx) + }) +} + // assertTxEqual asserts that two transactions are equal func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { assert.Equal(t, exp.ID, act.ID) @@ -20,7 +677,12 @@ func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { assert.Equal(t, exp.Value, act.Value) assert.Equal(t, exp.FeeLimit, act.FeeLimit) assert.Equal(t, exp.Error, act.Error) - assert.Equal(t, exp.BroadcastAt, act.BroadcastAt) + if exp.BroadcastAt != nil { + require.NotNil(t, act.BroadcastAt) + assert.Equal(t, exp.BroadcastAt.Unix(), act.BroadcastAt.Unix()) + } else { + assert.Equal(t, exp.BroadcastAt, act.BroadcastAt) + } assert.Equal(t, exp.InitialBroadcastAt, act.InitialBroadcastAt) assert.Equal(t, exp.CreatedAt, act.CreatedAt) assert.Equal(t, exp.State, act.State) @@ -33,7 +695,7 @@ func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { assert.Equal(t, exp.SignalCallback, act.SignalCallback) assert.Equal(t, exp.CallbackCompleted, act.CallbackCompleted) - require.Len(t, exp.TxAttempts, len(act.TxAttempts)) + require.Equal(t, len(exp.TxAttempts), len(act.TxAttempts)) for i := 0; i < len(exp.TxAttempts); i++ { assertTxAttemptEqual(t, exp.TxAttempts[i], act.TxAttempts[i]) }