Skip to content

Commit

Permalink
Merge pull request #12226 from smartcontractkit/jtw/step-3-04-update-…
Browse files Browse the repository at this point in the history
…tx-fatal-error

TXM In-memory: step 3-04-UpdateTxFatalError
  • Loading branch information
poopoothegorilla authored Apr 4, 2024
2 parents 493ffed + 33b085b commit 09e6d61
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 0 deletions.
23 changes: 23 additions & 0 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,14 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) delete
as._deleteTxs(txIDs...)
}

// deleteAllTxAttempts removes all attempts for the given transactions.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteAllTxAttempts(txID int64) {
as.Lock()
defer as.Unlock()

as._deleteAllTxAttempts(txID)
}

// deleteTxAttempt removes the attempt with a given ID from the transaction with the given ID.
// It removes the attempts from the hash lookup map and from the transaction.
// If an attempt is not found in the hash lookup map, it is ignored.
Expand Down Expand Up @@ -558,6 +566,21 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _findT
return txs
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteAllTxAttempts(
txID int64,
) {
tx, ok := as.allTxs[txID]
if !ok {
return
}

for i := 0; i < len(tx.TxAttempts); i++ {
txAttempt := tx.TxAttempts[i]
delete(as.attemptHashToTxAttempt, txAttempt.Hash)
}
tx.TxAttempts = nil
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txIDs ...int64) {

Check failure on line 584 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])._deleteTxs is unused (unused)
for _, txID := range txIDs {
as._deleteTx(txID)
Expand Down
30 changes: 30 additions & 0 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,36 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR

// UpdateTxFatalError updates a transaction to fatal_error.
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxFatalError(ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
if tx.State != TxInProgress && tx.State != TxUnstarted {
return fmt.Errorf("update_tx_fatal_error: can only transition to fatal_error from in_progress, transaction is currently %s", tx.State)
}
if !tx.Error.Valid {
return fmt.Errorf("update_tx_fatal_error: expected error field to be set")
}

ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
as, ok := ms.addressStates[tx.FromAddress]
if !ok {
return fmt.Errorf("update_tx_fatal_error: %w", ErrAddressNotFound)
}
if !as.hasTx(tx.ID) {
return fmt.Errorf("update_tx_fatal_error: %w: %q", ErrTxnNotFound, tx.ID)
}

// Persist to persistent storage
if err := ms.persistentTxStore.UpdateTxFatalError(ctx, tx); err != nil {
return fmt.Errorf("update_tx_fatal_error: %w", err)
}

// Update in memory store
// remove all tx attempts for the transaction
as.deleteAllTxAttempts(tx.ID)
// move the transaction to fatal_error state
if err := as.moveTxToFatalError(tx.ID, tx.Error); err != nil {
return fmt.Errorf("update_tx_fatal_error: %w", err)
}

return nil
}

Expand Down
46 changes: 46 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,52 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_UpdateTxFatalError(t *testing.T) {
t.Parallel()

t.Run("successfully update transaction", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 13, fromAddress)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

inTx.Error = null.StringFrom("no more toilet paper")
err = inMemoryStore.UpdateTxFatalError(ctx, &inTx)
require.NoError(t, err)

expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID)
require.NoError(t, err)

fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]

assertTxEqual(t, expTx, actTx)
assert.Equal(t, commontxmgr.TxFatalError, actTx.State)
})
}

func TestInMemoryStore_SaveReplacementInProgressAttempt(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 09e6d61

Please sign in to comment.