Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove direct references to TXM DB from external component tests #11538

Merged
merged 4 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ type TestEvmTxStore interface {
InsertTxAttempt(attempt *TxAttempt) error
LoadTxesAttempts(etxs []*Tx, qopts ...pg.QOpt) error
GetFatalTransactions(ctx context.Context) (txes []*Tx, err error)
GetAllTxes(ctx context.Context) (txes []*Tx, err error)
GetAllTxAttempts(ctx context.Context) (attempts []TxAttempt, err error)
CountTxesByStateAndSubject(ctx context.Context, state txmgrtypes.TxState, subject uuid.UUID) (count int, err error)
FindTxesByFromAddressAndState(ctx context.Context, fromAddress common.Address, state string) (txes []*Tx, err error)
UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, id int64, blockNum uint) error
}

type evmTxStore struct {
Expand Down Expand Up @@ -2011,6 +2016,66 @@ func (o *evmTxStore) FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Co
return txes, pkgerrors.Wrap(err, "FindTxesWithAttemptsAndReceiptsByIdsAndState failed")
}

// For testing only, get all txes in the DB
func (o *evmTxStore) GetAllTxes(ctx context.Context) (txes []*Tx, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
var dbEtxs []DbEthTx
sql := "SELECT * FROM evm.txes"
err = qq.Select(&dbEtxs, sql)
txes = make([]*Tx, len(dbEtxs))
dbEthTxsToEvmEthTxPtrs(dbEtxs, txes)
return txes, err
}

// For testing only, get all tx attempts in the DB
func (o *evmTxStore) GetAllTxAttempts(ctx context.Context) (attempts []TxAttempt, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
var dbAttempts []DbEthTxAttempt
sql := "SELECT * FROM evm.tx_attempts"
err = qq.Select(&dbAttempts, sql)
attempts = dbEthTxAttemptsToEthTxAttempts(dbAttempts)
return attempts, err
}

func (o *evmTxStore) CountTxesByStateAndSubject(ctx context.Context, state txmgrtypes.TxState, subject uuid.UUID) (count int, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
sql := "SELECT COUNT(*) FROM evm.txes WHERE state = $1 AND subject = $2"
err = qq.Get(&count, sql, state, subject)
return count, err
}

func (o *evmTxStore) FindTxesByFromAddressAndState(ctx context.Context, fromAddress common.Address, state string) (txes []*Tx, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
sql := "SELECT * FROM evm.txes WHERE from_address = $1 AND state = $2"
var dbEtxs []DbEthTx
err = qq.Select(&dbEtxs, sql, fromAddress, state)
txes = make([]*Tx, len(dbEtxs))
dbEthTxsToEvmEthTxPtrs(dbEtxs, txes)
return txes, err
}

func (o *evmTxStore) UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, id int64, blockNum uint) error {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
sql := "UPDATE evm.tx_attempts SET broadcast_before_block_num = $1 WHERE eth_tx_id = $2"
_, err := qq.Exec(sql, blockNum, id)
return err
}

// Returns a context that contains the values of the provided context,
// and which is canceled when either the provided contextg or TxStore parent context is canceled.
func (o *evmTxStore) mergeContexts(ctx context.Context) (context.Context, context.CancelFunc) {
Expand Down
13 changes: 10 additions & 3 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1811,7 +1811,7 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {

db := pgtest.NewSqlxDB(t)
cfg := newTestChainScopedConfig(t)
txStore := newTxStore(t, db, cfg.Database())
txStore := txmgr.NewTxStore(db, logger.Test(t), cfg.Database())
ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth()
evmtest.NewEthClientMockWithDefaultChain(t)
_, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore)
Expand All @@ -1822,7 +1822,7 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy1))
}
testutils.AssertCountPerSubject(t, db, int64(5), subject1)
AssertCountPerSubject(t, txStore, int64(5), subject1)
})

