Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: step 3-03-UpdateTxUnstartedToInProgress #12217

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@
}

// countTransactionsByState returns the number of transactions that are in the given state
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int {

Check failure on line 132 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).countTransactionsByState is unused (unused)
return 0
}

// findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {

Check failure on line 137 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxWithIdempotencyKey is unused (unused)
return nil
}

Expand All @@ -142,7 +142,7 @@
// If txIDs are provided, only the transactions with those IDs are considered.
// If no txIDs are provided, all transactions in the given states are considered.
// If no txStates are provided, all transactions are considered.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState(

Check failure on line 145 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).applyToTxsByState is unused (unused)
txStates []txmgrtypes.TxState,
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand All @@ -154,7 +154,7 @@
// If no txIDs are provided, all transactions are considered.
// If no txStates are provided, all transactions are considered.
// The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxAttempts(

Check failure on line 157 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxAttempts is unused (unused)
txStates []txmgrtypes.TxState,
txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
Expand All @@ -163,6 +163,15 @@
return nil
}

// hasTx returns true if the transaction with the given ID is in the address state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) hasTx(txID int64) bool {
as.RLock()
defer as.RUnlock()

_, ok := as.allTxs[txID]
return ok
}

// findTxs returns all transactions that match the given filters.
// If txIDs are provided, only the transactions with those IDs are considered.
// If no txIDs are provided, all transactions are considered.
Expand Down Expand Up @@ -212,25 +221,25 @@
}

// pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) {

Check failure on line 224 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).pruneUnstartedTxQueue is unused (unused)
}

// deleteTxs removes the transactions with the given IDs from the address state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {

Check failure on line 228 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).deleteTxs is unused (unused)
}

// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 232 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekNextUnstartedTx is unused (unused)
return nil, nil
}

// peekInProgressTx returns the in-progress transaction without removing it from the in-progress state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 237 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekInProgressTx is unused (unused)
return nil, nil
}

// addTxToUnstarted adds the given transaction to the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {

Check failure on line 242 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).addTxToUnstarted is unused (unused)
return nil
}

Expand All @@ -239,7 +248,7 @@
// It returns an error if there is already a transaction in progress.
// It returns an error if there is no unstarted transaction to move to in_progress.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnstartedToInProgress(
txID int64, seq SEQ, broadcastAt time.Time, initialBroadcastAt time.Time,
txID int64, seq *SEQ, broadcastAt *time.Time, initialBroadcastAt *time.Time,
txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
as.Lock()
Expand All @@ -255,9 +264,9 @@
}
tx.State = TxInProgress
tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{txAttempt}
tx.Sequence = &seq
tx.BroadcastAt = &broadcastAt
tx.InitialBroadcastAt = &initialBroadcastAt
tx.Sequence = seq
tx.BroadcastAt = broadcastAt
tx.InitialBroadcastAt = initialBroadcastAt

as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt
as.inprogressTx = tx
Expand Down
30 changes: 30 additions & 0 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,36 @@
tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
if tx.Sequence == nil {
return fmt.Errorf("in_progress transaction must have nonce")
}
if tx.State != TxUnstarted {
return fmt.Errorf("update_tx_unstarted_to_in_progress: can only transition to in_progress from unstarted, transaction is currently %s", tx.State)
}
if attempt.State != txmgrtypes.TxAttemptInProgress {
return fmt.Errorf("attempt state must be in_progress")
}

ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
as, ok := ms.addressStates[tx.FromAddress]
if !ok {
return nil
}
if !as.hasTx(tx.ID) {
return fmt.Errorf("update_tx_unstarted_to_in_progress: %w: %q", ErrTxnNotFound, tx.ID)
}

// Persist to persistent storage
if err := ms.persistentTxStore.UpdateTxUnstartedToInProgress(ctx, tx, attempt); err != nil {
return fmt.Errorf("update_tx_unstarted_to_in_progress: %w", err)
}

// Update in address state in memory
if err := as.moveUnstartedToInProgress(tx.ID, tx.Sequence, tx.BroadcastAt, tx.InitialBroadcastAt, *attempt); err != nil {
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("update_tx_unstarted_to_in_progress: %w", err)
}

return nil
}

