diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index bd4e9a2f3a6..3272780517d 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/big" + "sync" "time" "github.com/google/uuid" @@ -47,7 +48,8 @@ type inMemoryStore[ keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + addressStatesLock sync.RWMutex + addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] } // NewInMemoryStore returns a new inMemoryStore @@ -188,10 +190,39 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat // Close closes the inMemoryStore func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() { + // Close the event recorder + ms.persistentTxStore.Close() + + // Clear all address states + ms.addressStatesLock.Lock() + for _, as := range ms.addressStates { + as.close() + } + clear(ms.addressStates) + ms.addressStatesLock.Unlock() } // Abandon removes all transactions for a given address func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Abandon(ctx context.Context, chainID CHAIN_ID, addr ADDR) error { + if ms.chainID.String() != chainID.String() { + panic("invalid chain ID") + } + + // Mark all persisted transactions as abandoned + if err := ms.persistentTxStore.Abandon(ctx, chainID, addr); err != nil { + return err + } + + // check that the address exists in the unstarted transactions + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[addr] + if !ok { + as = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](ms.lggr, chainID, addr, ms.maxUnstarted, nil) + ms.addressStates[addr] = as + } + as.abandon() + return nil } diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index a102ee1c996..7bed8d66efe 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -1,14 +1,80 @@ package txmgr_test import ( + "math/big" "testing" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr" + + evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) +func TestInMemoryStore_Abandon(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + t.Run("Abandon transactions successfully", func(t *testing.T) { + nTxs := 3 + for i := 0; i < nTxs; i++ { + inTx := cltest.NewEthTx(fromAddress) + // insert the transaction into the persistent store + require.NoError(t, persistentStore.InsertTx(ctx, &inTx)) + // insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + } + + actErr := inMemoryStore.Abandon(ctx, chainID, fromAddress) + expErr := persistentStore.Abandon(ctx, chainID, fromAddress) + require.NoError(t, actErr) + require.NoError(t, expErr) + + expTxs, err := persistentStore.FindTxesByFromAddressAndState(ctx, fromAddress, "fatal_error") + require.NoError(t, err) + require.NotNil(t, expTxs) + require.Equal(t, nTxs, len(expTxs)) + + // Check the in-memory store + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn) + require.NotNil(t, actTxs) + require.Equal(t, nTxs, len(actTxs)) + + for i := 0; i < nTxs; i++ { + assertTxEqual(t, *expTxs[i], actTxs[i]) + } + }) +} + // assertTxEqual asserts that two transactions are equal func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { assert.Equal(t, exp.ID, act.ID)