Skip to content

Commit

Permalink
Merge branch 'jtw/step-3-04' into jtw/step-3-04-save-confirmed-missin…
Browse files Browse the repository at this point in the history
…g-receipt-attempt
  • Loading branch information
poopoothegorilla committed Apr 4, 2024
2 parents 7de69a4 + 09e6d61 commit 17fe719
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 2 deletions.
48 changes: 46 additions & 2 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,14 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) delete
as._deleteTxs(txIDs...)
}

// deleteAllTxAttempts removes all attempts for the given transactions.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteAllTxAttempts(txID int64) {
as.Lock()
defer as.Unlock()

as._deleteAllTxAttempts(txID)
}

// deleteTxAttempt removes the attempt with a given ID from the transaction with the given ID.
// It removes the attempts from the hash lookup map and from the transaction.
// If an attempt is not found in the hash lookup map, it is ignored.
Expand All @@ -328,14 +336,35 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) delete
}
}

// addTxAttempt adds the given attempt to the transaction which matches its TxID.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxAttempt(txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
as.Lock()
defer as.Unlock()

tx, ok := as.allTxs[txAttempt.TxID]
if !ok || tx == nil {
return fmt.Errorf("no transaction with ID %d", txAttempt.TxID)
}

// add the attempt to the transaction
if tx.TxAttempts == nil {
tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
}
tx.TxAttempts = append(tx.TxAttempts, txAttempt)
// add the attempt to the hash lookup map
as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt

return nil
}

// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 361 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekNextUnstartedTx is unused (unused)
return nil, nil
}

// peekInProgressTx returns the in-progress transaction without removing it from the in-progress state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
return nil, nil
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {

Check failure on line 366 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekInProgressTx is unused (unused)
return nil
}

// addTxToUnstarted adds the given transaction to the unstarted queue.
Expand Down Expand Up @@ -541,6 +570,21 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _findT
return txs
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteAllTxAttempts(
txID int64,
) {
tx, ok := as.allTxs[txID]
if !ok {
return
}

for i := 0; i < len(tx.TxAttempts); i++ {
txAttempt := tx.TxAttempts[i]
delete(as.attemptHashToTxAttempt, txAttempt.Hash)
}
tx.TxAttempts = nil
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txIDs ...int64) {

Check failure on line 588 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])._deleteTxs is unused (unused)
for _, txID := range txIDs {
as._deleteTx(txID)
Expand Down
63 changes: 63 additions & 0 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,74 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR
oldAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
replacementAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
if oldAttempt.State != txmgrtypes.TxAttemptInProgress || replacementAttempt.State != txmgrtypes.TxAttemptInProgress {
return fmt.Errorf("expected attempts to be in_progress")
}
if oldAttempt.ID == 0 {
return fmt.Errorf("expected oldAttempt to have an ID")
}

ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
for _, vas := range ms.addressStates {
if vas.hasTx(oldAttempt.TxID) {
as = vas
break
}
}
if as == nil {
return fmt.Errorf("save_replacement_in_progress_attempt: %w: %q", ErrTxnNotFound, oldAttempt.TxID)
}

// Persist to persistent storage
if err := ms.persistentTxStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, replacementAttempt); err != nil {
return fmt.Errorf("save_replacement_in_progress_attempt: %w", err)
}

// Update in memory store
// delete the old attempt
as.deleteTxAttempt(oldAttempt.TxID, oldAttempt.ID)
// add the new attempt
if err := as.addTxAttempt(*replacementAttempt); err != nil {
return fmt.Errorf("save_replacement_in_progress_attempt: failed to add a replacement transaction attempt: %w", err)
}

return nil
}