t.Run("prunes if queue has exceeded capacity", func(t *testing.T) {
Expand All @@ -1831,6 +1831,13 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy2))
}
testutils.AssertCountPerSubject(t, db, int64(3), subject2)
AssertCountPerSubject(t, txStore, int64(3), subject2)
})
}

func AssertCountPerSubject(t *testing.T, txStore txmgr.TestEvmTxStore, expected int64, subject uuid.UUID) {
t.Helper()
count, err := txStore.CountTxesByStateAndSubject(testutils.Context(t), "unstarted", subject)
require.NoError(t, err)
require.Equal(t, int(expected), count)
}
68 changes: 41 additions & 27 deletions core/cmd/evm_transaction_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)
Expand Down Expand Up @@ -156,7 +158,8 @@ func TestShell_SendEther_From_Txm(t *testing.T) {
)
client, r := app.NewShellAndRenderer()
db := app.GetSqlxDB()

cfg := pgtest.NewQConfig(false)
txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg)
set := flag.NewFlagSet("sendether", 0)
flagSetApplyFromAction(client.SendEther, set, "")

Expand All @@ -170,21 +173,26 @@ func TestShell_SendEther_From_Txm(t *testing.T) {

assert.NoError(t, client.SendEther(c))

dbEvmTx := txmgr.DbEthTx{}
require.NoError(t, db.Get(&dbEvmTx, `SELECT * FROM evm.txes`))
require.Equal(t, "100.500000000000000000", dbEvmTx.Value.String())
require.Equal(t, fromAddress, dbEvmTx.FromAddress)
require.Equal(t, to, dbEvmTx.ToAddress.String())
evmTxes, err := txStore.GetAllTxes(testutils.Context(t))
require.NoError(t, err)
require.Len(t, evmTxes, 1)
evmTx := evmTxes[0]
value := assets.Eth(evmTx.Value)
require.Equal(t, "100.500000000000000000", value.String())
require.Equal(t, fromAddress, evmTx.FromAddress)
require.Equal(t, to, evmTx.ToAddress.String())

output := *r.Renders[0].(*cmd.EthTxPresenter)
assert.Equal(t, &dbEvmTx.FromAddress, output.From)
assert.Equal(t, &dbEvmTx.ToAddress, output.To)
assert.Equal(t, dbEvmTx.Value.String(), output.Value)
assert.Equal(t, fmt.Sprintf("%d", *dbEvmTx.Nonce), output.Nonce)

var dbEvmTxAttempt txmgr.DbEthTxAttempt
require.NoError(t, db.Get(&dbEvmTxAttempt, `SELECT * FROM evm.tx_attempts`))
assert.Equal(t, dbEvmTxAttempt.Hash, output.Hash)
assert.Equal(t, &evmTx.FromAddress, output.From)
assert.Equal(t, &evmTx.ToAddress, output.To)
assert.Equal(t, value.String(), output.Value)
assert.Equal(t, fmt.Sprintf("%d", *evmTx.Sequence), output.Nonce)

attempts, err := txStore.GetAllTxAttempts(testutils.Context(t))
require.NoError(t, err)
require.Len(t, attempts, 1)
assert.Equal(t, attempts[0].Hash, output.Hash)

}

func TestShell_SendEther_From_Txm_WEI(t *testing.T) {
Expand Down Expand Up @@ -216,6 +224,8 @@ func TestShell_SendEther_From_Txm_WEI(t *testing.T) {
)
client, r := app.NewShellAndRenderer()
db := app.GetSqlxDB()
cfg := pgtest.NewQConfig(false)
txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg)

