Skip to content

Commit

Permalink
Removed direct references to TXM DB from external component tests
Browse files Browse the repository at this point in the history
  • Loading branch information
amit-momin committed Dec 11, 2023
1 parent 120bef7 commit afa6f6d
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 62 deletions.
65 changes: 65 additions & 0 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ type TestEvmTxStore interface {
InsertTxAttempt(attempt *TxAttempt) error
LoadTxesAttempts(etxs []*Tx, qopts ...pg.QOpt) error
GetFatalTransactions(ctx context.Context) (txes []*Tx, err error)
GetAllTxes(ctx context.Context) (txes []*Tx, err error)
GetAllTxAttempts(ctx context.Context) (attempts []TxAttempt, err error)
CountTxesByStateAndSubject(ctx context.Context, state txmgrtypes.TxState, subject uuid.UUID) (count int, err error)
FindTxesByFromAddressAndState(ctx context.Context, fromAddress common.Address, state string) (txes []*Tx, err error)
UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, id int64, blockNum uint) error
}

type evmTxStore struct {
Expand Down Expand Up @@ -2010,6 +2015,66 @@ func (o *evmTxStore) FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Co
return txes, pkgerrors.Wrap(err, "FindTxesWithAttemptsAndReceiptsByIdsAndState failed")
}

// For testing only, get all txes in the DB
func (o *evmTxStore) GetAllTxes(ctx context.Context) (txes []*Tx, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
var dbEtxs []DbEthTx
sql := "SELECT * FROM evm.txes"
err = qq.Select(&dbEtxs, sql)
txes = make([]*Tx, len(dbEtxs))
dbEthTxsToEvmEthTxPtrs(dbEtxs, txes)
return txes, err
}

// For testing only, get all tx attempts in the DB
func (o *evmTxStore) GetAllTxAttempts(ctx context.Context) (attempts []TxAttempt, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
var dbAttempts []DbEthTxAttempt
sql := "SELECT * FROM evm.tx_attempts"
err = qq.Select(&dbAttempts, sql)
attempts = dbEthTxAttemptsToEthTxAttempts(dbAttempts)
return attempts, err
}

func (o *evmTxStore) CountTxesByStateAndSubject(ctx context.Context, state txmgrtypes.TxState, subject uuid.UUID) (count int, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
sql := "SELECT COUNT(*) FROM evm.txes WHERE state = $1 AND subject = $2"
err = qq.Get(&count, sql, state, subject)
return count, err
}

func (o *evmTxStore) FindTxesByFromAddressAndState(ctx context.Context, fromAddress common.Address, state string) (txes []*Tx, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
sql := "SELECT * FROM evm.txes WHERE from_address = $1 AND state = $2"
var dbEtxs []DbEthTx
err = qq.Select(&dbEtxs, sql, fromAddress, state)
txes = make([]*Tx, len(dbEtxs))
dbEthTxsToEvmEthTxPtrs(dbEtxs, txes)
return txes, err
}

func (o *evmTxStore) UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, id int64, blockNum uint) error {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
sql := "UPDATE evm.tx_attempts SET broadcast_before_block_num = $1 WHERE eth_tx_id = $2"
_, err := qq.Exec(sql, blockNum, id)
return err
}

// Returns a context that contains the values of the provided context,
// and which is canceled when either the provided contextg or TxStore parent context is canceled.
func (o *evmTxStore) mergeContexts(ctx context.Context) (context.Context, context.CancelFunc) {
Expand Down
13 changes: 10 additions & 3 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1811,7 +1811,7 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {

db := pgtest.NewSqlxDB(t)
cfg := newTestChainScopedConfig(t)
txStore := newTxStore(t, db, cfg.Database())
txStore := txmgr.NewTxStore(db, logger.Test(t), cfg.Database())
ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth()
evmtest.NewEthClientMockWithDefaultChain(t)
_, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore)
Expand All @@ -1822,7 +1822,7 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy1))
}
testutils.AssertCountPerSubject(t, db, int64(5), subject1)
AssertCountPerSubject(t, txStore, int64(5), subject1)
})

