Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: step 3-04-DeleteInProgressAttempt #12235

Merged
35 changes: 35 additions & 0 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@
}

// countTransactionsByState returns the number of transactions that are in the given state
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) countTransactionsByState(txState txmgrtypes.TxState) int {

Check failure on line 132 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).countTransactionsByState is unused (unused)
return 0
}

// findTxWithIdempotencyKey returns the transaction with the given idempotency key. If no transaction is found, nil is returned.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {

Check failure on line 137 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxWithIdempotencyKey is unused (unused)
return nil
}

Expand All @@ -143,7 +143,7 @@
// If no txIDs are provided, all transactions in the given states are considered.
// If no txStates are provided, all transactions are considered.
// Any transaction states that are unknown will cause a panic.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) applyToTxsByState(

Check failure on line 146 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).applyToTxsByState is unused (unused)
txStates []txmgrtypes.TxState,
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand Down Expand Up @@ -189,7 +189,7 @@
// If no txIDs are provided, all transactions are considered.
// If no txStates are provided, all transactions are considered.
// The txFilter is applied to the transactions and the txAttemptFilter is applied to the attempts.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxAttempts(

Check failure on line 192 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).findTxAttempts is unused (unused)
txStates []txmgrtypes.TxState,
txFilter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
txAttemptFilter func(*txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool,
Expand All @@ -198,6 +198,15 @@
return nil
}

// hasTx returns true if the transaction with the given ID is in the address state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) hasTx(txID int64) bool {
as.RLock()
defer as.RUnlock()

_, ok := as.allTxs[txID]
return ok
}

// findTxs returns all transactions that match the given filters.
// If txIDs are provided, only the transactions with those IDs are considered.
// If no txIDs are provided, all transactions are considered.
Expand Down Expand Up @@ -244,7 +253,7 @@
}

// pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) {

Check failure on line 256 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).pruneUnstartedTxQueue is unused (unused)
}

// reapConfirmedTxs removes confirmed transactions that are older than the given time threshold and have receipts older than the given block number threshold.
Expand Down Expand Up @@ -286,20 +295,46 @@
}

// deleteTxs removes the transactions with the given IDs from the address state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txIDs ...int64) {

Check failure on line 298 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).deleteTxs is unused (unused)
as.Lock()
defer as.Unlock()

as._deleteTxs(txIDs...)
}

// deleteTxAttempt removes the attempt with a given ID from the transaction with the given ID.
// It removes the attempts from the hash lookup map and from the transaction.
// If an attempt is not found in the hash lookup map, it is ignored.
// If a transaction is not found in the allTxs map, it is ignored.
// No error is returned if the transaction or attempt is not found.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxAttempt(txID, txAttemptID int64) {
as.Lock()
defer as.Unlock()

tx, ok := as.allTxs[txID]
if !ok || tx == nil {
return
}

for i := 0; i < len(tx.TxAttempts); i++ {
txAttempt := tx.TxAttempts[i]
if txAttempt.ID == txAttemptID {
// remove the attempt from the hash lookup map
delete(as.attemptHashToTxAttempt, txAttempt.Hash)
// remove the attempt from the transaction
tx.TxAttempts = append(tx.TxAttempts[:i], tx.TxAttempts[i+1:]...)
break
}
}
}

// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 332 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekNextUnstartedTx is unused (unused)
return nil, nil
}

// peekInProgressTx returns the in-progress transaction without removing it from the in-progress state.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 337 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekInProgressTx is unused (unused)
return nil, nil
}

Expand Down Expand Up @@ -453,7 +488,7 @@
return nil
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _applyToTxs(

Check failure on line 491 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])._applyToTxs is unused (unused)
txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
fn func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]),
txIDs ...int64,
Expand Down Expand Up @@ -502,7 +537,7 @@
return txs
}

func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txIDs ...int64) {

Check failure on line 540 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE])._deleteTxs is unused (unused)
for _, txID := range txIDs {
as._deleteTx(txID)
}
Expand Down
29 changes: 29 additions & 0 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,35 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Count
}

func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInProgressAttempt(ctx context.Context, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
if attempt.State != txmgrtypes.TxAttemptInProgress {
return fmt.Errorf("DeleteInProgressAttempt: expected attempt state to be in_progress")
}
if attempt.ID == 0 {
return fmt.Errorf("DeleteInProgressAttempt: expected attempt to have an id")
}

// Check if fromaddress enabled
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
for _, vas := range ms.addressStates {
amit-momin marked this conversation as resolved.
Show resolved Hide resolved
if vas.hasTx(attempt.TxID) {
as = vas
break
}
}
if as == nil {
return fmt.Errorf("delete_in_progress_attempt: %w: %q", ErrTxnNotFound, attempt.TxID)
}

// Persist to persistent storage
if err := ms.persistentTxStore.DeleteInProgressAttempt(ctx, attempt); err != nil {
return fmt.Errorf("delete_in_progress_attempt: %w", err)
}

// Update in memory store
as.deleteTxAttempt(attempt.TxID, attempt.ID)

return nil
}

Expand Down
90 changes: 90 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,96 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_DeleteInProgressAttempt(t *testing.T) {
t.Parallel()

t.Run("successfully replace tx attempt", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 1, fromAddress)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

oldAttempt := inTx.TxAttempts[0]
err = inMemoryStore.DeleteInProgressAttempt(ctx, oldAttempt)
require.NoError(t, err)

expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID)
require.NoError(t, err)
assert.Equal(t, 0, len(expTx.TxAttempts))

fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]
assertTxEqual(t, expTx, actTx)
})

t.Run("error parity for in-memory vs persistent store", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := testutils.Context(t)

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 124, fromAddress)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

oldAttempt := inTx.TxAttempts[0]
t.Run("error when attempt is not in progress", func(t *testing.T) {
oldAttempt.State = txmgrtypes.TxAttemptBroadcast
expErr := persistentStore.DeleteInProgressAttempt(ctx, oldAttempt)
actErr := inMemoryStore.DeleteInProgressAttempt(ctx, oldAttempt)
assert.Equal(t, expErr, actErr)
oldAttempt.State = txmgrtypes.TxAttemptInProgress
})

t.Run("error when attempt has 0 id", func(t *testing.T) {
originalID := oldAttempt.ID
oldAttempt.ID = 0
expErr := persistentStore.DeleteInProgressAttempt(ctx, oldAttempt)
actErr := inMemoryStore.DeleteInProgressAttempt(ctx, oldAttempt)
assert.Equal(t, expErr, actErr)
oldAttempt.ID = originalID
})
})
}

func TestInMemoryStore_ReapTxHistory(t *testing.T) {
t.Parallel()

Expand Down
Loading