set := flag.NewFlagSet("sendether", 0)
flagSetApplyFromAction(client.SendEther, set, "")
Expand All @@ -236,19 +246,23 @@ func TestShell_SendEther_From_Txm_WEI(t *testing.T) {

assert.NoError(t, client.SendEther(c))

dbEvmTx := txmgr.DbEthTx{}
require.NoError(t, db.Get(&dbEvmTx, `SELECT * FROM evm.txes`))
require.Equal(t, "1.000000000000000000", dbEvmTx.Value.String())
require.Equal(t, fromAddress, dbEvmTx.FromAddress)
require.Equal(t, to, dbEvmTx.ToAddress.String())
evmTxes, err := txStore.GetAllTxes(testutils.Context(t))
require.NoError(t, err)
require.Len(t, evmTxes, 1)
evmTx := evmTxes[0]
value := assets.Eth(evmTx.Value)
require.Equal(t, "1.000000000000000000", value.String())
require.Equal(t, fromAddress, evmTx.FromAddress)
require.Equal(t, to, evmTx.ToAddress.String())

output := *r.Renders[0].(*cmd.EthTxPresenter)
assert.Equal(t, &dbEvmTx.FromAddress, output.From)
assert.Equal(t, &dbEvmTx.ToAddress, output.To)
assert.Equal(t, dbEvmTx.Value.String(), output.Value)
assert.Equal(t, fmt.Sprintf("%d", *dbEvmTx.Nonce), output.Nonce)

var dbEvmTxAttempt txmgr.DbEthTxAttempt
require.NoError(t, db.Get(&dbEvmTxAttempt, `SELECT * FROM evm.tx_attempts`))
assert.Equal(t, dbEvmTxAttempt.Hash, output.Hash)
assert.Equal(t, &evmTx.FromAddress, output.From)
assert.Equal(t, &evmTx.ToAddress, output.To)
assert.Equal(t, value.String(), output.Value)
assert.Equal(t, fmt.Sprintf("%d", *evmTx.Sequence), output.Nonce)

attempts, err := txStore.GetAllTxAttempts(testutils.Context(t))
require.NoError(t, err)
require.Len(t, attempts, 1)
assert.Equal(t, attempts[0].Hash, output.Hash)
}
2 changes: 1 addition & 1 deletion core/cmd/shell_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func TestShell_RebroadcastTransactions_OutsideRange_Txm(t *testing.T) {

assert.NoError(t, c.RebroadcastTransactions(ctx))

cltest.AssertEthTxAttemptCountStays(t, app.GetSqlxDB(), 1)
cltest.AssertEthTxAttemptCountStays(t, txStore, 1)
})
}
}
Expand Down
10 changes: 4 additions & 6 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,16 +974,14 @@ func AssertPipelineRunsStays(t testing.TB, pipelineSpecID int32, db *sqlx.DB, wa
}

// AssertEthTxAttemptCountStays asserts that the number of tx attempts remains at the provided value
func AssertEthTxAttemptCountStays(t testing.TB, db *sqlx.DB, want int) []int64 {
func AssertEthTxAttemptCountStays(t testing.TB, txStore txmgr.TestEvmTxStore, want int) []int64 {
g := gomega.NewWithT(t)

var txaIds []int64
var err error
g.Consistently(func() []int64 {
txaIds = make([]int64, 0)
err = db.Select(&txaIds, `SELECT ID FROM evm.tx_attempts ORDER BY id ASC`)
g.Consistently(func() []txmgr.TxAttempt {
attempts, err := txStore.GetAllTxAttempts(testutils.Context(t))
assert.NoError(t, err)
return txaIds
return attempts
}, AssertNoActionTimeout, DBPollingInterval).Should(gomega.HaveLen(want))
return txaIds
}
Expand Down
9 changes: 0 additions & 9 deletions core/internal/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,15 +425,6 @@ func AssertCount(t *testing.T, db *sqlx.DB, tableName string, expected int64) {
require.Equal(t, expected, count)
}

func AssertCountPerSubject(t *testing.T, db *sqlx.DB, expected int64, subject uuid.UUID) {
t.Helper()
var count int64
err := db.Get(&count, `SELECT COUNT(*) FROM evm.txes
WHERE state = 'unstarted' AND subject = $1;`, subject)
require.NoError(t, err)
require.Equal(t, expected, count)
}

func NewTestFlagSet() *flag.FlagSet {
return flag.NewFlagSet("test", flag.PanicOnError)
}
Expand Down
7 changes: 6 additions & 1 deletion core/services/keeper/upkeep_executer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,12 @@ func Test_UpkeepExecuter_PerformsUpkeep_Error(t *testing.T) {
executer.OnNewLongestChain(testutils.Context(t), &head)

g.Eventually(wasCalled.Load).Should(gomega.Equal(true))
cltest.AssertCountStays(t, db, "evm.txes", 0)

cfg := pgtest.NewQConfig(false)
txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg)
txes, err := txStore.GetAllTxes(testutils.Context(t))
require.NoError(t, err)
require.Len(t, txes, 0)
}

func ptr[T any](t T) *T { return &t }
2 changes: 1 addition & 1 deletion core/services/promreporter/prom_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func Test_PromReporter_OnNewLongestChain(t *testing.T) {
etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress)
cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 1, fromAddress)
cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 2, fromAddress)
require.NoError(t, utils.JustError(db.Exec(`UPDATE evm.tx_attempts SET broadcast_before_block_num = 7 WHERE eth_tx_id = $1`, etx.ID)))
require.NoError(t, txStore.UpdateTxAttemptBroadcastBeforeBlockNum(testutils.Context(t), etx.ID, 7))