t.Run("prunes if queue has exceeded capacity", func(t *testing.T) {
Expand All @@ -1831,6 +1831,13 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy2))
}
testutils.AssertCountPerSubject(t, db, int64(3), subject2)
AssertCountPerSubject(t, txStore, int64(3), subject2)
})
}

func AssertCountPerSubject(t *testing.T, txStore txmgr.TestEvmTxStore, expected int64, subject uuid.UUID) {
t.Helper()
count, err := txStore.CountTxesByStateAndSubject(testutils.Context(t), "unstarted", subject)
require.NoError(t, err)
require.Equal(t, int(expected), count)
}
68 changes: 41 additions & 27 deletions core/cmd/evm_transaction_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)
Expand Down Expand Up @@ -156,7 +158,8 @@ func TestShell_SendEther_From_Txm(t *testing.T) {
)
client, r := app.NewShellAndRenderer()
db := app.GetSqlxDB()

cfg := pgtest.NewQConfig(false)
txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg)
set := flag.NewFlagSet("sendether", 0)
flagSetApplyFromAction(client.SendEther, set, "")

Expand All @@ -170,21 +173,26 @@ func TestShell_SendEther_From_Txm(t *testing.T) {

assert.NoError(t, client.SendEther(c))

dbEvmTx := txmgr.DbEthTx{}
require.NoError(t, db.Get(&dbEvmTx, `SELECT * FROM evm.txes`))
require.Equal(t, "100.500000000000000000", dbEvmTx.Value.String())
require.Equal(t, fromAddress, dbEvmTx.FromAddress)
require.Equal(t, to, dbEvmTx.ToAddress.String())
evmTxes, err := txStore.GetAllTxes(testutils.Context(t))
require.NoError(t, err)
require.Len(t, evmTxes, 1)
evmTx := evmTxes[0]
value := assets.Eth(evmTx.Value)
require.Equal(t, "100.500000000000000000", value.String())
require.Equal(t, fromAddress, evmTx.FromAddress)
require.Equal(t, to, evmTx.ToAddress.String())

output := *r.Renders[0].(*cmd.EthTxPresenter)
assert.Equal(t, &dbEvmTx.FromAddress, output.From)
assert.Equal(t, &dbEvmTx.ToAddress, output.To)
assert.Equal(t, dbEvmTx.Value.String(), output.Value)
assert.Equal(t, fmt.Sprintf("%d", *dbEvmTx.Nonce), output.Nonce)

var dbEvmTxAttempt txmgr.DbEthTxAttempt
require.NoError(t, db.Get(&dbEvmTxAttempt, `SELECT * FROM evm.tx_attempts`))
assert.Equal(t, dbEvmTxAttempt.Hash, output.Hash)
assert.Equal(t, &evmTx.FromAddress, output.From)
assert.Equal(t, &evmTx.ToAddress, output.To)
assert.Equal(t, value.String(), output.Value)
assert.Equal(t, fmt.Sprintf("%d", *evmTx.Sequence), output.Nonce)

attempts, err := txStore.GetAllTxAttempts(testutils.Context(t))
require.NoError(t, err)
require.Len(t, attempts, 1)
assert.Equal(t, attempts[0].Hash, output.Hash)

}

func TestShell_SendEther_From_Txm_WEI(t *testing.T) {
Expand Down Expand Up @@ -216,6 +224,8 @@ func TestShell_SendEther_From_Txm_WEI(t *testing.T) {
)
client, r := app.NewShellAndRenderer()
db := app.GetSqlxDB()
cfg := pgtest.NewQConfig(false)
txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg)