// UpdateTxFatalError updates a transaction to fatal_error.
func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxFatalError(ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
if tx.State != TxInProgress && tx.State != TxUnstarted {
return fmt.Errorf("update_tx_fatal_error: can only transition to fatal_error from in_progress, transaction is currently %s", tx.State)
}
if !tx.Error.Valid {
return fmt.Errorf("update_tx_fatal_error: expected error field to be set")
}

ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
as, ok := ms.addressStates[tx.FromAddress]
if !ok {
return fmt.Errorf("update_tx_fatal_error: %w", ErrAddressNotFound)
}
if !as.hasTx(tx.ID) {
return fmt.Errorf("update_tx_fatal_error: %w: %q", ErrTxnNotFound, tx.ID)
}

// Persist to persistent storage
if err := ms.persistentTxStore.UpdateTxFatalError(ctx, tx); err != nil {
return fmt.Errorf("update_tx_fatal_error: %w", err)
}

// Update in memory store
// remove all tx attempts for the transaction
as.deleteAllTxAttempts(tx.ID)
// move the transaction to fatal_error state
if err := as.moveTxToFatalError(tx.ID, tx.Error); err != nil {
return fmt.Errorf("update_tx_fatal_error: %w", err)
}

return nil
}

Expand Down
152 changes: 152 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,158 @@ func TestInMemoryStore_SaveConfirmedMissingReceiptAttempt(t *testing.T) {
})
}

func TestInMemoryStore_UpdateTxFatalError(t *testing.T) {
t.Parallel()

t.Run("successfully update transaction", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 13, fromAddress)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

inTx.Error = null.StringFrom("no more toilet paper")
err = inMemoryStore.UpdateTxFatalError(ctx, &inTx)
require.NoError(t, err)

expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID)
require.NoError(t, err)

fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]

assertTxEqual(t, expTx, actTx)
assert.Equal(t, commontxmgr.TxFatalError, actTx.State)
})
}

func TestInMemoryStore_SaveReplacementInProgressAttempt(t *testing.T) {
t.Parallel()

t.Run("successfully replace tx attempt", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 123, fromAddress)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

oldAttempt := inTx.TxAttempts[0]
newAttempt := cltest.NewDynamicFeeEthTxAttempt(t, inTx.ID)
err = inMemoryStore.SaveReplacementInProgressAttempt(
ctx,
oldAttempt,
&newAttempt,
)
require.NoError(t, err)

expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID)
require.NoError(t, err)
fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]
assertTxEqual(t, expTx, actTx)
assert.Equal(t, txmgrtypes.TxAttemptInProgress, actTx.TxAttempts[0].State)
assert.Equal(t, newAttempt.Hash, actTx.TxAttempts[0].Hash)
assert.NotEqual(t, oldAttempt.ID, actTx.TxAttempts[0].ID)
})

t.Run("error parity for in-memory vs persistent store", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 124, fromAddress)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

oldAttempt := inTx.TxAttempts[0]
newAttempt := cltest.NewDynamicFeeEthTxAttempt(t, inTx.ID)

t.Run("error when old attempt is not in progress", func(t *testing.T) {
oldAttempt.State = txmgrtypes.TxAttemptBroadcast
expErr := persistentStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt)
actErr := inMemoryStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt)
assert.Equal(t, expErr, actErr)
oldAttempt.State = txmgrtypes.TxAttemptInProgress
})

t.Run("error when new attempt is not in progress", func(t *testing.T) {
newAttempt.State = txmgrtypes.TxAttemptBroadcast
expErr := persistentStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt)
actErr := inMemoryStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt)
assert.Equal(t, expErr, actErr)
newAttempt.State = txmgrtypes.TxAttemptInProgress
})

t.Run("error when old attempt id is 0", func(t *testing.T) {
originalID := oldAttempt.ID
oldAttempt.ID = 0
expErr := persistentStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt)
actErr := inMemoryStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt)
assert.Equal(t, expErr, actErr)
oldAttempt.ID = originalID
})
})
}

func TestInMemoryStore_DeleteInProgressAttempt(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 17fe719

Please sign in to comment.