diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 730809e8dda..8c0e1ce06a9 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -70,6 +70,11 @@ type TestEvmTxStore interface { InsertTxAttempt(attempt *TxAttempt) error LoadTxesAttempts(etxs []*Tx, qopts ...pg.QOpt) error GetFatalTransactions(ctx context.Context) (txes []*Tx, err error) + GetAllTxes(ctx context.Context) (txes []*Tx, err error) + GetAllTxAttempts(ctx context.Context) (attempts []TxAttempt, err error) + CountTxesByStateAndSubject(ctx context.Context, state txmgrtypes.TxState, subject uuid.UUID) (count int, err error) + FindTxesByFromAddressAndState(ctx context.Context, fromAddress common.Address, state string) (txes []*Tx, err error) + UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, id int64, blockNum uint) error } type evmTxStore struct { @@ -2010,6 +2015,66 @@ func (o *evmTxStore) FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Co return txes, pkgerrors.Wrap(err, "FindTxesWithAttemptsAndReceiptsByIdsAndState failed") } +// For testing only, get all txes in the DB +func (o *evmTxStore) GetAllTxes(ctx context.Context) (txes []*Tx, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + var dbEtxs []DbEthTx + sql := "SELECT * FROM evm.txes" + err = qq.Select(&dbEtxs, sql) + txes = make([]*Tx, len(dbEtxs)) + dbEthTxsToEvmEthTxPtrs(dbEtxs, txes) + return txes, err +} + +// For testing only, get all tx attempts in the DB +func (o *evmTxStore) GetAllTxAttempts(ctx context.Context) (attempts []TxAttempt, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + var dbAttempts []DbEthTxAttempt + sql := "SELECT * FROM evm.tx_attempts" + err = qq.Select(&dbAttempts, sql) + attempts = dbEthTxAttemptsToEthTxAttempts(dbAttempts) + return attempts, err +} + +func (o *evmTxStore) CountTxesByStateAndSubject(ctx context.Context, state txmgrtypes.TxState, subject uuid.UUID) (count int, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + sql := "SELECT COUNT(*) FROM evm.txes WHERE state = $1 AND subject = $2" + err = qq.Get(&count, sql, state, subject) + return count, err +} + +func (o *evmTxStore) FindTxesByFromAddressAndState(ctx context.Context, fromAddress common.Address, state string) (txes []*Tx, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + sql := "SELECT * FROM evm.txes WHERE from_address = $1 AND state = $2" + var dbEtxs []DbEthTx + err = qq.Select(&dbEtxs, sql, fromAddress, state) + txes = make([]*Tx, len(dbEtxs)) + dbEthTxsToEvmEthTxPtrs(dbEtxs, txes) + return txes, err +} + +func (o *evmTxStore) UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, id int64, blockNum uint) error { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + sql := "UPDATE evm.tx_attempts SET broadcast_before_block_num = $1 WHERE eth_tx_id = $2" + _, err := qq.Exec(sql, blockNum, id) + return err +} + // Returns a context that contains the values of the provided context, // and which is canceled when either the provided contextg or TxStore parent context is canceled. func (o *evmTxStore) mergeContexts(ctx context.Context) (context.Context, context.CancelFunc) { diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index 0b4287b6f6c..6c19026c1da 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -1811,7 +1811,7 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) { db := pgtest.NewSqlxDB(t) cfg := newTestChainScopedConfig(t) - txStore := newTxStore(t, db, cfg.Database()) + txStore := txmgr.NewTxStore(db, logger.Test(t), cfg.Database()) ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() evmtest.NewEthClientMockWithDefaultChain(t) _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) @@ -1822,7 +1822,7 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) { for i := 0; i < 5; i++ { mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy1)) } - testutils.AssertCountPerSubject(t, db, int64(5), subject1) + AssertCountPerSubject(t, txStore, int64(5), subject1) }) t.Run("prunes if queue has exceeded capacity", func(t *testing.T) { @@ -1831,6 +1831,13 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) { for i := 0; i < 5; i++ { mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy2)) } - testutils.AssertCountPerSubject(t, db, int64(3), subject2) + AssertCountPerSubject(t, txStore, int64(3), subject2) }) } + +func AssertCountPerSubject(t *testing.T, txStore txmgr.TestEvmTxStore, expected int64, subject uuid.UUID) { + t.Helper() + count, err := txStore.CountTxesByStateAndSubject(testutils.Context(t), "unstarted", subject) + require.NoError(t, err) + require.Equal(t, int(expected), count) +} diff --git a/core/cmd/evm_transaction_commands_test.go b/core/cmd/evm_transaction_commands_test.go index 6c079a12495..5c80a7d74a0 100644 --- a/core/cmd/evm_transaction_commands_test.go +++ b/core/cmd/evm_transaction_commands_test.go @@ -19,6 +19,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/store/models" ) @@ -156,7 +158,8 @@ func TestShell_SendEther_From_Txm(t *testing.T) { ) client, r := app.NewShellAndRenderer() db := app.GetSqlxDB() - + cfg := pgtest.NewQConfig(false) + txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg) set := flag.NewFlagSet("sendether", 0) flagSetApplyFromAction(client.SendEther, set, "") @@ -170,21 +173,26 @@ func TestShell_SendEther_From_Txm(t *testing.T) { assert.NoError(t, client.SendEther(c)) - dbEvmTx := txmgr.DbEthTx{} - require.NoError(t, db.Get(&dbEvmTx, `SELECT * FROM evm.txes`)) - require.Equal(t, "100.500000000000000000", dbEvmTx.Value.String()) - require.Equal(t, fromAddress, dbEvmTx.FromAddress) - require.Equal(t, to, dbEvmTx.ToAddress.String()) + evmTxes, err := txStore.GetAllTxes(testutils.Context(t)) + require.NoError(t, err) + require.Len(t, evmTxes, 1) + evmTx := evmTxes[0] + value := assets.Eth(evmTx.Value) + require.Equal(t, "100.500000000000000000", value.String()) + require.Equal(t, fromAddress, evmTx.FromAddress) + require.Equal(t, to, evmTx.ToAddress.String()) output := *r.Renders[0].(*cmd.EthTxPresenter) - assert.Equal(t, &dbEvmTx.FromAddress, output.From) - assert.Equal(t, &dbEvmTx.ToAddress, output.To) - assert.Equal(t, dbEvmTx.Value.String(), output.Value) - assert.Equal(t, fmt.Sprintf("%d", *dbEvmTx.Nonce), output.Nonce) - - var dbEvmTxAttempt txmgr.DbEthTxAttempt - require.NoError(t, db.Get(&dbEvmTxAttempt, `SELECT * FROM evm.tx_attempts`)) - assert.Equal(t, dbEvmTxAttempt.Hash, output.Hash) + assert.Equal(t, &evmTx.FromAddress, output.From) + assert.Equal(t, &evmTx.ToAddress, output.To) + assert.Equal(t, value.String(), output.Value) + assert.Equal(t, fmt.Sprintf("%d", *evmTx.Sequence), output.Nonce) + + attempts, err := txStore.GetAllTxAttempts(testutils.Context(t)) + require.NoError(t, err) + require.Len(t, attempts, 1) + assert.Equal(t, attempts[0].Hash, output.Hash) + } func TestShell_SendEther_From_Txm_WEI(t *testing.T) { @@ -216,6 +224,8 @@ func TestShell_SendEther_From_Txm_WEI(t *testing.T) { ) client, r := app.NewShellAndRenderer() db := app.GetSqlxDB() + cfg := pgtest.NewQConfig(false) + txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg) set := flag.NewFlagSet("sendether", 0) flagSetApplyFromAction(client.SendEther, set, "") @@ -236,19 +246,23 @@ func TestShell_SendEther_From_Txm_WEI(t *testing.T) { assert.NoError(t, client.SendEther(c)) - dbEvmTx := txmgr.DbEthTx{} - require.NoError(t, db.Get(&dbEvmTx, `SELECT * FROM evm.txes`)) - require.Equal(t, "1.000000000000000000", dbEvmTx.Value.String()) - require.Equal(t, fromAddress, dbEvmTx.FromAddress) - require.Equal(t, to, dbEvmTx.ToAddress.String()) + evmTxes, err := txStore.GetAllTxes(testutils.Context(t)) + require.NoError(t, err) + require.Len(t, evmTxes, 1) + evmTx := evmTxes[0] + value := assets.Eth(evmTx.Value) + require.Equal(t, "1.000000000000000000", value.String()) + require.Equal(t, fromAddress, evmTx.FromAddress) + require.Equal(t, to, evmTx.ToAddress.String()) output := *r.Renders[0].(*cmd.EthTxPresenter) - assert.Equal(t, &dbEvmTx.FromAddress, output.From) - assert.Equal(t, &dbEvmTx.ToAddress, output.To) - assert.Equal(t, dbEvmTx.Value.String(), output.Value) - assert.Equal(t, fmt.Sprintf("%d", *dbEvmTx.Nonce), output.Nonce) - - var dbEvmTxAttempt txmgr.DbEthTxAttempt - require.NoError(t, db.Get(&dbEvmTxAttempt, `SELECT * FROM evm.tx_attempts`)) - assert.Equal(t, dbEvmTxAttempt.Hash, output.Hash) + assert.Equal(t, &evmTx.FromAddress, output.From) + assert.Equal(t, &evmTx.ToAddress, output.To) + assert.Equal(t, value.String(), output.Value) + assert.Equal(t, fmt.Sprintf("%d", *evmTx.Sequence), output.Nonce) + + attempts, err := txStore.GetAllTxAttempts(testutils.Context(t)) + require.NoError(t, err) + require.Len(t, attempts, 1) + assert.Equal(t, attempts[0].Hash, output.Hash) } diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index fac2d7f040b..fe4cb5000c9 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -418,7 +418,7 @@ func TestShell_RebroadcastTransactions_OutsideRange_Txm(t *testing.T) { assert.NoError(t, c.RebroadcastTransactions(ctx)) - cltest.AssertEthTxAttemptCountStays(t, app.GetSqlxDB(), 1) + cltest.AssertEthTxAttemptCountStays(t, txStore, 1) }) } } diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 88abc3de5c6..72fb5c5d5de 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -972,16 +972,14 @@ func AssertPipelineRunsStays(t testing.TB, pipelineSpecID int32, db *sqlx.DB, wa } // AssertEthTxAttemptCountStays asserts that the number of tx attempts remains at the provided value -func AssertEthTxAttemptCountStays(t testing.TB, db *sqlx.DB, want int) []int64 { +func AssertEthTxAttemptCountStays(t testing.TB, txStore txmgr.TestEvmTxStore, want int) []int64 { g := gomega.NewWithT(t) var txaIds []int64 - var err error - g.Consistently(func() []int64 { - txaIds = make([]int64, 0) - err = db.Select(&txaIds, `SELECT ID FROM evm.tx_attempts ORDER BY id ASC`) + g.Consistently(func() []txmgr.TxAttempt { + attempts, err := txStore.GetAllTxAttempts(testutils.Context(t)) assert.NoError(t, err) - return txaIds + return attempts }, AssertNoActionTimeout, DBPollingInterval).Should(gomega.HaveLen(want)) return txaIds } diff --git a/core/internal/testutils/testutils.go b/core/internal/testutils/testutils.go index 6ffd873d092..81c09b1c362 100644 --- a/core/internal/testutils/testutils.go +++ b/core/internal/testutils/testutils.go @@ -425,15 +425,6 @@ func AssertCount(t *testing.T, db *sqlx.DB, tableName string, expected int64) { require.Equal(t, expected, count) } -func AssertCountPerSubject(t *testing.T, db *sqlx.DB, expected int64, subject uuid.UUID) { - t.Helper() - var count int64 - err := db.Get(&count, `SELECT COUNT(*) FROM evm.txes - WHERE state = 'unstarted' AND subject = $1;`, subject) - require.NoError(t, err) - require.Equal(t, expected, count) -} - func NewTestFlagSet() *flag.FlagSet { return flag.NewFlagSet("test", flag.PanicOnError) } diff --git a/core/services/keeper/upkeep_executer_test.go b/core/services/keeper/upkeep_executer_test.go index 123b1dc0de1..0c28ea44e98 100644 --- a/core/services/keeper/upkeep_executer_test.go +++ b/core/services/keeper/upkeep_executer_test.go @@ -334,7 +334,12 @@ func Test_UpkeepExecuter_PerformsUpkeep_Error(t *testing.T) { executer.OnNewLongestChain(testutils.Context(t), &head) g.Eventually(wasCalled.Load).Should(gomega.Equal(true)) - cltest.AssertCountStays(t, db, "evm.txes", 0) + + cfg := pgtest.NewQConfig(false) + txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg) + txes, err := txStore.GetAllTxes(testutils.Context(t)) + require.NoError(t, err) + require.Len(t, txes, 0) } func ptr[T any](t T) *T { return &t } diff --git a/core/services/promreporter/prom_reporter_test.go b/core/services/promreporter/prom_reporter_test.go index 1cebba2faf9..820c247ea87 100644 --- a/core/services/promreporter/prom_reporter_test.go +++ b/core/services/promreporter/prom_reporter_test.go @@ -112,7 +112,7 @@ func Test_PromReporter_OnNewLongestChain(t *testing.T) { etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress) cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 1, fromAddress) cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 2, fromAddress) - require.NoError(t, utils.JustError(db.Exec(`UPDATE evm.tx_attempts SET broadcast_before_block_num = 7 WHERE eth_tx_id = $1`, etx.ID))) + require.NoError(t, txStore.UpdateTxAttemptBroadcastBeforeBlockNum(testutils.Context(t), etx.ID, 7)) head := newHead() reporter.OnNewLongestChain(testutils.Context(t), &head) diff --git a/core/services/vrf/delegate_test.go b/core/services/vrf/delegate_test.go index d957e3c721e..ec0cee01d1f 100644 --- a/core/services/vrf/delegate_test.go +++ b/core/services/vrf/delegate_test.go @@ -403,11 +403,13 @@ func TestDelegate_InvalidLog(t *testing.T) { } } - // Ensure we have NOT queued up an eth transaction - var ethTxes []txmgr.DbEthTx - err = vuni.prm.GetQ().Select(ðTxes, `SELECT * FROM evm.txes;`) + db := pgtest.NewSqlxDB(t) + cfg := pgtest.NewQConfig(false) + txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg) + + txes, err := txStore.GetAllTxes(testutils.Context(t)) require.NoError(t, err) - require.Len(t, ethTxes, 0) + require.Len(t, txes, 0) } func TestFulfilledCheck(t *testing.T) { diff --git a/core/web/eth_keys_controller_test.go b/core/web/eth_keys_controller_test.go index d0ad27262f2..739af5820c9 100644 --- a/core/web/eth_keys_controller_test.go +++ b/core/web/eth_keys_controller_test.go @@ -16,6 +16,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" webpresenters "github.com/smartcontractkit/chainlink/v2/core/web/presenters" @@ -411,10 +412,12 @@ func TestETHKeysController_ChainSuccess_ResetWithAbandon(t *testing.T) { }) assert.NoError(t, err) - var count int - err = app.GetSqlxDB().Get(&count, `SELECT count(*) FROM evm.txes WHERE from_address = $1 AND state = 'fatal_error'`, addr) + db := app.GetSqlxDB() + txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg.Database()) + + txes, err := txStore.FindTxesByFromAddressAndState(testutils.Context(t), addr, "fatal_error") require.NoError(t, err) - assert.Equal(t, 0, count) + require.Len(t, txes, 0) client := app.NewHTTPClient(nil) chainURL := url.URL{Path: "/v2/keys/evm/chain"} @@ -437,10 +440,12 @@ func TestETHKeysController_ChainSuccess_ResetWithAbandon(t *testing.T) { assert.Equal(t, cltest.FixtureChainID.String(), updatedKey.EVMChainID.String()) assert.Equal(t, false, updatedKey.Disabled) - var s string - err = app.GetSqlxDB().Get(&s, `SELECT error FROM evm.txes WHERE from_address = $1 AND state = 'fatal_error'`, addr) + txes, err = txStore.FindTxesByFromAddressAndState(testutils.Context(t), addr, "fatal_error") require.NoError(t, err) - assert.Equal(t, "abandoned", s) + require.Len(t, txes, 1) + + tx := txes[0] + assert.Equal(t, "abandoned", tx.Error.String) } func TestETHKeysController_ChainFailure_InvalidAbandon(t *testing.T) { diff --git a/core/web/evm_transfer_controller_test.go b/core/web/evm_transfer_controller_test.go index c41219e1894..decde7eeb7e 100644 --- a/core/web/evm_transfer_controller_test.go +++ b/core/web/evm_transfer_controller_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/jmoiron/sqlx" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" @@ -17,6 +19,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/store/models" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -68,7 +72,7 @@ func TestTransfersController_CreateSuccess_From(t *testing.T) { assert.Equal(t, http.StatusOK, resp.StatusCode) assert.Len(t, errors.Errors, 0) - cltest.AssertCount(t, app.GetSqlxDB(), "evm.txes", 1) + validateTxCount(t, app.GetSqlxDB(), 1) } func TestTransfersController_CreateSuccess_From_WEI(t *testing.T) { @@ -109,7 +113,7 @@ func TestTransfersController_CreateSuccess_From_WEI(t *testing.T) { assert.Equal(t, http.StatusOK, resp.StatusCode) assert.Len(t, errors.Errors, 0) - cltest.AssertCount(t, app.GetSqlxDB(), "evm.txes", 1) + validateTxCount(t, app.GetSqlxDB(), 1) } func TestTransfersController_CreateSuccess_From_BalanceMonitorDisabled(t *testing.T) { @@ -155,7 +159,7 @@ func TestTransfersController_CreateSuccess_From_BalanceMonitorDisabled(t *testin assert.Equal(t, http.StatusOK, resp.StatusCode) assert.Len(t, errors.Errors, 0) - cltest.AssertCount(t, app.GetSqlxDB(), "evm.txes", 1) + validateTxCount(t, app.GetSqlxDB(), 1) } func TestTransfersController_TransferZeroAddressError(t *testing.T) { @@ -323,7 +327,7 @@ func TestTransfersController_CreateSuccess_eip1559(t *testing.T) { err = web.ParseJSONAPIResponse(cltest.ParseResponseBody(t, resp), &resource) assert.NoError(t, err) - cltest.AssertCount(t, app.GetSqlxDB(), "evm.txes", 1) + validateTxCount(t, app.GetSqlxDB(), 1) // check returned data assert.NotEmpty(t, resource.Hash) @@ -393,3 +397,12 @@ func TestTransfersController_FindTxAttempt(t *testing.T) { assert.ErrorContains(t, err, "context canceled") }) } + +func validateTxCount(t *testing.T, db *sqlx.DB, count int) { + cfg := pgtest.NewQConfig(false) + txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg) + + txes, err := txStore.GetAllTxes(testutils.Context(t)) + require.NoError(t, err) + require.Len(t, txes, count) +}