set := flag.NewFlagSet("sendether", 0)
flagSetApplyFromAction(client.SendEther, set, "")
Expand All @@ -236,19 +246,23 @@ func TestShell_SendEther_From_Txm_WEI(t *testing.T) {

assert.NoError(t, client.SendEther(c))

dbEvmTx := txmgr.DbEthTx{}
require.NoError(t, db.Get(&dbEvmTx, `SELECT * FROM evm.txes`))
require.Equal(t, "1.000000000000000000", dbEvmTx.Value.String())
require.Equal(t, fromAddress, dbEvmTx.FromAddress)
require.Equal(t, to, dbEvmTx.ToAddress.String())
evmTxes, err := txStore.GetAllTxes(testutils.Context(t))
require.NoError(t, err)
require.Len(t, evmTxes, 1)
evmTx := evmTxes[0]
value := assets.Eth(evmTx.Value)
require.Equal(t, "1.000000000000000000", value.String())
require.Equal(t, fromAddress, evmTx.FromAddress)
require.Equal(t, to, evmTx.ToAddress.String())

output := *r.Renders[0].(*cmd.EthTxPresenter)
assert.Equal(t, &dbEvmTx.FromAddress, output.From)
assert.Equal(t, &dbEvmTx.ToAddress, output.To)
assert.Equal(t, dbEvmTx.Value.String(), output.Value)
assert.Equal(t, fmt.Sprintf("%d", *dbEvmTx.Nonce), output.Nonce)

var dbEvmTxAttempt txmgr.DbEthTxAttempt
require.NoError(t, db.Get(&dbEvmTxAttempt, `SELECT * FROM evm.tx_attempts`))
assert.Equal(t, dbEvmTxAttempt.Hash, output.Hash)
assert.Equal(t, &evmTx.FromAddress, output.From)
assert.Equal(t, &evmTx.ToAddress, output.To)
assert.Equal(t, value.String(), output.Value)
assert.Equal(t, fmt.Sprintf("%d", *evmTx.Sequence), output.Nonce)

attempts, err := txStore.GetAllTxAttempts(testutils.Context(t))
require.NoError(t, err)
require.Len(t, attempts, 1)
assert.Equal(t, attempts[0].Hash, output.Hash)
}
2 changes: 1 addition & 1 deletion core/cmd/shell_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func TestShell_RebroadcastTransactions_OutsideRange_Txm(t *testing.T) {

assert.NoError(t, c.RebroadcastTransactions(ctx))

cltest.AssertEthTxAttemptCountStays(t, app.GetSqlxDB(), 1)
cltest.AssertEthTxAttemptCountStays(t, txStore, 1)
})
}
}
Expand Down
10 changes: 4 additions & 6 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,16 +972,14 @@ func AssertPipelineRunsStays(t testing.TB, pipelineSpecID int32, db *sqlx.DB, wa
}

// AssertEthTxAttemptCountStays asserts that the number of tx attempts remains at the provided value
func AssertEthTxAttemptCountStays(t testing.TB, db *sqlx.DB, want int) []int64 {
func AssertEthTxAttemptCountStays(t testing.TB, txStore txmgr.TestEvmTxStore, want int) []int64 {
g := gomega.NewWithT(t)

var txaIds []int64
var err error
g.Consistently(func() []int64 {
txaIds = make([]int64, 0)
err = db.Select(&txaIds, `SELECT ID FROM evm.tx_attempts ORDER BY id ASC`)
g.Consistently(func() []txmgr.TxAttempt {
attempts, err := txStore.GetAllTxAttempts(testutils.Context(t))
assert.NoError(t, err)
return txaIds
return attempts
}, AssertNoActionTimeout, DBPollingInterval).Should(gomega.HaveLen(want))
return txaIds
}
Expand Down
9 changes: 0 additions & 9 deletions core/internal/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,15 +425,6 @@ func AssertCount(t *testing.T, db *sqlx.DB, tableName string, expected int64) {
require.Equal(t, expected, count)
}

func AssertCountPerSubject(t *testing.T, db *sqlx.DB, expected int64, subject uuid.UUID) {
t.Helper()
var count int64
err := db.Get(&count, `SELECT COUNT(*) FROM evm.txes
WHERE state = 'unstarted' AND subject = $1;`, subject)
require.NoError(t, err)
require.Equal(t, expected, count)
}

func NewTestFlagSet() *flag.FlagSet {
return flag.NewFlagSet("test", flag.PanicOnError)
}
Expand Down
7 changes: 6 additions & 1 deletion core/services/keeper/upkeep_executer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,12 @@ func Test_UpkeepExecuter_PerformsUpkeep_Error(t *testing.T) {
executer.OnNewLongestChain(testutils.Context(t), &head)

g.Eventually(wasCalled.Load).Should(gomega.Equal(true))
cltest.AssertCountStays(t, db, "evm.txes", 0)

cfg := pgtest.NewQConfig(false)
txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg)
txes, err := txStore.GetAllTxes(testutils.Context(t))
require.NoError(t, err)
require.Len(t, txes, 0)
}

func ptr[T any](t T) *T { return &t }
2 changes: 1 addition & 1 deletion core/services/promreporter/prom_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func Test_PromReporter_OnNewLongestChain(t *testing.T) {
etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress)
cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 1, fromAddress)
cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 2, fromAddress)
require.NoError(t, utils.JustError(db.Exec(`UPDATE evm.tx_attempts SET broadcast_before_block_num = 7 WHERE eth_tx_id = $1`, etx.ID)))
require.NoError(t, txStore.UpdateTxAttemptBroadcastBeforeBlockNum(testutils.Context(t), etx.ID, 7))

