From 6a1477b83da7d90fbc43204cfdfed7939845bd5a Mon Sep 17 00:00:00 2001 From: James Walker Date: Tue, 27 Feb 2024 15:45:32 -0500 Subject: [PATCH 1/5] implement Abandon --- common/txmgr/inmemory_store.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index cd783a25210..23e1c2b6175 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -187,6 +187,24 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close // Abandon removes all transactions for a given address func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Abandon(ctx context.Context, chainID CHAIN_ID, addr ADDR) error { + if ms.chainID.String() != chainID.String() { + return fmt.Errorf("abandon: %w", ErrInvalidChainID) + } + + // Mark all persisted transactions as abandoned + if err := ms.txStore.Abandon(ctx, chainID, addr); err != nil { + return err + } + + // check that the address exists in the unstarted transactions + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[addr] + if !ok { + return fmt.Errorf("abandon: %w", ErrAddressNotFound) + } + as.Abandon() + return nil } From 9cfa388933a7f1f80b97da7ad4f27484836383b0 Mon Sep 17 00:00:00 2001 From: James Walker Date: Tue, 27 Feb 2024 15:45:59 -0500 Subject: [PATCH 2/5] implement Close --- common/txmgr/inmemory_store.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 23e1c2b6175..754ba1706f1 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -183,6 +183,16 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat // Close closes the InMemoryStore func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() { + // Close the event recorder + ms.txStore.Close() + + // Clear all address states + ms.addressStatesLock.Lock() + for _, as := range ms.addressStates { + as.Close() + } + clear(ms.addressStates) + ms.addressStatesLock.Unlock() } // Abandon removes all transactions for a given address From a864b27dfeb55c4208e8513953e7bfe3655f093a Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 11 Mar 2024 22:42:50 -0400 Subject: [PATCH 3/5] add tests for abandon --- .../evm/txmgr/evm_inmemory_store_test.go | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index a102ee1c996..a8d1c2f4745 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -1,14 +1,82 @@ package txmgr_test import ( + "context" + "math/big" "testing" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr" + + evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) +func TestInMemoryStore_Abandon(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db, dbcfg) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := context.Background() + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + t.Run("saves new in_progress attempt if attempt is new", func(t *testing.T) { + nTxs := 3 + // Insert a transaction into persistent store + for i := 0; i < nTxs; i++ { + inTx := cltest.NewEthTx(fromAddress) + // insert the transaction into the persistent store + require.NoError(t, persistentStore.InsertTx(&inTx)) + // insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + } + + actErr := inMemoryStore.Abandon(testutils.Context(t), chainID, fromAddress) + expErr := persistentStore.Abandon(testutils.Context(t), chainID, fromAddress) + require.NoError(t, actErr) + require.NoError(t, expErr) + + expTxs, err := persistentStore.FindTxesByFromAddressAndState(ctx, fromAddress, "fatal_error") + require.NoError(t, err) + require.NotNil(t, expTxs) + require.Equal(t, nTxs, len(expTxs)) + + // Check that the in-memory store has the new attempt + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn) + require.NotNil(t, actTxs) + require.Equal(t, nTxs, len(actTxs)) + + for i := 0; i < nTxs; i++ { + assertTxEqual(t, *expTxs[i], actTxs[i]) + } + }) +} + // assertTxEqual asserts that two transactions are equal func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { assert.Equal(t, exp.ID, act.ID) From 94b9c6bf1f6df79adc3ffbb3038c6c3814bc8dfa Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 21 Mar 2024 00:03:55 -0400 Subject: [PATCH 4/5] address comments --- common/txmgr/inmemory_store.go | 5 +++-- core/chains/evm/txmgr/evm_inmemory_store_test.go | 9 ++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 88905f463b0..3272780517d 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -205,7 +205,7 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close // Abandon removes all transactions for a given address func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Abandon(ctx context.Context, chainID CHAIN_ID, addr ADDR) error { if ms.chainID.String() != chainID.String() { - return fmt.Errorf("abandon: %w", ErrInvalidChainID) + panic("invalid chain ID") } // Mark all persisted transactions as abandoned @@ -218,7 +218,8 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Aband defer ms.addressStatesLock.RUnlock() as, ok := ms.addressStates[addr] if !ok { - return fmt.Errorf("abandon: %w", ErrAddressNotFound) + as = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](ms.lggr, chainID, addr, ms.maxUnstarted, nil) + ms.addressStates[addr] = as } as.abandon() diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index a8d1c2f4745..c36da4d3929 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -26,7 +26,7 @@ func TestInMemoryStore_Abandon(t *testing.T) { db := pgtest.NewSqlxDB(t) _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) - persistentStore := cltest.NewTestTxStore(t, db, dbcfg) + persistentStore := cltest.NewTestTxStore(t, db) kst := cltest.NewKeyStore(t, db, dbcfg) _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) @@ -44,13 +44,12 @@ func TestInMemoryStore_Abandon(t *testing.T) { ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) require.NoError(t, err) - t.Run("saves new in_progress attempt if attempt is new", func(t *testing.T) { + t.Run("Abandon transactions successfully", func(t *testing.T) { nTxs := 3 - // Insert a transaction into persistent store for i := 0; i < nTxs; i++ { inTx := cltest.NewEthTx(fromAddress) // insert the transaction into the persistent store - require.NoError(t, persistentStore.InsertTx(&inTx)) + require.NoError(t, persistentStore.InsertTx(ctx, &inTx)) // insert the transaction into the in-memory store require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) } @@ -65,7 +64,7 @@ func TestInMemoryStore_Abandon(t *testing.T) { require.NotNil(t, expTxs) require.Equal(t, nTxs, len(expTxs)) - // Check that the in-memory store has the new attempt + // Check the in-memory store fn := func(tx *evmtxmgr.Tx) bool { return true } actTxs := inMemoryStore.XXXTestFindTxs(nil, fn) require.NotNil(t, actTxs) From 23dbf19d3d4227476fb38bd0f523b637b370c8f9 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 21 Mar 2024 00:06:31 -0400 Subject: [PATCH 5/5] cleanup --- core/chains/evm/txmgr/evm_inmemory_store_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index c36da4d3929..7bed8d66efe 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -1,7 +1,6 @@ package txmgr_test import ( - "context" "math/big" "testing" @@ -33,7 +32,7 @@ func TestInMemoryStore_Abandon(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) lggr := logger.TestSugared(t) chainID := ethClient.ConfiguredChainID() - ctx := context.Background() + ctx := testutils.Context(t) inMemoryStore, err := commontxmgr.NewInMemoryStore[ *big.Int, @@ -54,8 +53,8 @@ func TestInMemoryStore_Abandon(t *testing.T) { require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) } - actErr := inMemoryStore.Abandon(testutils.Context(t), chainID, fromAddress) - expErr := persistentStore.Abandon(testutils.Context(t), chainID, fromAddress) + actErr := inMemoryStore.Abandon(ctx, chainID, fromAddress) + expErr := persistentStore.Abandon(ctx, chainID, fromAddress) require.NoError(t, actErr) require.NoError(t, expErr)