Expand Down Expand Up @@ -318,7 +348,7 @@
receiptErrs = errors.Join(receiptErrs, err)
}
if !found {
receiptErrs = fmt.Errorf("save_fetched_receipts: errors occured while moving tx to confirmed: %w", receiptErrs)

Check failure on line 351 in common/txmgr/inmemory_store.go

View workflow job for this annotation

GitHub Actions / lint

`occured` is a misspelling of `occurred` (misspell)
errs = errors.Join(errs, receiptErrs)
}
}
Expand Down
127 changes: 127 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,133 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_UpdateTxUnstartedToInProgress(t *testing.T) {
t.Parallel()

t.Run("successfully updates unstarted tx to inprogress", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

nonce := evmtypes.Nonce(123)
// Insert a transaction into persistent store
inTx := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID)
inTx.Sequence = &nonce
inTxAttempt := cltest.NewLegacyEthTxAttempt(t, inTx.ID)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

// Update the transaction to in-progress
require.NoError(t, inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx, &inTxAttempt))

expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID)
require.NoError(t, err)
assert.Equal(t, commontxmgr.TxInProgress, expTx.State)
assert.Equal(t, 1, len(expTx.TxAttempts))

fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]
assertTxEqual(t, expTx, actTx)
assert.Equal(t, commontxmgr.TxInProgress, actTx.State)
})

t.Run("wrong input error scenarios", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

nonce1 := evmtypes.Nonce(1)
nonce2 := evmtypes.Nonce(2)
// Insert a transaction into persistent store
inTx1 := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID)
inTx2 := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID)
inTx1.Sequence = &nonce1
inTx2.Sequence = &nonce2
inTxAttempt1 := cltest.NewLegacyEthTxAttempt(t, inTx1.ID)
inTxAttempt2 := cltest.NewLegacyEthTxAttempt(t, inTx2.ID)
// Insert the transaction into the in-memory store
//inTx2 := cltest.NewEthTx(fromAddress)
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx1))
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx2))

// sequence nil
inTx1.Sequence = nil
inTx2.Sequence = nil
expErr := persistentStore.UpdateTxUnstartedToInProgress(ctx, &inTx1, &inTxAttempt1)
actErr := inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx2, &inTxAttempt2)
assert.Equal(t, expErr, actErr)
assert.Error(t, actErr)
assert.Error(t, expErr)
inTx1.Sequence = &nonce1 // reset
inTx2.Sequence = &nonce2 // reset

// tx not in unstarted state
inTx1.State = commontxmgr.TxInProgress
inTx2.State = commontxmgr.TxInProgress
expErr = persistentStore.UpdateTxUnstartedToInProgress(ctx, &inTx1, &inTxAttempt1)
actErr = inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx2, &inTxAttempt2)
assert.Error(t, actErr)
assert.Error(t, expErr)
inTx1.State = commontxmgr.TxUnstarted // reset
inTx2.State = commontxmgr.TxUnstarted // reset

// tx attempt not in in-progress state
inTxAttempt1.State = txmgrtypes.TxAttemptBroadcast
inTxAttempt2.State = txmgrtypes.TxAttemptBroadcast
expErr = persistentStore.UpdateTxUnstartedToInProgress(ctx, &inTx1, &inTxAttempt1)
actErr = inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx2, &inTxAttempt2)
assert.Equal(t, expErr, actErr)
assert.Error(t, actErr)
assert.Error(t, expErr)
inTxAttempt1.State = txmgrtypes.TxAttemptInProgress // reset
inTxAttempt2.State = txmgrtypes.TxAttemptInProgress // reset

// wrong from address
inTx1.FromAddress = cltest.NewEIP55Address().Address()
inTx2.FromAddress = cltest.NewEIP55Address().Address()
expErr = persistentStore.UpdateTxUnstartedToInProgress(ctx, &inTx1, &inTxAttempt1)
actErr = inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx2, &inTxAttempt2)
assert.NoError(t, actErr)
assert.NoError(t, expErr)
inTx1.FromAddress = fromAddress // reset
inTx2.FromAddress = fromAddress // reset
})
}

func TestInMemoryStore_SaveFetchedReceipts(t *testing.T) {
t.Parallel()

Expand Down
Loading