Skip to content

Commit

Permalink
implement tests for DeleteInProgressAttempt
Browse files Browse the repository at this point in the history
  • Loading branch information
poopoothegorilla committed Mar 13, 2024
1 parent 09e2eac commit 82521c3
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 17 deletions.
25 changes: 25 additions & 0 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,31 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) delete
as._deleteTxs(txs...)
}

// deleteTxAttempts removes the attempts with the given IDs from the address state.
// It removes the attempts from the hash lookup map and from the transaction.
// If an attempt is not found in the hash lookup map, it is ignored.
// If a transaction is not found in the allTxs map, it is ignored.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxAttempts(txAttempts ...txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
as.Lock()
defer as.Unlock()

for _, txAttempt := range txAttempts {
// remove the attempt from the hash lookup map
delete(as.attemptHashToTxAttempt, txAttempt.Hash)
// remove the attempt from the transaction
if tx := as.allTxs[txAttempt.TxID]; tx != nil {
var removeIndex int
for i := 0; i < len(tx.TxAttempts); i++ {
if tx.TxAttempts[i].ID == txAttempt.ID {
removeIndex = i
break
}
}
tx.TxAttempts = append(tx.TxAttempts[:removeIndex], tx.TxAttempts[removeIndex+1:]...)
}
}
}

// peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {

Check failure on line 273 in common/txmgr/address_state.go

View workflow job for this annotation

GitHub Actions / lint

func (*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]).peekNextUnstartedTx is unused (unused)
return nil, nil
Expand Down
34 changes: 17 additions & 17 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Count

func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInProgressAttempt(ctx context.Context, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
if attempt.State != txmgrtypes.TxAttemptInProgress {
return fmt.Errorf("delete_in_progress_attempt: expected attempt to be in_progress")
return fmt.Errorf("DeleteInProgressAttempt: expected attempt state to be in_progress")
}
if attempt.ID == 0 {
return fmt.Errorf("delete_in_progress_attempt: expected attempt to have an ID")
Expand All @@ -276,9 +276,21 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Delet
// Check if fromaddress enabled
ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
as, ok := ms.addressStates[attempt.Tx.FromAddress]
if !ok {
return fmt.Errorf("delete_in_progress_attempt: %w", ErrAddressNotFound)
filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool {
return true
}
var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
var tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
for _, vas := range ms.addressStates {
txs := vas.findTxs(nil, filter, attempt.TxID)
if len(txs) == 1 {
tx = &txs[0]
as = vas
break
}
}
if tx == nil {
return fmt.Errorf("delete_in_progress_attempt: %w", ErrTxnNotFound)
}

// Persist to persistent storage
Expand All @@ -287,19 +299,7 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Delet
}

// Update in memory store
filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool {
if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 {
return false
}

for _, a := range tx.TxAttempts {
if a.ID == attempt.ID {
return true
}
}
return false
}
as.deleteTxs(as.findTxs(nil, filter)...)
as.deleteTxAttempts(attempt)

return nil
}
Expand Down
103 changes: 103 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,117 @@
package txmgr_test

import (
"context"
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"

evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_DeleteInProgressAttempt(t *testing.T) {
t.Parallel()

t.Run("successfully replace tx attempt", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db, dbcfg)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := context.Background()

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 1, fromAddress)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

oldAttempt := inTx.TxAttempts[0]
err = inMemoryStore.DeleteInProgressAttempt(testutils.Context(t), oldAttempt)
require.NoError(t, err)

expTx, err := persistentStore.FindTxWithAttempts(inTx.ID)
require.NoError(t, err)
assert.Equal(t, 0, len(expTx.TxAttempts))

fn := func(tx *evmtxmgr.Tx) bool { return true }
actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID)
require.Equal(t, 1, len(actTxs))
actTx := actTxs[0]
assertTxEqual(t, expTx, actTx)
})

t.Run("error parity for in-memory vs persistent store", func(t *testing.T) {
db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db, dbcfg)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := context.Background()

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

// Insert a transaction into persistent store
inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 124, fromAddress)
// Insert the transaction into the in-memory store
require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx))

oldAttempt := inTx.TxAttempts[0]
t.Run("error when attempt is not in progress", func(t *testing.T) {
oldAttempt.State = txmgrtypes.TxAttemptBroadcast
expErr := persistentStore.DeleteInProgressAttempt(testutils.Context(t), oldAttempt)
actErr := inMemoryStore.DeleteInProgressAttempt(testutils.Context(t), oldAttempt)
assert.Equal(t, expErr, actErr)
oldAttempt.State = txmgrtypes.TxAttemptInProgress
})

t.Run("error when attempt has 0 id", func(t *testing.T) {
originalID := oldAttempt.ID
oldAttempt.ID = 0
expErr := persistentStore.DeleteInProgressAttempt(testutils.Context(t), oldAttempt)
actErr := inMemoryStore.DeleteInProgressAttempt(testutils.Context(t), oldAttempt)
assert.Equal(t, expErr, actErr)
oldAttempt.ID = originalID
})
})
}

// assertTxEqual asserts that two transactions are equal
func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) {
assert.Equal(t, exp.ID, act.ID)
Expand Down

0 comments on commit 82521c3

Please sign in to comment.