head := newHead()
reporter.OnNewLongestChain(testutils.Context(t), &head)
Expand Down
10 changes: 6 additions & 4 deletions core/services/vrf/delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,13 @@ func TestDelegate_InvalidLog(t *testing.T) {
}
}

// Ensure we have NOT queued up an eth transaction
var ethTxes []txmgr.DbEthTx
err = vuni.prm.GetQ().Select(&ethTxes, `SELECT * FROM evm.txes;`)
db := pgtest.NewSqlxDB(t)
cfg := pgtest.NewQConfig(false)
txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg)

txes, err := txStore.GetAllTxes(testutils.Context(t))
require.NoError(t, err)
require.Len(t, ethTxes, 0)
require.Len(t, txes, 0)
}

func TestFulfilledCheck(t *testing.T) {
Expand Down
17 changes: 11 additions & 6 deletions core/web/eth_keys_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey"
webpresenters "github.com/smartcontractkit/chainlink/v2/core/web/presenters"
Expand Down Expand Up @@ -411,10 +412,12 @@ func TestETHKeysController_ChainSuccess_ResetWithAbandon(t *testing.T) {
})
assert.NoError(t, err)

var count int
err = app.GetSqlxDB().Get(&count, `SELECT count(*) FROM evm.txes WHERE from_address = $1 AND state = 'fatal_error'`, addr)
db := app.GetSqlxDB()
txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg.Database())

txes, err := txStore.FindTxesByFromAddressAndState(testutils.Context(t), addr, "fatal_error")
require.NoError(t, err)
assert.Equal(t, 0, count)
require.Len(t, txes, 0)

client := app.NewHTTPClient(nil)
chainURL := url.URL{Path: "/v2/keys/evm/chain"}
Expand All @@ -437,10 +440,12 @@ func TestETHKeysController_ChainSuccess_ResetWithAbandon(t *testing.T) {
assert.Equal(t, cltest.FixtureChainID.String(), updatedKey.EVMChainID.String())
assert.Equal(t, false, updatedKey.Disabled)

var s string
err = app.GetSqlxDB().Get(&s, `SELECT error FROM evm.txes WHERE from_address = $1 AND state = 'fatal_error'`, addr)
txes, err = txStore.FindTxesByFromAddressAndState(testutils.Context(t), addr, "fatal_error")
require.NoError(t, err)
assert.Equal(t, "abandoned", s)
require.Len(t, txes, 1)

tx := txes[0]
assert.Equal(t, "abandoned", tx.Error.String)
}

func TestETHKeysController_ChainFailure_InvalidAbandon(t *testing.T) {
Expand Down
Loading