Skip to content

Commit

Permalink
Merge pull request #12217 from smartcontractkit/jtw/step-3-03-update-…
Browse files Browse the repository at this point in the history
…tx-unstarted-to-in-progress

TXM In-memory: step 3-03-UpdateTxUnstartedToInProgress
  • Loading branch information
poopoothegorilla authored Apr 4, 2024
2 parents fa0ff89 + 698bbf8 commit a4b6104
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 4 deletions.
17 changes: 13 additions & 4 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx
return nil
}

// hasTx returns true if the transaction with the given ID is in the address state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) hasTx(txID int64) bool {
as.RLock()
defer as.RUnlock()

_, ok := as.allTxs[txID]
return ok
}

// findTxs returns all transactions that match the given filters.
// If txIDs are provided, only the transactions with those IDs are considered.
// If no txIDs are provided, all transactions are considered.
Expand Down Expand Up @@ -239,7 +248,7 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxT
// It returns an error if there is already a transaction in progress.
// It returns an error if there is no unstarted transaction to move to in_progress.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnstartedToInProgress(
txID int64, seq SEQ, broadcastAt time.Time, initialBroadcastAt time.Time,
txID int64, seq *SEQ, broadcastAt *time.Time, initialBroadcastAt *time.Time,
txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
as.Lock()
Expand All @@ -255,9 +264,9 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUn
}
tx.State = TxInProgress
tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{txAttempt}
tx.Sequence = &seq
tx.BroadcastAt = &broadcastAt
tx.InitialBroadcastAt = &initialBroadcastAt
tx.Sequence = seq
tx.BroadcastAt = broadcastAt
tx.InitialBroadcastAt = initialBroadcastAt

as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt
as.inprogressTx = tx
Expand Down
30 changes: 30 additions & 0 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,36 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat
tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
if tx.Sequence == nil {
return fmt.Errorf("in_progress transaction must have nonce")
}
if tx.State != TxUnstarted {
return fmt.Errorf("update_tx_unstarted_to_in_progress: can only transition to in_progress from unstarted, transaction is currently %s", tx.State)
}
if attempt.State != txmgrtypes.TxAttemptInProgress {
return fmt.Errorf("attempt state must be in_progress")
}

ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
as, ok := ms.addressStates[tx.FromAddress]
if !ok {
return nil
}
if !as.hasTx(tx.ID) {
return fmt.Errorf("update_tx_unstarted_to_in_progress: %w: %q", ErrTxnNotFound, tx.ID)
}

// Persist to persistent storage
if err := ms.persistentTxStore.UpdateTxUnstartedToInProgress(ctx, tx, attempt); err != nil {
return fmt.Errorf("update_tx_unstarted_to_in_progress: %w", err)
}

// Update in address state in memory
if err := as.moveUnstartedToInProgress(tx.ID, tx.Sequence, tx.BroadcastAt, tx.InitialBroadcastAt, *attempt); err != nil {
return fmt.Errorf("update_tx_unstarted_to_in_progress: %w", err)
}

return nil
}

Expand Down
127 changes: 127 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,133 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_UpdateTxUnstartedToInProgress(t *testing.T) {
t.Parallel()

t.Run("successfully updates unstarted tx to inprogress", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

nonce := evmtypes.Nonce(123)
// Insert a transaction into persistent store
inTx := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID)
inTx.Sequence = &nonce
inTxAttempt := cltest.NewLegacyEthTxAttempt(t, inTx.ID)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

// Update the transaction to in-progress
require.NoError(t, inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx, &inTxAttempt))

expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID)
require.NoError(t, err)
assert.Equal(t, commontxmgr.TxInProgress, expTx.State)
assert.Equal(t, 1, len(expTx.TxAttempts))

fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]
assertTxEqual(t, expTx, actTx)
assert.Equal(t, commontxmgr.TxInProgress, actTx.State)
})

t.Run("wrong input error scenarios", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

nonce1 := evmtypes.Nonce(1)
nonce2 := evmtypes.Nonce(2)
// Insert a transaction into persistent store
inTx1 := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID)
inTx2 := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID)
inTx1.Sequence = &nonce1
inTx2.Sequence = &nonce2
inTxAttempt1 := cltest.NewLegacyEthTxAttempt(t, inTx1.ID)
inTxAttempt2 := cltest.NewLegacyEthTxAttempt(t, inTx2.ID)
// Insert the transaction into the in-memory store
//inTx2 := cltest.NewEthTx(fromAddress)
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx1))
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx2))

// sequence nil
inTx1.Sequence = nil
inTx2.Sequence = nil
expErr := persistentStore.UpdateTxUnstartedToInProgress(ctx, &inTx1, &inTxAttempt1)
actErr := inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx2, &inTxAttempt2)
assert.Equal(t, expErr, actErr)
assert.Error(t, actErr)
assert.Error(t, expErr)
inTx1.Sequence = &nonce1 // reset
inTx2.Sequence = &nonce2 // reset

// tx not in unstarted state
inTx1.State = commontxmgr.TxInProgress
inTx2.State = commontxmgr.TxInProgress
expErr = persistentStore.UpdateTxUnstartedToInProgress(ctx, &inTx1, &inTxAttempt1)
actErr = inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx2, &inTxAttempt2)
assert.Error(t, actErr)
assert.Error(t, expErr)
inTx1.State = commontxmgr.TxUnstarted // reset
inTx2.State = commontxmgr.TxUnstarted // reset

// tx attempt not in in-progress state
inTxAttempt1.State = txmgrtypes.TxAttemptBroadcast
inTxAttempt2.State = txmgrtypes.TxAttemptBroadcast
expErr = persistentStore.UpdateTxUnstartedToInProgress(ctx, &inTx1, &inTxAttempt1)
actErr = inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx2, &inTxAttempt2)
assert.Equal(t, expErr, actErr)
assert.Error(t, actErr)
assert.Error(t, expErr)
inTxAttempt1.State = txmgrtypes.TxAttemptInProgress // reset
inTxAttempt2.State = txmgrtypes.TxAttemptInProgress // reset

// wrong from address
inTx1.FromAddress = cltest.NewEIP55Address().Address()
inTx2.FromAddress = cltest.NewEIP55Address().Address()
expErr = persistentStore.UpdateTxUnstartedToInProgress(ctx, &inTx1, &inTxAttempt1)
actErr = inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx2, &inTxAttempt2)
assert.NoError(t, actErr)
assert.NoError(t, expErr)
inTx1.FromAddress = fromAddress // reset
inTx2.FromAddress = fromAddress // reset
})
}

func TestInMemoryStore_SaveFetchedReceipts(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit a4b6104

Please sign in to comment.