head := newHead()
reporter.OnNewLongestChain(testutils.Context(t), &head)
Expand Down
10 changes: 6 additions & 4 deletions core/services/vrf/delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,11 +403,13 @@ func TestDelegate_InvalidLog(t *testing.T) {
}
}

// Ensure we have NOT queued up an eth transaction
var ethTxes []txmgr.DbEthTx
err = vuni.prm.GetQ().Select(&ethTxes, `SELECT * FROM evm.txes;`)
db := pgtest.NewSqlxDB(t)
cfg := pgtest.NewQConfig(false)
txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg)

txes, err := txStore.GetAllTxes(testutils.Context(t))
require.NoError(t, err)
require.Len(t, ethTxes, 0)
require.Len(t, txes, 0)
}

func TestFulfilledCheck(t *testing.T) {
Expand Down
17 changes: 11 additions & 6 deletions core/web/eth_keys_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey"
webpresenters "github.com/smartcontractkit/chainlink/v2/core/web/presenters"
Expand Down Expand Up @@ -411,10 +412,12 @@ func TestETHKeysController_ChainSuccess_ResetWithAbandon(t *testing.T) {
})
assert.NoError(t, err)

var count int
err = app.GetSqlxDB().Get(&count, `SELECT count(*) FROM evm.txes WHERE from_address = $1 AND state = 'fatal_error'`, addr)
db := app.GetSqlxDB()
txStore := txmgr.NewTxStore(db, logger.TestLogger(t), cfg.Database())

txes, err := txStore.FindTxesByFromAddressAndState(testutils.Context(t), addr, "fatal_error")
require.NoError(t, err)
assert.Equal(t, 0, count)
require.Len(t, txes, 0)

client := app.NewHTTPClient(nil)
chainURL := url.URL{Path: "/v2/keys/evm/chain"}
Expand All @@ -437,10 +440,12 @@ func TestETHKeysController_ChainSuccess_ResetWithAbandon(t *testing.T) {
assert.Equal(t, cltest.FixtureChainID.String(), updatedKey.EVMChainID.String())
assert.Equal(t, false, updatedKey.Disabled)

var s string
err = app.GetSqlxDB().Get(&s, `SELECT error FROM evm.txes WHERE from_address = $1 AND state = 'fatal_error'`, addr)
txes, err = txStore.FindTxesByFromAddressAndState(testutils.Context(t), addr, "fatal_error")
require.NoError(t, err)
assert.Equal(t, "abandoned", s)
require.Len(t, txes, 1)

tx := txes[0]
assert.Equal(t, "abandoned", tx.Error.String)
}

func TestETHKeysController_ChainFailure_InvalidAbandon(t *testing.T) {
Expand Down
Loading

0 comments on commit afa6f6d

Please sign in to comment.