From 1379980d42fd94ee51b0cea7933248d8ec9027ca Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Mon, 27 Nov 2023 13:18:56 -0500 Subject: [PATCH 1/8] Refactor prom reporter db API --- common/txmgr/types/mocks/tx_store.go | 105 ++++++++++++++++++ common/txmgr/types/tx_store.go | 5 + core/chains/evm/txmgr/evm_tx_store.go | 86 ++++++++++++++ core/chains/evm/txmgr/evm_tx_store_test.go | 84 ++++++++++++++ core/chains/evm/txmgr/mocks/evm_tx_store.go | 105 ++++++++++++++++++ core/services/chainlink/application.go | 28 ++--- core/services/promreporter/prom_reporter.go | 63 ++++------- .../promreporter/prom_reporter_test.go | 12 +- docs/CHANGELOG.md | 1 + 9 files changed, 427 insertions(+), 62 deletions(-) diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 0e344b9b6f9..49386f6762b 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -9,6 +9,8 @@ import ( feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" mock "github.com/stretchr/testify/mock" + null "gopkg.in/guregu/null.v4" + time "time" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" @@ -56,6 +58,30 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() { _m.Called() } +// CountAllUnconfirmedTransactions provides a mock function with given fields: ctx, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountAllUnconfirmedTransactions(ctx context.Context, chainID CHAIN_ID) (uint32, error) { + ret := _m.Called(ctx, chainID) + + var r0 uint32 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) (uint32, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) uint32); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(uint32) + } + + if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // CountUnconfirmedTransactions provides a mock function with given fields: ctx, fromAddress, chainID func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountUnconfirmedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (uint32, error) { ret := _m.Called(ctx, fromAddress, chainID) @@ -142,6 +168,30 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInPro return r0 } +// FindEarliestUnconfirmedTxBlock provides a mock function with given fields: ctx, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) { + ret := _m.Called(ctx, chainID) + + var r0 null.Int + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) (null.Int, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) null.Int); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(null.Int) + } + + if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindLatestSequence provides a mock function with given fields: ctx, fromAddress, chainId func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindLatestSequence(ctx context.Context, fromAddress ADDR, chainId CHAIN_ID) (SEQ, error) { ret := _m.Called(ctx, fromAddress, chainId) @@ -166,6 +216,30 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindLatestS return r0, r1 } +// FindMinUnconfirmedBroadcastTime provides a mock function with given fields: ctx, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindMinUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) { + ret := _m.Called(ctx, chainID) + + var r0 null.Time + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) (null.Time, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) null.Time); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(null.Time) + } + + if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindNextUnstartedTransactionFromAddress provides a mock function with given fields: ctx, etx, fromAddress, chainID func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(ctx context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error { ret := _m.Called(ctx, etx, fromAddress, chainID) @@ -544,6 +618,37 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetInProgre return r0, r1 } +// GetPipelineRunStats provides a mock function with given fields: ctx +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetPipelineRunStats(ctx context.Context) (int, int, error) { + ret := _m.Called(ctx) + + var r0 int + var r1 int + var r2 error + if rf, ok := ret.Get(0).(func(context.Context) (int, int, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) int); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(int) + } + + if rf, ok := ret.Get(1).(func(context.Context) int); ok { + r1 = rf(ctx) + } else { + r1 = ret.Get(1).(int) + } + + if rf, ok := ret.Get(2).(func(context.Context) error); ok { + r2 = rf(ctx) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // GetTxInProgress provides a mock function with given fields: ctx, fromAddress func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxInProgress(ctx context.Context, fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { ret := _m.Called(ctx, fromAddress) diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index c2dfeee4146..1e810b1cba1 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -2,6 +2,7 @@ package types import ( "context" + "gopkg.in/guregu/null.v4" "math/big" "time" @@ -65,6 +66,7 @@ type TransactionStore[ FEE feetypes.Fee, ] interface { CountUnconfirmedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (count uint32, err error) + CountAllUnconfirmedTransactions(ctx context.Context, chainID CHAIN_ID) (count uint32, err error) CountUnstartedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (count uint32, err error) CreateTransaction(ctx context.Context, txRequest TxRequest[ADDR, TX_HASH], chainID CHAIN_ID) (tx Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) DeleteInProgressAttempt(ctx context.Context, attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error @@ -80,6 +82,9 @@ type TransactionStore[ FindTxWithSequence(ctx context.Context, fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindNextUnstartedTransactionFromAddress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + FindMinUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) + FindEarliestUnconfirmedTxBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) + GetPipelineRunStats(ctx context.Context) (taskRunsQueued int, runsQueued int, err error) GetTxInProgress(ctx context.Context, fromAddress ADDR) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) HasInProgressTransaction(ctx context.Context, account ADDR, chainID CHAIN_ID) (exists bool, err error) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 9262c85a833..a955d139708 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "go.uber.org/multierr" "math/big" "strings" "time" @@ -1107,6 +1108,77 @@ ORDER BY nonce ASC return etxs, pkgerrors.Wrap(err, "FindTransactionsConfirmedInBlockRange failed") } +func (o *evmTxStore) FindMinUnconfirmedBroadcastTime(ctx context.Context, chainID *big.Int) (broadcastAt nullv4.Time, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + err = qq.Transaction(func(tx pg.Queryer) error { + if err = qq.QueryRowContext(ctx, `SELECT min(initial_broadcast_at) FROM evm.txes WHERE state = 'unconfirmed' AND evm_chain_id = $1`, chainID.String()).Scan(&broadcastAt); err != nil { + return fmt.Errorf("failed to query for unconfirmed eth_tx count: %w", err) + } + return nil + }, pg.OptReadOnlyTx()) + return broadcastAt, err +} + +func (o *evmTxStore) FindEarliestUnconfirmedTxBlock(ctx context.Context, chainID *big.Int) (earliestUnconfirmedTxBlock nullv4.Int, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + err = qq.Transaction(func(tx pg.Queryer) error { + err = qq.QueryRowContext(ctx, ` +SELECT MIN(broadcast_before_block_num) FROM evm.tx_attempts +JOIN evm.txes ON evm.txes.id = evm.tx_attempts.eth_tx_id +WHERE evm.txes.state = 'unconfirmed' +AND evm_chain_id = $1`, chainID.String()).Scan(&earliestUnconfirmedTxBlock) + if err != nil { + return fmt.Errorf("failed to query for earliest unconfirmed tx block: %w", err) + } + return nil + }, pg.OptReadOnlyTx()) + return earliestUnconfirmedTxBlock, err +} + +func (o *evmTxStore) GetPipelineRunStats(ctx context.Context) (taskRunsQueued int, runsQueued int, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + var rows *sql.Rows + err = qq.Transaction(func(tx pg.Queryer) error { + rows, err = qq.QueryContext(ctx, `SELECT pipeline_run_id FROM pipeline_task_runs WHERE finished_at IS NULL`) + if err != nil { + return fmt.Errorf("failed to query for pipeline_run_id: %w", err) + } + if rows.Err() != nil { + return fmt.Errorf("failed to query for pipeline_run_id: %w", rows.Err()) + } + return nil + }, pg.OptReadOnlyTx()) + + defer func() { + err = multierr.Combine(err, rows.Close()) + }() + + taskRunsQueued = 0 + pipelineRunsQueuedSet := make(map[int32]struct{}) + for rows.Next() { + var pipelineRunID int32 + if err = rows.Scan(&pipelineRunID); err != nil { + return 0, 0, fmt.Errorf("unexpected error scanning row: %w", err) + } + taskRunsQueued++ + pipelineRunsQueuedSet[pipelineRunID] = struct{}{} + } + if err = rows.Err(); err != nil { + return 0, 0, err + } + runsQueued = len(pipelineRunsQueuedSet) + return taskRunsQueued, runsQueued, err +} + func saveAttemptWithNewState(q pg.Queryer, timeout time.Duration, logger logger.Logger, attempt TxAttempt, broadcastAt time.Time) error { ctx, cancel := context.WithTimeout(context.Background(), timeout) var dbAttempt DbEthTxAttempt @@ -1638,6 +1710,20 @@ func (o *evmTxStore) CountUnconfirmedTransactions(ctx context.Context, fromAddre return o.countTransactionsWithState(ctx, fromAddress, txmgr.TxUnconfirmed, chainID) } +// CountAllUnconfirmedTransactions returns the number of unconfirmed transactions for all fromAddresses +func (o *evmTxStore) CountAllUnconfirmedTransactions(ctx context.Context, chainID *big.Int) (count uint32, err error) { + var cancel context.CancelFunc + ctx, cancel = o.mergeContexts(ctx) + defer cancel() + qq := o.q.WithOpts(pg.WithParentCtx(ctx)) + err = qq.Get(&count, `SELECT count(*) FROM evm.txes WHERE state = $1 AND evm_chain_id = $2`, + txmgr.TxUnconfirmed, chainID.String()) + if err != nil { + return 0, fmt.Errorf("failed to CountAllUnconfirmedTransactions: %w", err) + } + return count, nil +} + // CountUnstartedTransactions returns the number of unconfirmed transactions func (o *evmTxStore) CountUnstartedTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (count uint32, err error) { return o.countTransactionsWithState(ctx, fromAddress, txmgr.TxUnstarted, chainID) diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index 73bfc6fc85a..1a66315c2a4 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -822,6 +822,90 @@ func TestORM_FindTransactionsConfirmedInBlockRange(t *testing.T) { }) } +func TestORM_FindMinUnconfirmedBroadcastTime(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := newTestChainScopedConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + + t.Run("no unconfirmed eth txes", func(t *testing.T) { + broadcastAt, err := txStore.FindMinUnconfirmedBroadcastTime(testutils.Context(t), ethClient.ConfiguredChainID()) + require.NoError(t, err) + require.False(t, broadcastAt.Valid) + }) + + t.Run("verify broadcast time", func(t *testing.T) { + tx := cltest.MustInsertUnconfirmedEthTx(t, txStore, 123, fromAddress) + broadcastAt, err := txStore.FindMinUnconfirmedBroadcastTime(testutils.Context(t), ethClient.ConfiguredChainID()) + require.NoError(t, err) + require.True(t, broadcastAt.Ptr().Equal(*tx.BroadcastAt)) + }) +} + +func TestORM_FindEarliestUnconfirmedTxBlock(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := newTestChainScopedConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + _, fromAddress2 := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + + t.Run("no earliest unconfirmed tx block", func(t *testing.T) { + earliestBlock, err := txStore.FindEarliestUnconfirmedTxBlock(testutils.Context(t), ethClient.ConfiguredChainID()) + require.NoError(t, err) + require.False(t, earliestBlock.Valid) + }) + + t.Run("verify earliest unconfirmed tx block", func(t *testing.T) { + var blockNum int64 = 2 + tx := cltest.MustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 123, blockNum, time.Now(), fromAddress) + _ = cltest.MustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 123, blockNum, time.Now().Add(time.Minute), fromAddress2) + err := txStore.UpdateTxsUnconfirmed(testutils.Context(t), []int64{tx.ID}) + require.NoError(t, err) + + earliestBlock, err := txStore.FindEarliestUnconfirmedTxBlock(testutils.Context(t), ethClient.ConfiguredChainID()) + require.NoError(t, err) + require.True(t, earliestBlock.Valid) + require.Equal(t, blockNum, earliestBlock.Int64) + }) +} + +func TestORM_GetPipelineRunStats(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := newTestChainScopedConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + + t.Run("no pipeline run tasks", func(t *testing.T) { + taskRunsQueued, runsQueued, err := txStore.GetPipelineRunStats(testutils.Context(t)) + require.NoError(t, err) + require.Equal(t, 0, taskRunsQueued) + require.Equal(t, 0, runsQueued) + }) + + t.Run("queued pipeline run tasks", func(t *testing.T) { + pgtest.MustExec(t, db, `SET CONSTRAINTS pipeline_task_runs_pipeline_run_id_fkey DEFERRED`) + + numRuns := int64(5) + for id := int64(0); id < numRuns; id++ { + cltest.MustInsertUnfinishedPipelineTaskRun(t, db, id) + } + + taskRunsQueued, runsQueued, err := txStore.GetPipelineRunStats(testutils.Context(t)) + require.NoError(t, err) + require.Equal(t, int(numRuns), taskRunsQueued) + require.Equal(t, int(numRuns), runsQueued) + }) +} + func TestORM_SaveInsufficientEthAttempt(t *testing.T) { t.Parallel() diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index f491bda40bb..66419caadd8 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -14,6 +14,8 @@ import ( mock "github.com/stretchr/testify/mock" + null "gopkg.in/guregu/null.v4" + time "time" types "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" @@ -59,6 +61,30 @@ func (_m *EvmTxStore) Close() { _m.Called() } +// CountAllUnconfirmedTransactions provides a mock function with given fields: ctx, chainID +func (_m *EvmTxStore) CountAllUnconfirmedTransactions(ctx context.Context, chainID *big.Int) (uint32, error) { + ret := _m.Called(ctx, chainID) + + var r0 uint32 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (uint32, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) uint32); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(uint32) + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // CountUnconfirmedTransactions provides a mock function with given fields: ctx, fromAddress, chainID func (_m *EvmTxStore) CountUnconfirmedTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (uint32, error) { ret := _m.Called(ctx, fromAddress, chainID) @@ -145,6 +171,30 @@ func (_m *EvmTxStore) DeleteInProgressAttempt(ctx context.Context, attempt types return r0 } +// FindEarliestUnconfirmedTxBlock provides a mock function with given fields: ctx, chainID +func (_m *EvmTxStore) FindEarliestUnconfirmedTxBlock(ctx context.Context, chainID *big.Int) (null.Int, error) { + ret := _m.Called(ctx, chainID) + + var r0 null.Int + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (null.Int, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) null.Int); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(null.Int) + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindLatestSequence provides a mock function with given fields: ctx, fromAddress, chainId func (_m *EvmTxStore) FindLatestSequence(ctx context.Context, fromAddress common.Address, chainId *big.Int) (evmtypes.Nonce, error) { ret := _m.Called(ctx, fromAddress, chainId) @@ -169,6 +219,30 @@ func (_m *EvmTxStore) FindLatestSequence(ctx context.Context, fromAddress common return r0, r1 } +// FindMinUnconfirmedBroadcastTime provides a mock function with given fields: ctx, chainID +func (_m *EvmTxStore) FindMinUnconfirmedBroadcastTime(ctx context.Context, chainID *big.Int) (null.Time, error) { + ret := _m.Called(ctx, chainID) + + var r0 null.Time + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (null.Time, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) null.Time); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(null.Time) + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindNextUnstartedTransactionFromAddress provides a mock function with given fields: ctx, etx, fromAddress, chainID func (_m *EvmTxStore) FindNextUnstartedTransactionFromAddress(ctx context.Context, etx *types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], fromAddress common.Address, chainID *big.Int) error { ret := _m.Called(ctx, etx, fromAddress, chainID) @@ -649,6 +723,37 @@ func (_m *EvmTxStore) GetInProgressTxAttempts(ctx context.Context, address commo return r0, r1 } +// GetPipelineRunStats provides a mock function with given fields: ctx +func (_m *EvmTxStore) GetPipelineRunStats(ctx context.Context) (int, int, error) { + ret := _m.Called(ctx) + + var r0 int + var r1 int + var r2 error + if rf, ok := ret.Get(0).(func(context.Context) (int, int, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) int); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(int) + } + + if rf, ok := ret.Get(1).(func(context.Context) int); ok { + r1 = rf(ctx) + } else { + r1 = ret.Get(1).(int) + } + + if rf, ok := ret.Get(2).(func(context.Context) error); ok { + r2 = rf(ctx) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // GetTxInProgress provides a mock function with given fields: ctx, fromAddress func (_m *EvmTxStore) GetTxInProgress(ctx context.Context, fromAddress common.Address) (*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { ret := _m.Called(ctx, fromAddress) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 29679ee92fb..87a2d69a3ba 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -236,11 +236,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { globalLogger.Info("DatabaseBackup: periodic database backups are disabled. To enable automatic backups, set Database.Backup.Mode=lite or Database.Backup.Mode=full") } - srvcs = append(srvcs, eventBroadcaster, mailMon) - srvcs = append(srvcs, relayerChainInterops.Services()...) - promReporter := promreporter.NewPromReporter(db.DB, globalLogger) - srvcs = append(srvcs, promReporter) - // EVM chains are used all over the place. This will need to change for fully EVM extraction // TODO: BCF-2510, BCF-2511 @@ -249,6 +244,20 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, fmt.Errorf("no evm chains found") } + var ( + pipelineORM = pipeline.NewORM(db, globalLogger, cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns()) + bridgeORM = bridges.NewORM(db, globalLogger, cfg.Database()) + mercuryORM = mercury.NewORM(db, globalLogger, cfg.Database()) + pipelineRunner = pipeline.NewRunner(pipelineORM, bridgeORM, cfg.JobPipeline(), cfg.WebServer(), legacyEVMChains, keyStore.Eth(), keyStore.VRF(), globalLogger, restrictedHTTPClient, unrestrictedHTTPClient) + jobORM = job.NewORM(db, pipelineORM, bridgeORM, keyStore, globalLogger, cfg.Database()) + txmORM = txmgr.NewTxStore(db, globalLogger, cfg.Database()) + ) + + srvcs = append(srvcs, eventBroadcaster, mailMon) + srvcs = append(srvcs, relayerChainInterops.Services()...) + promReporter := promreporter.NewPromReporter(txmORM, globalLogger) + srvcs = append(srvcs, promReporter) + // Initialize Local Users ORM and Authentication Provider specified in config // BasicAdminUsersORM is initialized and required regardless of separate Authentication Provider localAdminUsersORM := localauth.NewORM(db, cfg.WebServer().SessionTimeout().Duration(), globalLogger, cfg.Database(), auditLogger) @@ -276,15 +285,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, errors.Errorf("NewApplication: Unexpected 'AuthenticationMethod': %s supported values: %s, %s", authMethod, sessions.LocalAuth, sessions.LDAPAuth) } - var ( - pipelineORM = pipeline.NewORM(db, globalLogger, cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns()) - bridgeORM = bridges.NewORM(db, globalLogger, cfg.Database()) - mercuryORM = mercury.NewORM(db, globalLogger, cfg.Database()) - pipelineRunner = pipeline.NewRunner(pipelineORM, bridgeORM, cfg.JobPipeline(), cfg.WebServer(), legacyEVMChains, keyStore.Eth(), keyStore.VRF(), globalLogger, restrictedHTTPClient, unrestrictedHTTPClient) - jobORM = job.NewORM(db, pipelineORM, bridgeORM, keyStore, globalLogger, cfg.Database()) - txmORM = txmgr.NewTxStore(db, globalLogger, cfg.Database()) - ) - for _, chain := range legacyEVMChains.Slice() { chain.HeadBroadcaster().Subscribe(promReporter) chain.TxManager().RegisterResumeCallback(pipelineRunner.ResumeRun) diff --git a/core/services/promreporter/prom_reporter.go b/core/services/promreporter/prom_reporter.go index 2306640bea6..59d5f391ae2 100644 --- a/core/services/promreporter/prom_reporter.go +++ b/core/services/promreporter/prom_reporter.go @@ -2,7 +2,8 @@ package promreporter import ( "context" - "database/sql" + "fmt" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" "math/big" "sync" "time" @@ -10,20 +11,18 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/multierr" - "gopkg.in/guregu/null.v4" - "github.com/smartcontractkit/chainlink-common/pkg/services" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/utils" + "go.uber.org/multierr" ) //go:generate mockery --quiet --name PrometheusBackend --output ../../internal/mocks/ --case=underscore type ( promReporter struct { services.StateMachine - db *sql.DB + txStore txmgr.TxStore lggr logger.Logger backend PrometheusBackend newHeads *utils.Mailbox[*evmtypes.Head] @@ -86,7 +85,7 @@ func (defaultBackend) SetPipelineTaskRunsQueued(n int) { promPipelineRunsQueued.Set(float64(n)) } -func NewPromReporter(db *sql.DB, lggr logger.Logger, opts ...interface{}) *promReporter { +func NewPromReporter(txStore txmgr.TxStore, lggr logger.Logger, opts ...interface{}) *promReporter { var backend PrometheusBackend = defaultBackend{} period := 15 * time.Second for _, opt := range opts { @@ -100,7 +99,7 @@ func NewPromReporter(db *sql.DB, lggr logger.Logger, opts ...interface{}) *promR chStop := make(chan struct{}) return &promReporter{ - db: db, + txStore: txStore, lggr: lggr.Named("PromReporter"), backend: backend, newHeads: utils.NewSingleMailbox[*evmtypes.Head](), @@ -175,20 +174,21 @@ func (pr *promReporter) reportHeadMetrics(ctx context.Context, head *evmtypes.He } func (pr *promReporter) reportPendingEthTxes(ctx context.Context, evmChainID *big.Int) (err error) { - var unconfirmed int64 - if err := pr.db.QueryRowContext(ctx, `SELECT count(*) FROM evm.txes WHERE state = 'unconfirmed' AND evm_chain_id = $1`, evmChainID.String()).Scan(&unconfirmed); err != nil { - return errors.Wrap(err, "failed to query for unconfirmed eth_tx count") + unconfirmed, err := pr.txStore.CountAllUnconfirmedTransactions(ctx, evmChainID) + if err != nil { + return fmt.Errorf("failed to query for unconfirmed eth_tx count: %w", err) } - pr.backend.SetUnconfirmedTransactions(evmChainID, unconfirmed) + pr.backend.SetUnconfirmedTransactions(evmChainID, int64(unconfirmed)) return nil } func (pr *promReporter) reportMaxUnconfirmedAge(ctx context.Context, evmChainID *big.Int) (err error) { - var broadcastAt null.Time now := time.Now() - if err := pr.db.QueryRowContext(ctx, `SELECT min(initial_broadcast_at) FROM evm.txes WHERE state = 'unconfirmed' AND evm_chain_id = $1`, evmChainID.String()).Scan(&broadcastAt); err != nil { - return errors.Wrap(err, "failed to query for unconfirmed eth_tx count") + broadcastAt, err := pr.txStore.FindMinUnconfirmedBroadcastTime(ctx, evmChainID) + if err != nil { + return fmt.Errorf("failed to query for min broadcast time: %w", err) } + var seconds float64 if broadcastAt.Valid { nanos := now.Sub(broadcastAt.ValueOrZero()) @@ -199,16 +199,11 @@ func (pr *promReporter) reportMaxUnconfirmedAge(ctx context.Context, evmChainID } func (pr *promReporter) reportMaxUnconfirmedBlocks(ctx context.Context, head *evmtypes.Head) (err error) { - var earliestUnconfirmedTxBlock null.Int - err = pr.db.QueryRowContext(ctx, ` -SELECT MIN(broadcast_before_block_num) FROM evm.tx_attempts -JOIN evm.txes ON evm.txes.id = evm.tx_attempts.eth_tx_id -WHERE evm.txes.state = 'unconfirmed' -AND evm_chain_id = $1 -AND evm.txes.state = 'unconfirmed'`, head.EVMChainID.String()).Scan(&earliestUnconfirmedTxBlock) + earliestUnconfirmedTxBlock, err := pr.txStore.FindEarliestUnconfirmedTxBlock(ctx, head.EVMChainID.ToInt()) if err != nil { - return errors.Wrap(err, "failed to query for min broadcast_before_block_num") + return fmt.Errorf("failed to query for earliest unconfirmed tx block: %w", err) } + var blocksUnconfirmed int64 if !earliestUnconfirmedTxBlock.IsZero() { blocksUnconfirmed = head.Number - earliestUnconfirmedTxBlock.ValueOrZero() @@ -218,30 +213,10 @@ AND evm.txes.state = 'unconfirmed'`, head.EVMChainID.String()).Scan(&earliestUnc } func (pr *promReporter) reportPipelineRunStats(ctx context.Context) (err error) { - rows, err := pr.db.QueryContext(ctx, ` -SELECT pipeline_run_id FROM pipeline_task_runs WHERE finished_at IS NULL -`) + pipelineTaskRunsQueued, pipelineRunsQueued, err := pr.txStore.GetPipelineRunStats(ctx) if err != nil { - return errors.Wrap(err, "failed to query for pipeline_run_id") - } - defer func() { - err = multierr.Combine(err, rows.Close()) - }() - - pipelineTaskRunsQueued := 0 - pipelineRunsQueuedSet := make(map[int32]struct{}) - for rows.Next() { - var pipelineRunID int32 - if err = rows.Scan(&pipelineRunID); err != nil { - return errors.Wrap(err, "unexpected error scanning row") - } - pipelineTaskRunsQueued++ - pipelineRunsQueuedSet[pipelineRunID] = struct{}{} - } - if err = rows.Err(); err != nil { - return err + return fmt.Errorf("failed to query for pipeline run stats: %w", err) } - pipelineRunsQueued := len(pipelineRunsQueuedSet) pr.backend.SetPipelineTaskRunsQueued(pipelineTaskRunsQueued) pr.backend.SetPipelineRunsQueued(pipelineRunsQueued) diff --git a/core/services/promreporter/prom_reporter_test.go b/core/services/promreporter/prom_reporter_test.go index 1f15d94418d..ca58da0f81e 100644 --- a/core/services/promreporter/prom_reporter_test.go +++ b/core/services/promreporter/prom_reporter_test.go @@ -27,10 +27,12 @@ func newHead() evmtypes.Head { func Test_PromReporter_OnNewLongestChain(t *testing.T) { t.Run("with nothing in the database", func(t *testing.T) { - d := pgtest.NewSqlDB(t) + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewGeneralConfig(t, nil) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) backend := mocks.NewPrometheusBackend(t) - reporter := promreporter.NewPromReporter(d, logger.TestLogger(t), backend, 10*time.Millisecond) + reporter := promreporter.NewPromReporter(txStore, logger.TestLogger(t), backend, 10*time.Millisecond) var subscribeCalls atomic.Int32 @@ -74,7 +76,7 @@ func Test_PromReporter_OnNewLongestChain(t *testing.T) { subscribeCalls.Add(1) }). Return() - reporter := promreporter.NewPromReporter(db.DB, logger.TestLogger(t), backend, 10*time.Millisecond) + reporter := promreporter.NewPromReporter(txStore, logger.TestLogger(t), backend, 10*time.Millisecond) require.NoError(t, reporter.Start(testutils.Context(t))) defer func() { assert.NoError(t, reporter.Close()) }() @@ -91,11 +93,13 @@ func Test_PromReporter_OnNewLongestChain(t *testing.T) { t.Run("with unfinished pipeline task runs", func(t *testing.T) { db := pgtest.NewSqlxDB(t) + cfg := configtest.NewGeneralConfig(t, nil) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) pgtest.MustExec(t, db, `SET CONSTRAINTS pipeline_task_runs_pipeline_run_id_fkey DEFERRED`) backend := mocks.NewPrometheusBackend(t) - reporter := promreporter.NewPromReporter(db.DB, logger.TestLogger(t), backend, 10*time.Millisecond) + reporter := promreporter.NewPromReporter(txStore, logger.TestLogger(t), backend, 10*time.Millisecond) cltest.MustInsertUnfinishedPipelineTaskRun(t, db, 1) cltest.MustInsertUnfinishedPipelineTaskRun(t, db, 1) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 1351c421340..16194c3ff67 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `PromReporter` no longer reads directly from the db, and instead uses the txStore API. - `L2Suggested` mode is now called `SuggestedPrice` ### Removed From 8de36bbb495ab0e8f0b4c53cbb77c5c748ec6f84 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Mon, 27 Nov 2023 13:27:08 -0500 Subject: [PATCH 2/8] Add test case --- core/chains/evm/txmgr/evm_tx_store_test.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index 1a66315c2a4..abd05539bce 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -1495,6 +1495,27 @@ func TestORM_CountUnconfirmedTransactions(t *testing.T) { assert.Equal(t, int(count), 3) } +func TestORM_CountAllUnconfirmedTransactions(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewGeneralConfig(t, nil) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + + _, fromAddress1 := cltest.MustInsertRandomKey(t, ethKeyStore) + _, fromAddress2 := cltest.MustInsertRandomKey(t, ethKeyStore) + _, fromAddress3 := cltest.MustInsertRandomKey(t, ethKeyStore) + + cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress1) + cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 1, fromAddress2) + cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 2, fromAddress3) + + count, err := txStore.CountAllUnconfirmedTransactions(testutils.Context(t), &cltest.FixtureChainID) + require.NoError(t, err) + assert.Equal(t, int(count), 3) +} + func TestORM_CountUnstartedTransactions(t *testing.T) { t.Parallel() From 8ae0084ba5d5e322df36c806e1fed4de0cb825d2 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Mon, 27 Nov 2023 13:45:00 -0500 Subject: [PATCH 3/8] lint --- common/txmgr/types/tx_store.go | 3 ++- core/chains/evm/txmgr/evm_tx_store.go | 3 ++- core/chains/evm/txmgr/evm_tx_store_test.go | 4 ++-- core/services/promreporter/prom_reporter.go | 6 ++++-- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 8cedc0c2b53..bb5e0e113d4 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -2,10 +2,11 @@ package types import ( "context" - "gopkg.in/guregu/null.v4" "math/big" "time" + "gopkg.in/guregu/null.v4" + "github.com/google/uuid" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index f6dd4a1e551..b2edd606d8c 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -6,11 +6,12 @@ import ( "encoding/json" "errors" "fmt" - "go.uber.org/multierr" "math/big" "strings" "time" + "go.uber.org/multierr" + "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" "github.com/jackc/pgconn" diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index 7a169d7f76d..6ba7764b508 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -865,8 +865,8 @@ func TestORM_FindEarliestUnconfirmedTxBlock(t *testing.T) { t.Run("verify earliest unconfirmed tx block", func(t *testing.T) { var blockNum int64 = 2 - tx := cltest.MustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 123, blockNum, time.Now(), fromAddress) - _ = cltest.MustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 123, blockNum, time.Now().Add(time.Minute), fromAddress2) + tx := mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 123, blockNum, time.Now(), fromAddress) + _ = mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, txStore, 123, blockNum, time.Now().Add(time.Minute), fromAddress2) err := txStore.UpdateTxsUnconfirmed(testutils.Context(t), []int64{tx.ID}) require.NoError(t, err) diff --git a/core/services/promreporter/prom_reporter.go b/core/services/promreporter/prom_reporter.go index 619bfd07486..0d18a49e840 100644 --- a/core/services/promreporter/prom_reporter.go +++ b/core/services/promreporter/prom_reporter.go @@ -3,19 +3,21 @@ package promreporter import ( "context" "fmt" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" "math/big" "sync" "time" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "go.uber.org/multierr" + "github.com/smartcontractkit/chainlink-common/pkg/services" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/utils" - "go.uber.org/multierr" ) //go:generate mockery --quiet --name PrometheusBackend --output ../../internal/mocks/ --case=underscore From fa180cb63af14bddef209f22b5f4270a893672b7 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 28 Nov 2023 15:43:12 -0500 Subject: [PATCH 4/8] Use Txm from LegacyChainContainer --- common/txmgr/mocks/tx_manager.go | 74 +++++++++++++ common/txmgr/txmgr.go | 29 ++++- common/txmgr/types/mocks/tx_store.go | 101 ++++++------------ common/txmgr/types/tx_store.go | 10 +- core/chains/evm/txmgr/evm_tx_store.go | 52 ++------- core/chains/evm/txmgr/evm_tx_store_test.go | 45 ++------ core/chains/evm/txmgr/mocks/evm_tx_store.go | 101 ++++++------------ core/internal/cltest/mocks.go | 14 +++ core/services/chainlink/application.go | 18 ++-- core/services/promreporter/prom_reporter.go | 68 ++++++++++-- .../promreporter/prom_reporter_test.go | 43 ++++++-- docs/CHANGELOG.md | 2 +- 12 files changed, 306 insertions(+), 251 deletions(-) diff --git a/common/txmgr/mocks/tx_manager.go b/common/txmgr/mocks/tx_manager.go index 89abf1dea51..ef33bcd2360 100644 --- a/common/txmgr/mocks/tx_manager.go +++ b/common/txmgr/mocks/tx_manager.go @@ -9,6 +9,8 @@ import ( feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" mock "github.com/stretchr/testify/mock" + null "gopkg.in/guregu/null.v4" + txmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" @@ -35,6 +37,30 @@ func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Close( return r0 } +// CountTransactionsByState provides a mock function with given fields: ctx, state, chainID +func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) { + ret := _m.Called(ctx, state, chainID) + + var r0 uint32 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxState, CHAIN_ID) (uint32, error)); ok { + return rf(ctx, state, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxState, CHAIN_ID) uint32); ok { + r0 = rf(ctx, state, chainID) + } else { + r0 = ret.Get(0).(uint32) + } + + if rf, ok := ret.Get(1).(func(context.Context, txmgrtypes.TxState, CHAIN_ID) error); ok { + r1 = rf(ctx, state, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // CreateTransaction provides a mock function with given fields: ctx, txRequest func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { ret := _m.Called(ctx, txRequest) @@ -59,6 +85,54 @@ func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Create return r0, r1 } +// FindEarliestUnconfirmedBroadcastTime provides a mock function with given fields: ctx, chainID +func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) { + ret := _m.Called(ctx, chainID) + + var r0 null.Time + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) (null.Time, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) null.Time); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(null.Time) + } + + if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// FindEarliestUnconfirmedTxAttemptBlock provides a mock function with given fields: ctx, chainID +func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) { + ret := _m.Called(ctx, chainID) + + var r0 null.Int + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) (null.Int, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) null.Int); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(null.Int) + } + + if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindTxesByMetaFieldAndStates provides a mock function with given fields: ctx, metaField, metaValue, states, chainID func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindTxesByMetaFieldAndStates(ctx context.Context, metaField string, metaValue string, states []txmgrtypes.TxState, chainID *big.Int) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { ret := _m.Called(ctx, metaField, metaValue, states, chainID) diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index b49c2b72f15..3d19a8cdd14 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -10,9 +10,9 @@ import ( "time" "github.com/google/uuid" + nullv4 "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/chainlink-common/pkg/services" - feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" @@ -55,6 +55,9 @@ type TxManager[ FindTxesWithMetaFieldByReceiptBlockNum(ctx context.Context, metaField string, blockNum int64, chainID *big.Int) (txes []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) // Find transactions loaded with transaction attempts and receipts by transaction IDs and states FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []txmgrtypes.TxState, chainID *big.Int) (txes []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (nullv4.Time, error) + FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (nullv4.Int, error) + CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (count uint32, err error) } type reset struct { @@ -558,6 +561,18 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWi return } +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (nullv4.Time, error) { + return b.txStore.FindEarliestUnconfirmedBroadcastTime(ctx, chainID) +} + +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (nullv4.Int, error) { + return b.txStore.FindEarliestUnconfirmedTxAttemptBlock(ctx, chainID) +} + +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (count uint32, err error) { + return b.txStore.CountTransactionsByState(ctx, state, chainID) +} + type NullTxManager[ CHAIN_ID types.ID, HEAD types.Head[BLOCK_HASH], @@ -624,3 +639,15 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Fin func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []txmgrtypes.TxState, chainID *big.Int) (txes []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { return txes, errors.New(n.ErrMsg) } + +func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (nullv4.Time, error) { + return nullv4.Time{}, errors.New(n.ErrMsg) +} + +func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (nullv4.Int, error) { + return nullv4.Int{}, errors.New(n.ErrMsg) +} + +func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (count uint32, err error) { + return count, errors.New(n.ErrMsg) +} diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 49386f6762b..8bb181c7c62 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -58,23 +58,23 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() { _m.Called() } -// CountAllUnconfirmedTransactions provides a mock function with given fields: ctx, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountAllUnconfirmedTransactions(ctx context.Context, chainID CHAIN_ID) (uint32, error) { - ret := _m.Called(ctx, chainID) +// CountTransactionsByState provides a mock function with given fields: ctx, state, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) { + ret := _m.Called(ctx, state, chainID) var r0 uint32 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) (uint32, error)); ok { - return rf(ctx, chainID) + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxState, CHAIN_ID) (uint32, error)); ok { + return rf(ctx, state, chainID) } - if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) uint32); ok { - r0 = rf(ctx, chainID) + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxState, CHAIN_ID) uint32); ok { + r0 = rf(ctx, state, chainID) } else { r0 = ret.Get(0).(uint32) } - if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { - r1 = rf(ctx, chainID) + if rf, ok := ret.Get(1).(func(context.Context, txmgrtypes.TxState, CHAIN_ID) error); ok { + r1 = rf(ctx, state, chainID) } else { r1 = ret.Error(1) } @@ -168,8 +168,32 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInPro return r0 } -// FindEarliestUnconfirmedTxBlock provides a mock function with given fields: ctx, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) { +// FindEarliestUnconfirmedBroadcastTime provides a mock function with given fields: ctx, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) { + ret := _m.Called(ctx, chainID) + + var r0 null.Time + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) (null.Time, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) null.Time); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(null.Time) + } + + if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// FindEarliestUnconfirmedTxAttemptBlock provides a mock function with given fields: ctx, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) { ret := _m.Called(ctx, chainID) var r0 null.Int @@ -216,30 +240,6 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindLatestS return r0, r1 } -// FindMinUnconfirmedBroadcastTime provides a mock function with given fields: ctx, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindMinUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) { - ret := _m.Called(ctx, chainID) - - var r0 null.Time - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) (null.Time, error)); ok { - return rf(ctx, chainID) - } - if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) null.Time); ok { - r0 = rf(ctx, chainID) - } else { - r0 = ret.Get(0).(null.Time) - } - - if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { - r1 = rf(ctx, chainID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // FindNextUnstartedTransactionFromAddress provides a mock function with given fields: ctx, etx, fromAddress, chainID func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(ctx context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error { ret := _m.Called(ctx, etx, fromAddress, chainID) @@ -618,37 +618,6 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetInProgre return r0, r1 } -// GetPipelineRunStats provides a mock function with given fields: ctx -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetPipelineRunStats(ctx context.Context) (int, int, error) { - ret := _m.Called(ctx) - - var r0 int - var r1 int - var r2 error - if rf, ok := ret.Get(0).(func(context.Context) (int, int, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) int); ok { - r0 = rf(ctx) - } else { - r0 = ret.Get(0).(int) - } - - if rf, ok := ret.Get(1).(func(context.Context) int); ok { - r1 = rf(ctx) - } else { - r1 = ret.Get(1).(int) - } - - if rf, ok := ret.Get(2).(func(context.Context) error); ok { - r2 = rf(ctx) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - // GetTxInProgress provides a mock function with given fields: ctx, fromAddress func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxInProgress(ctx context.Context, fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { ret := _m.Called(ctx, fromAddress) diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index bb5e0e113d4..ced4c96e89b 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -5,9 +5,8 @@ import ( "math/big" "time" - "gopkg.in/guregu/null.v4" - "github.com/google/uuid" + "gopkg.in/guregu/null.v4" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" "github.com/smartcontractkit/chainlink/v2/common/types" @@ -66,7 +65,7 @@ type TransactionStore[ FEE feetypes.Fee, ] interface { CountUnconfirmedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (count uint32, err error) - CountAllUnconfirmedTransactions(ctx context.Context, chainID CHAIN_ID) (count uint32, err error) + CountTransactionsByState(ctx context.Context, state TxState, chainID CHAIN_ID) (count uint32, err error) CountUnstartedTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (count uint32, err error) CreateTransaction(ctx context.Context, txRequest TxRequest[ADDR, TX_HASH], chainID CHAIN_ID) (tx Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) DeleteInProgressAttempt(ctx context.Context, attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error @@ -82,9 +81,8 @@ type TransactionStore[ FindTxWithSequence(ctx context.Context, fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindNextUnstartedTransactionFromAddress(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) - FindMinUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) - FindEarliestUnconfirmedTxBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) - GetPipelineRunStats(ctx context.Context) (taskRunsQueued int, runsQueued int, err error) + FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) + FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) GetTxInProgress(ctx context.Context, fromAddress ADDR) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) HasInProgressTransaction(ctx context.Context, account ADDR, chainID CHAIN_ID) (exists bool, err error) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index b2edd606d8c..b610ea14014 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -10,8 +10,6 @@ import ( "strings" "time" - "go.uber.org/multierr" - "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" "github.com/jackc/pgconn" @@ -1109,7 +1107,7 @@ ORDER BY nonce ASC return etxs, pkgerrors.Wrap(err, "FindTransactionsConfirmedInBlockRange failed") } -func (o *evmTxStore) FindMinUnconfirmedBroadcastTime(ctx context.Context, chainID *big.Int) (broadcastAt nullv4.Time, err error) { +func (o *evmTxStore) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID *big.Int) (broadcastAt nullv4.Time, err error) { var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1123,7 +1121,7 @@ func (o *evmTxStore) FindMinUnconfirmedBroadcastTime(ctx context.Context, chainI return broadcastAt, err } -func (o *evmTxStore) FindEarliestUnconfirmedTxBlock(ctx context.Context, chainID *big.Int) (earliestUnconfirmedTxBlock nullv4.Int, err error) { +func (o *evmTxStore) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID *big.Int) (earliestUnconfirmedTxBlock nullv4.Int, err error) { var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1142,44 +1140,6 @@ AND evm_chain_id = $1`, chainID.String()).Scan(&earliestUnconfirmedTxBlock) return earliestUnconfirmedTxBlock, err } -func (o *evmTxStore) GetPipelineRunStats(ctx context.Context) (taskRunsQueued int, runsQueued int, err error) { - var cancel context.CancelFunc - ctx, cancel = o.mergeContexts(ctx) - defer cancel() - qq := o.q.WithOpts(pg.WithParentCtx(ctx)) - var rows *sql.Rows - err = qq.Transaction(func(tx pg.Queryer) error { - rows, err = qq.QueryContext(ctx, `SELECT pipeline_run_id FROM pipeline_task_runs WHERE finished_at IS NULL`) - if err != nil { - return fmt.Errorf("failed to query for pipeline_run_id: %w", err) - } - if rows.Err() != nil { - return fmt.Errorf("failed to query for pipeline_run_id: %w", rows.Err()) - } - return nil - }, pg.OptReadOnlyTx()) - - defer func() { - err = multierr.Combine(err, rows.Close()) - }() - - taskRunsQueued = 0 - pipelineRunsQueuedSet := make(map[int32]struct{}) - for rows.Next() { - var pipelineRunID int32 - if err = rows.Scan(&pipelineRunID); err != nil { - return 0, 0, fmt.Errorf("unexpected error scanning row: %w", err) - } - taskRunsQueued++ - pipelineRunsQueuedSet[pipelineRunID] = struct{}{} - } - if err = rows.Err(); err != nil { - return 0, 0, err - } - runsQueued = len(pipelineRunsQueuedSet) - return taskRunsQueued, runsQueued, err -} - func saveAttemptWithNewState(ctx context.Context, q pg.Queryer, logger logger.Logger, attempt TxAttempt, broadcastAt time.Time) error { var dbAttempt DbEthTxAttempt dbAttempt.FromTxAttempt(&attempt) @@ -1712,16 +1672,16 @@ func (o *evmTxStore) CountUnconfirmedTransactions(ctx context.Context, fromAddre return o.countTransactionsWithState(ctx, fromAddress, txmgr.TxUnconfirmed, chainID) } -// CountAllUnconfirmedTransactions returns the number of unconfirmed transactions for all fromAddresses -func (o *evmTxStore) CountAllUnconfirmedTransactions(ctx context.Context, chainID *big.Int) (count uint32, err error) { +// CountTransactionsByState returns the number of transactions with any fromAddress in the given state +func (o *evmTxStore) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID *big.Int) (count uint32, err error) { var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() qq := o.q.WithOpts(pg.WithParentCtx(ctx)) err = qq.Get(&count, `SELECT count(*) FROM evm.txes WHERE state = $1 AND evm_chain_id = $2`, - txmgr.TxUnconfirmed, chainID.String()) + state, chainID.String()) if err != nil { - return 0, fmt.Errorf("failed to CountAllUnconfirmedTransactions: %w", err) + return 0, fmt.Errorf("failed to CountTransactionsByState: %w", err) } return count, nil } diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index 6ba7764b508..034614da50d 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -822,7 +822,7 @@ func TestORM_FindTransactionsConfirmedInBlockRange(t *testing.T) { }) } -func TestORM_FindMinUnconfirmedBroadcastTime(t *testing.T) { +func TestORM_FindEarliestUnconfirmedBroadcastTime(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) @@ -833,20 +833,20 @@ func TestORM_FindMinUnconfirmedBroadcastTime(t *testing.T) { _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) t.Run("no unconfirmed eth txes", func(t *testing.T) { - broadcastAt, err := txStore.FindMinUnconfirmedBroadcastTime(testutils.Context(t), ethClient.ConfiguredChainID()) + broadcastAt, err := txStore.FindEarliestUnconfirmedBroadcastTime(testutils.Context(t), ethClient.ConfiguredChainID()) require.NoError(t, err) require.False(t, broadcastAt.Valid) }) t.Run("verify broadcast time", func(t *testing.T) { tx := cltest.MustInsertUnconfirmedEthTx(t, txStore, 123, fromAddress) - broadcastAt, err := txStore.FindMinUnconfirmedBroadcastTime(testutils.Context(t), ethClient.ConfiguredChainID()) + broadcastAt, err := txStore.FindEarliestUnconfirmedBroadcastTime(testutils.Context(t), ethClient.ConfiguredChainID()) require.NoError(t, err) require.True(t, broadcastAt.Ptr().Equal(*tx.BroadcastAt)) }) } -func TestORM_FindEarliestUnconfirmedTxBlock(t *testing.T) { +func TestORM_FindEarliestUnconfirmedTxAttemptBlock(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) @@ -858,7 +858,7 @@ func TestORM_FindEarliestUnconfirmedTxBlock(t *testing.T) { _, fromAddress2 := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) t.Run("no earliest unconfirmed tx block", func(t *testing.T) { - earliestBlock, err := txStore.FindEarliestUnconfirmedTxBlock(testutils.Context(t), ethClient.ConfiguredChainID()) + earliestBlock, err := txStore.FindEarliestUnconfirmedTxAttemptBlock(testutils.Context(t), ethClient.ConfiguredChainID()) require.NoError(t, err) require.False(t, earliestBlock.Valid) }) @@ -870,42 +870,13 @@ func TestORM_FindEarliestUnconfirmedTxBlock(t *testing.T) { err := txStore.UpdateTxsUnconfirmed(testutils.Context(t), []int64{tx.ID}) require.NoError(t, err) - earliestBlock, err := txStore.FindEarliestUnconfirmedTxBlock(testutils.Context(t), ethClient.ConfiguredChainID()) + earliestBlock, err := txStore.FindEarliestUnconfirmedTxAttemptBlock(testutils.Context(t), ethClient.ConfiguredChainID()) require.NoError(t, err) require.True(t, earliestBlock.Valid) require.Equal(t, blockNum, earliestBlock.Int64) }) } -func TestORM_GetPipelineRunStats(t *testing.T) { - t.Parallel() - - db := pgtest.NewSqlxDB(t) - cfg := newTestChainScopedConfig(t) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - - t.Run("no pipeline run tasks", func(t *testing.T) { - taskRunsQueued, runsQueued, err := txStore.GetPipelineRunStats(testutils.Context(t)) - require.NoError(t, err) - require.Equal(t, 0, taskRunsQueued) - require.Equal(t, 0, runsQueued) - }) - - t.Run("queued pipeline run tasks", func(t *testing.T) { - pgtest.MustExec(t, db, `SET CONSTRAINTS pipeline_task_runs_pipeline_run_id_fkey DEFERRED`) - - numRuns := int64(5) - for id := int64(0); id < numRuns; id++ { - cltest.MustInsertUnfinishedPipelineTaskRun(t, db, id) - } - - taskRunsQueued, runsQueued, err := txStore.GetPipelineRunStats(testutils.Context(t)) - require.NoError(t, err) - require.Equal(t, int(numRuns), taskRunsQueued) - require.Equal(t, int(numRuns), runsQueued) - }) -} - func TestORM_SaveInsufficientEthAttempt(t *testing.T) { t.Parallel() @@ -1495,7 +1466,7 @@ func TestORM_CountUnconfirmedTransactions(t *testing.T) { assert.Equal(t, int(count), 3) } -func TestORM_CountAllUnconfirmedTransactions(t *testing.T) { +func TestORM_CountTransactionsByState(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) @@ -1511,7 +1482,7 @@ func TestORM_CountAllUnconfirmedTransactions(t *testing.T) { cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 1, fromAddress2) cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 2, fromAddress3) - count, err := txStore.CountAllUnconfirmedTransactions(testutils.Context(t), &cltest.FixtureChainID) + count, err := txStore.CountTransactionsByState(testutils.Context(t), txmgrcommon.TxUnconfirmed, &cltest.FixtureChainID) require.NoError(t, err) assert.Equal(t, int(count), 3) } diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index 66419caadd8..c57efef24aa 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -61,23 +61,23 @@ func (_m *EvmTxStore) Close() { _m.Called() } -// CountAllUnconfirmedTransactions provides a mock function with given fields: ctx, chainID -func (_m *EvmTxStore) CountAllUnconfirmedTransactions(ctx context.Context, chainID *big.Int) (uint32, error) { - ret := _m.Called(ctx, chainID) +// CountTransactionsByState provides a mock function with given fields: ctx, state, chainID +func (_m *EvmTxStore) CountTransactionsByState(ctx context.Context, state types.TxState, chainID *big.Int) (uint32, error) { + ret := _m.Called(ctx, state, chainID) var r0 uint32 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (uint32, error)); ok { - return rf(ctx, chainID) + if rf, ok := ret.Get(0).(func(context.Context, types.TxState, *big.Int) (uint32, error)); ok { + return rf(ctx, state, chainID) } - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) uint32); ok { - r0 = rf(ctx, chainID) + if rf, ok := ret.Get(0).(func(context.Context, types.TxState, *big.Int) uint32); ok { + r0 = rf(ctx, state, chainID) } else { r0 = ret.Get(0).(uint32) } - if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { - r1 = rf(ctx, chainID) + if rf, ok := ret.Get(1).(func(context.Context, types.TxState, *big.Int) error); ok { + r1 = rf(ctx, state, chainID) } else { r1 = ret.Error(1) } @@ -171,8 +171,32 @@ func (_m *EvmTxStore) DeleteInProgressAttempt(ctx context.Context, attempt types return r0 } -// FindEarliestUnconfirmedTxBlock provides a mock function with given fields: ctx, chainID -func (_m *EvmTxStore) FindEarliestUnconfirmedTxBlock(ctx context.Context, chainID *big.Int) (null.Int, error) { +// FindEarliestUnconfirmedBroadcastTime provides a mock function with given fields: ctx, chainID +func (_m *EvmTxStore) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID *big.Int) (null.Time, error) { + ret := _m.Called(ctx, chainID) + + var r0 null.Time + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (null.Time, error)); ok { + return rf(ctx, chainID) + } + if rf, ok := ret.Get(0).(func(context.Context, *big.Int) null.Time); ok { + r0 = rf(ctx, chainID) + } else { + r0 = ret.Get(0).(null.Time) + } + + if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { + r1 = rf(ctx, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// FindEarliestUnconfirmedTxAttemptBlock provides a mock function with given fields: ctx, chainID +func (_m *EvmTxStore) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID *big.Int) (null.Int, error) { ret := _m.Called(ctx, chainID) var r0 null.Int @@ -219,30 +243,6 @@ func (_m *EvmTxStore) FindLatestSequence(ctx context.Context, fromAddress common return r0, r1 } -// FindMinUnconfirmedBroadcastTime provides a mock function with given fields: ctx, chainID -func (_m *EvmTxStore) FindMinUnconfirmedBroadcastTime(ctx context.Context, chainID *big.Int) (null.Time, error) { - ret := _m.Called(ctx, chainID) - - var r0 null.Time - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) (null.Time, error)); ok { - return rf(ctx, chainID) - } - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) null.Time); ok { - r0 = rf(ctx, chainID) - } else { - r0 = ret.Get(0).(null.Time) - } - - if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { - r1 = rf(ctx, chainID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // FindNextUnstartedTransactionFromAddress provides a mock function with given fields: ctx, etx, fromAddress, chainID func (_m *EvmTxStore) FindNextUnstartedTransactionFromAddress(ctx context.Context, etx *types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], fromAddress common.Address, chainID *big.Int) error { ret := _m.Called(ctx, etx, fromAddress, chainID) @@ -723,37 +723,6 @@ func (_m *EvmTxStore) GetInProgressTxAttempts(ctx context.Context, address commo return r0, r1 } -// GetPipelineRunStats provides a mock function with given fields: ctx -func (_m *EvmTxStore) GetPipelineRunStats(ctx context.Context) (int, int, error) { - ret := _m.Called(ctx) - - var r0 int - var r1 int - var r2 error - if rf, ok := ret.Get(0).(func(context.Context) (int, int, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) int); ok { - r0 = rf(ctx) - } else { - r0 = ret.Get(0).(int) - } - - if rf, ok := ret.Get(1).(func(context.Context) int); ok { - r1 = rf(ctx) - } else { - r1 = ret.Get(1).(int) - } - - if rf, ok := ret.Get(2).(func(context.Context) error); ok { - r2 = rf(ctx) - } else { - r2 = ret.Error(2) - } - - return r0, r1, r2 -} - // GetTxInProgress provides a mock function with given fields: ctx, fromAddress func (_m *EvmTxStore) GetTxInProgress(ctx context.Context, fromAddress common.Address) (*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { ret := _m.Called(ctx, fromAddress) diff --git a/core/internal/cltest/mocks.go b/core/internal/cltest/mocks.go index 540924d7f02..087cc707830 100644 --- a/core/internal/cltest/mocks.go +++ b/core/internal/cltest/mocks.go @@ -11,6 +11,8 @@ import ( "testing" "time" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + "github.com/jmoiron/sqlx" "github.com/smartcontractkit/chainlink/v2/core/chains/evm" @@ -409,6 +411,18 @@ func NewLegacyChainsWithMockChain(t testing.TB, ethClient evmclient.Client, cfg } +func NewLegacyChainsWithMockChainAndTxManager(t testing.TB, ethClient evmclient.Client, cfg evm.AppConfig, txm txmgr.TxManager) evm.LegacyChainContainer { + ch := new(evmmocks.Chain) + ch.On("Client").Return(ethClient) + ch.On("Logger").Return(logger.TestLogger(t)) + scopedCfg := evmtest.NewChainScopedConfig(t, cfg) + ch.On("ID").Return(scopedCfg.EVM().ChainID()) + ch.On("Config").Return(scopedCfg) + ch.On("TxManager").Return(txm) + + return NewLegacyChainsWithChain(ch, cfg) +} + func NewLegacyChainsWithChain(ch evm.Chain, cfg evm.AppConfig) evm.LegacyChainContainer { m := map[string]evm.Chain{ch.ID().String(): ch} return evm.NewLegacyChains(m, cfg.EVMConfigs()) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 87a2d69a3ba..a17f1c81e8e 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -244,6 +244,15 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, fmt.Errorf("no evm chains found") } + srvcs = append(srvcs, eventBroadcaster, mailMon) + srvcs = append(srvcs, relayerChainInterops.Services()...) + promReporter := promreporter.NewPromReporter(db.DB, legacyEVMChains, globalLogger) + srvcs = append(srvcs, promReporter) + + // Initialize Local Users ORM and Authentication Provider specified in config + // BasicAdminUsersORM is initialized and required regardless of separate Authentication Provider + localAdminUsersORM := localauth.NewORM(db, cfg.WebServer().SessionTimeout().Duration(), globalLogger, cfg.Database(), auditLogger) + var ( pipelineORM = pipeline.NewORM(db, globalLogger, cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns()) bridgeORM = bridges.NewORM(db, globalLogger, cfg.Database()) @@ -253,15 +262,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { txmORM = txmgr.NewTxStore(db, globalLogger, cfg.Database()) ) - srvcs = append(srvcs, eventBroadcaster, mailMon) - srvcs = append(srvcs, relayerChainInterops.Services()...) - promReporter := promreporter.NewPromReporter(txmORM, globalLogger) - srvcs = append(srvcs, promReporter) - - // Initialize Local Users ORM and Authentication Provider specified in config - // BasicAdminUsersORM is initialized and required regardless of separate Authentication Provider - localAdminUsersORM := localauth.NewORM(db, cfg.WebServer().SessionTimeout().Duration(), globalLogger, cfg.Database(), auditLogger) - // Initialize Sessions ORM based on environment configured authenticator // localDB auth or remote LDAP auth authMethod := cfg.WebServer().AuthenticationMethod() diff --git a/core/services/promreporter/prom_reporter.go b/core/services/promreporter/prom_reporter.go index 0d18a49e840..64b80369557 100644 --- a/core/services/promreporter/prom_reporter.go +++ b/core/services/promreporter/prom_reporter.go @@ -2,11 +2,14 @@ package promreporter import ( "context" + "database/sql" "fmt" "math/big" "sync" "time" + txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" "github.com/pkg/errors" @@ -24,7 +27,8 @@ import ( type ( promReporter struct { services.StateMachine - txStore txmgr.TxStore + db *sql.DB + chains evm.LegacyChainContainer lggr logger.Logger backend PrometheusBackend newHeads *utils.Mailbox[*evmtypes.Head] @@ -87,7 +91,7 @@ func (defaultBackend) SetPipelineTaskRunsQueued(n int) { promPipelineRunsQueued.Set(float64(n)) } -func NewPromReporter(txStore txmgr.TxStore, lggr logger.Logger, opts ...interface{}) *promReporter { +func NewPromReporter(db *sql.DB, chainContainer evm.LegacyChainContainer, lggr logger.Logger, opts ...interface{}) *promReporter { var backend PrometheusBackend = defaultBackend{} period := 15 * time.Second for _, opt := range opts { @@ -101,7 +105,8 @@ func NewPromReporter(txStore txmgr.TxStore, lggr logger.Logger, opts ...interfac chStop := make(chan struct{}) return &promReporter{ - txStore: txStore, + db: db, + chains: chainContainer, lggr: lggr.Named("PromReporter"), backend: backend, newHeads: utils.NewSingleMailbox[*evmtypes.Head](), @@ -162,6 +167,14 @@ func (pr *promReporter) eventLoop() { } } +func (pr *promReporter) getTxm(evmChainID *big.Int) (txmgr.TxManager, error) { + chain, err := pr.chains.Get(evmChainID.String()) + if err != nil { + return nil, fmt.Errorf("failed to get chain: %w", err) + } + return chain.TxManager(), nil +} + func (pr *promReporter) reportHeadMetrics(ctx context.Context, head *evmtypes.Head) { evmChainID := head.EVMChainID.ToInt() err := multierr.Combine( @@ -176,7 +189,12 @@ func (pr *promReporter) reportHeadMetrics(ctx context.Context, head *evmtypes.He } func (pr *promReporter) reportPendingEthTxes(ctx context.Context, evmChainID *big.Int) (err error) { - unconfirmed, err := pr.txStore.CountAllUnconfirmedTransactions(ctx, evmChainID) + txm, err := pr.getTxm(evmChainID) + if err != nil { + return fmt.Errorf("failed to get txm: %w", err) + } + + unconfirmed, err := txm.CountTransactionsByState(ctx, txmgrcommon.TxUnconfirmed, evmChainID) if err != nil { return fmt.Errorf("failed to query for unconfirmed eth_tx count: %w", err) } @@ -185,23 +203,31 @@ func (pr *promReporter) reportPendingEthTxes(ctx context.Context, evmChainID *bi } func (pr *promReporter) reportMaxUnconfirmedAge(ctx context.Context, evmChainID *big.Int) (err error) { - now := time.Now() - broadcastAt, err := pr.txStore.FindMinUnconfirmedBroadcastTime(ctx, evmChainID) + txm, err := pr.getTxm(evmChainID) + if err != nil { + return fmt.Errorf("failed to get txm: %w", err) + } + + broadcastAt, err := txm.FindEarliestUnconfirmedBroadcastTime(ctx, evmChainID) if err != nil { return fmt.Errorf("failed to query for min broadcast time: %w", err) } var seconds float64 if broadcastAt.Valid { - nanos := now.Sub(broadcastAt.ValueOrZero()) - seconds = float64(nanos) / 1000000000 + seconds = time.Since(broadcastAt.ValueOrZero()).Seconds() } pr.backend.SetMaxUnconfirmedAge(evmChainID, seconds) return nil } func (pr *promReporter) reportMaxUnconfirmedBlocks(ctx context.Context, head *evmtypes.Head) (err error) { - earliestUnconfirmedTxBlock, err := pr.txStore.FindEarliestUnconfirmedTxBlock(ctx, head.EVMChainID.ToInt()) + txm, err := pr.getTxm(head.EVMChainID.ToInt()) + if err != nil { + return fmt.Errorf("failed to get txm: %w", err) + } + + earliestUnconfirmedTxBlock, err := txm.FindEarliestUnconfirmedTxAttemptBlock(ctx, head.EVMChainID.ToInt()) if err != nil { return fmt.Errorf("failed to query for earliest unconfirmed tx block: %w", err) } @@ -215,10 +241,30 @@ func (pr *promReporter) reportMaxUnconfirmedBlocks(ctx context.Context, head *ev } func (pr *promReporter) reportPipelineRunStats(ctx context.Context) (err error) { - pipelineTaskRunsQueued, pipelineRunsQueued, err := pr.txStore.GetPipelineRunStats(ctx) + rows, err := pr.db.QueryContext(ctx, ` +SELECT pipeline_run_id FROM pipeline_task_runs WHERE finished_at IS NULL +`) if err != nil { - return fmt.Errorf("failed to query for pipeline run stats: %w", err) + return errors.Wrap(err, "failed to query for pipeline_run_id") + } + defer func() { + err = multierr.Combine(err, rows.Close()) + }() + + pipelineTaskRunsQueued := 0 + pipelineRunsQueuedSet := make(map[int32]struct{}) + for rows.Next() { + var pipelineRunID int32 + if err = rows.Scan(&pipelineRunID); err != nil { + return errors.Wrap(err, "unexpected error scanning row") + } + pipelineTaskRunsQueued++ + pipelineRunsQueuedSet[pipelineRunID] = struct{}{} + } + if err = rows.Err(); err != nil { + return err } + pipelineRunsQueued := len(pipelineRunsQueuedSet) pr.backend.SetPipelineTaskRunsQueued(pipelineTaskRunsQueued) pr.backend.SetPipelineRunsQueued(pipelineRunsQueued) diff --git a/core/services/promreporter/prom_reporter_test.go b/core/services/promreporter/prom_reporter_test.go index ca58da0f81e..7e9aa4756b5 100644 --- a/core/services/promreporter/prom_reporter_test.go +++ b/core/services/promreporter/prom_reporter_test.go @@ -6,15 +6,21 @@ import ( "testing" "time" + "github.com/jmoiron/sqlx" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/promreporter" @@ -25,14 +31,38 @@ func newHead() evmtypes.Head { return evmtypes.Head{Number: 42, EVMChainID: utils.NewBigI(0)} } +func newLegacyChainContainer(t *testing.T, db *sqlx.DB) evm.LegacyChainContainer { + config, dbConfig, evmConfig := txmgr.MakeTestConfigs(t) + keyStore := cltest.NewKeyStore(t, db, dbConfig).Eth() + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + estimator := gas.NewEstimator(logger.TestLogger(t), ethClient, config, evmConfig.GasEstimator()) + lggr := logger.TestLogger(t) + lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr, pgtest.NewQConfig(true)), ethClient, lggr, 100*time.Millisecond, false, 2, 3, 2, 1000) + + txm, err := txmgr.NewTxm( + db, + evmConfig, + evmConfig.GasEstimator(), + evmConfig.Transactions(), + dbConfig, + dbConfig.Listener(), + ethClient, + lggr, + lp, + keyStore, + estimator) + require.NoError(t, err) + + cfg := configtest.NewGeneralConfig(t, nil) + return cltest.NewLegacyChainsWithMockChainAndTxManager(t, ethClient, cfg, txm) +} + func Test_PromReporter_OnNewLongestChain(t *testing.T) { t.Run("with nothing in the database", func(t *testing.T) { db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, nil) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) backend := mocks.NewPrometheusBackend(t) - reporter := promreporter.NewPromReporter(txStore, logger.TestLogger(t), backend, 10*time.Millisecond) + reporter := promreporter.NewPromReporter(db.DB, newLegacyChainContainer(t, db), logger.TestLogger(t), backend, 10*time.Millisecond) var subscribeCalls atomic.Int32 @@ -76,7 +106,7 @@ func Test_PromReporter_OnNewLongestChain(t *testing.T) { subscribeCalls.Add(1) }). Return() - reporter := promreporter.NewPromReporter(txStore, logger.TestLogger(t), backend, 10*time.Millisecond) + reporter := promreporter.NewPromReporter(db.DB, newLegacyChainContainer(t, db), logger.TestLogger(t), backend, 10*time.Millisecond) require.NoError(t, reporter.Start(testutils.Context(t))) defer func() { assert.NoError(t, reporter.Close()) }() @@ -93,13 +123,10 @@ func Test_PromReporter_OnNewLongestChain(t *testing.T) { t.Run("with unfinished pipeline task runs", func(t *testing.T) { db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, nil) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - pgtest.MustExec(t, db, `SET CONSTRAINTS pipeline_task_runs_pipeline_run_id_fkey DEFERRED`) backend := mocks.NewPrometheusBackend(t) - reporter := promreporter.NewPromReporter(txStore, logger.TestLogger(t), backend, 10*time.Millisecond) + reporter := promreporter.NewPromReporter(db.DB, newLegacyChainContainer(t, db), logger.TestLogger(t), backend, 10*time.Millisecond) cltest.MustInsertUnfinishedPipelineTaskRun(t, db, 1) cltest.MustInsertUnfinishedPipelineTaskRun(t, db, 1) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 8511d1b1e3b..d19354acebf 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -22,7 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- `PromReporter` no longer reads directly from the db, and instead uses the txStore API. +- `PromReporter` no longer directly reads txm related status from the db, and instead uses the txStore API. - `L2Suggested` mode is now called `SuggestedPrice` ### Removed From d0a3aca3ec93b941edaae5019666e9202ef7abb4 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 28 Nov 2023 15:52:59 -0500 Subject: [PATCH 5/8] Use legacyevm --- core/internal/cltest/mocks.go | 2 +- core/services/chainlink/application.go | 5 ----- core/services/promreporter/prom_reporter.go | 6 +++--- core/services/promreporter/prom_reporter_test.go | 4 ++-- 4 files changed, 6 insertions(+), 11 deletions(-) diff --git a/core/internal/cltest/mocks.go b/core/internal/cltest/mocks.go index 4db80b72d75..073b3ba246c 100644 --- a/core/internal/cltest/mocks.go +++ b/core/internal/cltest/mocks.go @@ -411,7 +411,7 @@ func NewLegacyChainsWithMockChain(t testing.TB, ethClient evmclient.Client, cfg } -func NewLegacyChainsWithMockChainAndTxManager(t testing.TB, ethClient evmclient.Client, cfg evm.AppConfig, txm txmgr.TxManager) legacyevm.LegacyChainContainer { +func NewLegacyChainsWithMockChainAndTxManager(t testing.TB, ethClient evmclient.Client, cfg legacyevm.AppConfig, txm txmgr.TxManager) legacyevm.LegacyChainContainer { ch := new(evmmocks.Chain) ch.On("Client").Return(ethClient) ch.On("Logger").Return(logger.TestLogger(t)) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 7317cab719a..efa5bd28c45 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -238,11 +238,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { globalLogger.Info("DatabaseBackup: periodic database backups are disabled. To enable automatic backups, set Database.Backup.Mode=lite or Database.Backup.Mode=full") } - srvcs = append(srvcs, eventBroadcaster, mailMon) - srvcs = append(srvcs, relayerChainInterops.Services()...) - promReporter := promreporter.NewPromReporter(db.DB, globalLogger) - srvcs = append(srvcs, promReporter) - // pool must be started before all relayers and stopped after them srvcs = append(srvcs, opts.MercuryPool) diff --git a/core/services/promreporter/prom_reporter.go b/core/services/promreporter/prom_reporter.go index 64b80369557..31b28643286 100644 --- a/core/services/promreporter/prom_reporter.go +++ b/core/services/promreporter/prom_reporter.go @@ -4,12 +4,12 @@ import ( "context" "database/sql" "fmt" + "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "math/big" "sync" "time" txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" "github.com/pkg/errors" @@ -28,7 +28,7 @@ type ( promReporter struct { services.StateMachine db *sql.DB - chains evm.LegacyChainContainer + chains legacyevm.LegacyChainContainer lggr logger.Logger backend PrometheusBackend newHeads *utils.Mailbox[*evmtypes.Head] @@ -91,7 +91,7 @@ func (defaultBackend) SetPipelineTaskRunsQueued(n int) { promPipelineRunsQueued.Set(float64(n)) } -func NewPromReporter(db *sql.DB, chainContainer evm.LegacyChainContainer, lggr logger.Logger, opts ...interface{}) *promReporter { +func NewPromReporter(db *sql.DB, chainContainer legacyevm.LegacyChainContainer, lggr logger.Logger, opts ...interface{}) *promReporter { var backend PrometheusBackend = defaultBackend{} period := 15 * time.Second for _, opt := range opts { diff --git a/core/services/promreporter/prom_reporter_test.go b/core/services/promreporter/prom_reporter_test.go index 7e9aa4756b5..54e3a5d3fab 100644 --- a/core/services/promreporter/prom_reporter_test.go +++ b/core/services/promreporter/prom_reporter_test.go @@ -11,11 +11,11 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" @@ -31,7 +31,7 @@ func newHead() evmtypes.Head { return evmtypes.Head{Number: 42, EVMChainID: utils.NewBigI(0)} } -func newLegacyChainContainer(t *testing.T, db *sqlx.DB) evm.LegacyChainContainer { +func newLegacyChainContainer(t *testing.T, db *sqlx.DB) legacyevm.LegacyChainContainer { config, dbConfig, evmConfig := txmgr.MakeTestConfigs(t) keyStore := cltest.NewKeyStore(t, db, dbConfig).Eth() ethClient := evmtest.NewEthClientMockWithDefaultChain(t) From 3cef248a9aec3d4a95416e6609d14f2d025454bd Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Tue, 28 Nov 2023 16:38:05 -0500 Subject: [PATCH 6/8] lint --- core/services/promreporter/prom_reporter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/promreporter/prom_reporter.go b/core/services/promreporter/prom_reporter.go index 31b28643286..832f3ba9a2b 100644 --- a/core/services/promreporter/prom_reporter.go +++ b/core/services/promreporter/prom_reporter.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "math/big" "sync" "time" @@ -19,6 +18,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/utils" ) From 3a2bf6174f64acf2ff5c4720bd6bb34a63985c3f Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 29 Nov 2023 10:24:55 -0500 Subject: [PATCH 7/8] Use TXM chainID --- common/txmgr/txmgr.go | 24 ++++++++++----------- core/services/chainlink/application.go | 18 ++++++++-------- core/services/promreporter/prom_reporter.go | 6 +++--- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 5e6a9f8397c..503758f40da 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -55,9 +55,9 @@ type TxManager[ FindTxesWithMetaFieldByReceiptBlockNum(ctx context.Context, metaField string, blockNum int64, chainID *big.Int) (txes []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) // Find transactions loaded with transaction attempts and receipts by transaction IDs and states FindTxesWithAttemptsAndReceiptsByIdsAndState(ctx context.Context, ids []big.Int, states []txmgrtypes.TxState, chainID *big.Int) (txes []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) - FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (nullv4.Time, error) - FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (nullv4.Int, error) - CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (count uint32, err error) + FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (nullv4.Time, error) + FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (nullv4.Int, error) + CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) } type reset struct { @@ -561,16 +561,16 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesWi return } -func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (nullv4.Time, error) { - return b.txStore.FindEarliestUnconfirmedBroadcastTime(ctx, chainID) +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (nullv4.Time, error) { + return b.txStore.FindEarliestUnconfirmedBroadcastTime(ctx, b.chainID) } -func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (nullv4.Int, error) { - return b.txStore.FindEarliestUnconfirmedTxAttemptBlock(ctx, chainID) +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (nullv4.Int, error) { + return b.txStore.FindEarliestUnconfirmedTxAttemptBlock(ctx, b.chainID) } -func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (count uint32, err error) { - return b.txStore.CountTransactionsByState(ctx, state, chainID) +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) { + return b.txStore.CountTransactionsByState(ctx, state, b.chainID) } type NullTxManager[ @@ -640,14 +640,14 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Fin return txes, errors.New(n.ErrMsg) } -func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (nullv4.Time, error) { +func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (nullv4.Time, error) { return nullv4.Time{}, errors.New(n.ErrMsg) } -func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (nullv4.Int, error) { +func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (nullv4.Int, error) { return nullv4.Int{}, errors.New(n.ErrMsg) } -func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (count uint32, err error) { +func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) { return count, errors.New(n.ErrMsg) } diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index efa5bd28c45..c3efb132914 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -258,15 +258,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { // BasicAdminUsersORM is initialized and required regardless of separate Authentication Provider localAdminUsersORM := localauth.NewORM(db, cfg.WebServer().SessionTimeout().Duration(), globalLogger, cfg.Database(), auditLogger) - var ( - pipelineORM = pipeline.NewORM(db, globalLogger, cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns()) - bridgeORM = bridges.NewORM(db, globalLogger, cfg.Database()) - mercuryORM = mercury.NewORM(db, globalLogger, cfg.Database()) - pipelineRunner = pipeline.NewRunner(pipelineORM, bridgeORM, cfg.JobPipeline(), cfg.WebServer(), legacyEVMChains, keyStore.Eth(), keyStore.VRF(), globalLogger, restrictedHTTPClient, unrestrictedHTTPClient) - jobORM = job.NewORM(db, pipelineORM, bridgeORM, keyStore, globalLogger, cfg.Database()) - txmORM = txmgr.NewTxStore(db, globalLogger, cfg.Database()) - ) - // Initialize Sessions ORM based on environment configured authenticator // localDB auth or remote LDAP auth authMethod := cfg.WebServer().AuthenticationMethod() @@ -290,6 +281,15 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, errors.Errorf("NewApplication: Unexpected 'AuthenticationMethod': %s supported values: %s, %s", authMethod, sessions.LocalAuth, sessions.LDAPAuth) } + var ( + pipelineORM = pipeline.NewORM(db, globalLogger, cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns()) + bridgeORM = bridges.NewORM(db, globalLogger, cfg.Database()) + mercuryORM = mercury.NewORM(db, globalLogger, cfg.Database()) + pipelineRunner = pipeline.NewRunner(pipelineORM, bridgeORM, cfg.JobPipeline(), cfg.WebServer(), legacyEVMChains, keyStore.Eth(), keyStore.VRF(), globalLogger, restrictedHTTPClient, unrestrictedHTTPClient) + jobORM = job.NewORM(db, pipelineORM, bridgeORM, keyStore, globalLogger, cfg.Database()) + txmORM = txmgr.NewTxStore(db, globalLogger, cfg.Database()) + ) + for _, chain := range legacyEVMChains.Slice() { chain.HeadBroadcaster().Subscribe(promReporter) chain.TxManager().RegisterResumeCallback(pipelineRunner.ResumeRun) diff --git a/core/services/promreporter/prom_reporter.go b/core/services/promreporter/prom_reporter.go index 832f3ba9a2b..3e1444a6da1 100644 --- a/core/services/promreporter/prom_reporter.go +++ b/core/services/promreporter/prom_reporter.go @@ -194,7 +194,7 @@ func (pr *promReporter) reportPendingEthTxes(ctx context.Context, evmChainID *bi return fmt.Errorf("failed to get txm: %w", err) } - unconfirmed, err := txm.CountTransactionsByState(ctx, txmgrcommon.TxUnconfirmed, evmChainID) + unconfirmed, err := txm.CountTransactionsByState(ctx, txmgrcommon.TxUnconfirmed) if err != nil { return fmt.Errorf("failed to query for unconfirmed eth_tx count: %w", err) } @@ -208,7 +208,7 @@ func (pr *promReporter) reportMaxUnconfirmedAge(ctx context.Context, evmChainID return fmt.Errorf("failed to get txm: %w", err) } - broadcastAt, err := txm.FindEarliestUnconfirmedBroadcastTime(ctx, evmChainID) + broadcastAt, err := txm.FindEarliestUnconfirmedBroadcastTime(ctx) if err != nil { return fmt.Errorf("failed to query for min broadcast time: %w", err) } @@ -227,7 +227,7 @@ func (pr *promReporter) reportMaxUnconfirmedBlocks(ctx context.Context, head *ev return fmt.Errorf("failed to get txm: %w", err) } - earliestUnconfirmedTxBlock, err := txm.FindEarliestUnconfirmedTxAttemptBlock(ctx, head.EVMChainID.ToInt()) + earliestUnconfirmedTxBlock, err := txm.FindEarliestUnconfirmedTxAttemptBlock(ctx) if err != nil { return fmt.Errorf("failed to query for earliest unconfirmed tx block: %w", err) } From d1bd16548fc898d7c035e8eff0d44c1e27c90283 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 29 Nov 2023 10:53:03 -0500 Subject: [PATCH 8/8] lint --- common/txmgr/mocks/tx_manager.go | 54 ++++++++++++++++---------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/common/txmgr/mocks/tx_manager.go b/common/txmgr/mocks/tx_manager.go index ef33bcd2360..27077218f6e 100644 --- a/common/txmgr/mocks/tx_manager.go +++ b/common/txmgr/mocks/tx_manager.go @@ -37,23 +37,23 @@ func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Close( return r0 } -// CountTransactionsByState provides a mock function with given fields: ctx, state, chainID -func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) { - ret := _m.Called(ctx, state, chainID) +// CountTransactionsByState provides a mock function with given fields: ctx, state +func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (uint32, error) { + ret := _m.Called(ctx, state) var r0 uint32 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxState, CHAIN_ID) (uint32, error)); ok { - return rf(ctx, state, chainID) + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxState) (uint32, error)); ok { + return rf(ctx, state) } - if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxState, CHAIN_ID) uint32); ok { - r0 = rf(ctx, state, chainID) + if rf, ok := ret.Get(0).(func(context.Context, txmgrtypes.TxState) uint32); ok { + r0 = rf(ctx, state) } else { r0 = ret.Get(0).(uint32) } - if rf, ok := ret.Get(1).(func(context.Context, txmgrtypes.TxState, CHAIN_ID) error); ok { - r1 = rf(ctx, state, chainID) + if rf, ok := ret.Get(1).(func(context.Context, txmgrtypes.TxState) error); ok { + r1 = rf(ctx, state) } else { r1 = ret.Error(1) } @@ -85,23 +85,23 @@ func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Create return r0, r1 } -// FindEarliestUnconfirmedBroadcastTime provides a mock function with given fields: ctx, chainID -func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) { - ret := _m.Called(ctx, chainID) +// FindEarliestUnconfirmedBroadcastTime provides a mock function with given fields: ctx +func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedBroadcastTime(ctx context.Context) (null.Time, error) { + ret := _m.Called(ctx) var r0 null.Time var r1 error - if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) (null.Time, error)); ok { - return rf(ctx, chainID) + if rf, ok := ret.Get(0).(func(context.Context) (null.Time, error)); ok { + return rf(ctx) } - if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) null.Time); ok { - r0 = rf(ctx, chainID) + if rf, ok := ret.Get(0).(func(context.Context) null.Time); ok { + r0 = rf(ctx) } else { r0 = ret.Get(0).(null.Time) } - if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { - r1 = rf(ctx, chainID) + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) } else { r1 = ret.Error(1) } @@ -109,23 +109,23 @@ func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEa return r0, r1 } -// FindEarliestUnconfirmedTxAttemptBlock provides a mock function with given fields: ctx, chainID -func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) { - ret := _m.Called(ctx, chainID) +// FindEarliestUnconfirmedTxAttemptBlock provides a mock function with given fields: ctx +func (_m *TxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context) (null.Int, error) { + ret := _m.Called(ctx) var r0 null.Int var r1 error - if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) (null.Int, error)); ok { - return rf(ctx, chainID) + if rf, ok := ret.Get(0).(func(context.Context) (null.Int, error)); ok { + return rf(ctx) } - if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) null.Int); ok { - r0 = rf(ctx, chainID) + if rf, ok := ret.Get(0).(func(context.Context) null.Int); ok { + r0 = rf(ctx) } else { r0 = ret.Get(0).(null.Int) } - if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { - r1 = rf(ctx, chainID) + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) } else { r1 = ret.Error(1) }