diff --git a/common/txmgr/types/keystore.go b/common/txmgr/types/keystore.go new file mode 100644 index 00000000000..f7894963ee5 --- /dev/null +++ b/common/txmgr/types/keystore.go @@ -0,0 +1,12 @@ +package types + +import "github.com/smartcontractkit/chainlink/v2/core/services/pg" + +// KeyStore encompasses the subset of keystore used by txmgr +type KeyStore[ADDR any, ID any, S any] interface { + CheckEnabled(address ADDR, chainID ID) error + NextSequence(address ADDR, chainID ID, qopts ...pg.QOpt) (S, error) + EnabledAddressesForChain(chainId ID) ([]ADDR, error) + IncrementNextSequence(address ADDR, chainID ID, currentSequence S, qopts ...pg.QOpt) error + SubscribeToKeyChanges() (ch chan struct{}, unsub func()) +} diff --git a/core/chains/evm/monitor/balance.go b/core/chains/evm/monitor/balance.go index b9e9d0885c4..5f3e027ef2b 100644 --- a/core/chains/evm/monitor/balance.go +++ b/core/chains/evm/monitor/balance.go @@ -20,7 +20,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" - "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -170,16 +169,16 @@ func (w *worker) Work() { } func (w *worker) WorkCtx(ctx context.Context) { - keys, err := w.bm.ethKeyStore.EnabledKeysForChain(w.bm.chainID) + enabledAddresses, err := w.bm.ethKeyStore.EnabledAddressesForChain(w.bm.chainID) if err != nil { w.bm.logger.Error("BalanceMonitor: error getting keys", err) } var wg sync.WaitGroup - wg.Add(len(keys)) - for _, key := range keys { - go func(k ethkey.KeyV2) { + wg.Add(len(enabledAddresses)) + for _, key := range enabledAddresses { + go func(k gethCommon.Address) { defer wg.Done() w.checkAccountBalance(ctx, k) }(key) @@ -190,24 +189,24 @@ func (w *worker) WorkCtx(ctx context.Context) { // Approximately ETH block time const ethFetchTimeout = 15 * time.Second -func (w *worker) checkAccountBalance(ctx context.Context, k ethkey.KeyV2) { +func (w *worker) checkAccountBalance(ctx context.Context, address gethCommon.Address) { ctx, cancel := context.WithTimeout(ctx, ethFetchTimeout) defer cancel() - bal, err := w.bm.ethClient.BalanceAt(ctx, k.Address, nil) + bal, err := w.bm.ethClient.BalanceAt(ctx, address, nil) if err != nil { - w.bm.logger.Errorw(fmt.Sprintf("BalanceMonitor: error getting balance for key %s", k.Address.Hex()), + w.bm.logger.Errorw(fmt.Sprintf("BalanceMonitor: error getting balance for key %s", address.Hex()), "error", err, - "address", k.Address, + "address", address, ) } else if bal == nil { - w.bm.logger.Errorw(fmt.Sprintf("BalanceMonitor: error getting balance for key %s: invariant violation, bal may not be nil", k.Address.Hex()), + w.bm.logger.Errorw(fmt.Sprintf("BalanceMonitor: error getting balance for key %s: invariant violation, bal may not be nil", address.Hex()), "error", err, - "address", k.Address, + "address", address, ) } else { ethBal := assets.Eth(*bal) - w.bm.updateBalance(ethBal, k.Address) + w.bm.updateBalance(ethBal, address) } } diff --git a/core/chains/evm/txmgr/eth_broadcaster.go b/core/chains/evm/txmgr/eth_broadcaster.go index d1c1961f8e5..a2cf96448e2 100644 --- a/core/chains/evm/txmgr/eth_broadcaster.go +++ b/core/chains/evm/txmgr/eth_broadcaster.go @@ -23,7 +23,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/label" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -102,8 +101,8 @@ type EthBroadcaster struct { ethTxInsertListener pg.Subscription eventBroadcaster pg.EventBroadcaster - ks KeyStore - keyStates []ethkey.State + ks txmgrtypes.KeyStore[gethCommon.Address, *big.Int, int64] + addresses []gethCommon.Address checkerFactory TransmitCheckerFactory @@ -119,9 +118,10 @@ type EthBroadcaster struct { } // NewEthBroadcaster returns a new concrete EthBroadcaster -func NewEthBroadcaster(orm ORM, ethClient evmclient.Client, config Config, keystore KeyStore, +func NewEthBroadcaster(orm ORM, ethClient evmclient.Client, config Config, + keystore txmgrtypes.KeyStore[gethCommon.Address, *big.Int, int64], eventBroadcaster pg.EventBroadcaster, - keyStates []ethkey.State, resumeCallback ResumeCallback, + addresses []gethCommon.Address, resumeCallback ResumeCallback, txAttemptBuilder txmgrtypes.TxAttemptBuilder[*evmtypes.Head, gas.EvmFee, gethCommon.Address, gethCommon.Hash, EthTx, EthTxAttempt], logger logger.Logger, checkerFactory TransmitCheckerFactory, autoSyncNonce bool) *EthBroadcaster { @@ -137,7 +137,7 @@ func NewEthBroadcaster(orm ORM, ethClient evmclient.Client, config Config, keyst config: config, eventBroadcaster: eventBroadcaster, ks: keystore, - keyStates: keyStates, + addresses: addresses, checkerFactory: checkerFactory, triggers: triggers, chStop: make(chan struct{}), @@ -155,10 +155,10 @@ func (eb *EthBroadcaster) Start(ctx context.Context) error { return errors.Wrap(err, "EthBroadcaster could not start") } - eb.wg.Add(len(eb.keyStates)) - for _, k := range eb.keyStates { + eb.wg.Add(len(eb.addresses)) + for _, k := range eb.addresses { triggerCh := make(chan struct{}, 1) - eb.triggers[k.Address.Address()] = triggerCh + eb.triggers[k] = triggerCh go eb.monitorEthTxs(k, triggerCh) } @@ -244,20 +244,20 @@ func (eb *EthBroadcaster) newResendBackoff() backoff.Backoff { } } -func (eb *EthBroadcaster) monitorEthTxs(k ethkey.State, triggerCh chan struct{}) { +func (eb *EthBroadcaster) monitorEthTxs(k gethCommon.Address, triggerCh chan struct{}) { defer eb.wg.Done() ctx, cancel := utils.ContextFromChan(eb.chStop) defer cancel() if eb.autoSyncNonce { - eb.logger.Debugw("Auto-syncing nonce", "address", k.Address) + eb.logger.Debugw("Auto-syncing nonce", "address", k) eb.SyncNonce(ctx, k) if ctx.Err() != nil { return } } else { - eb.logger.Debugw("Skipping nonce auto-sync", "address", k.Address) + eb.logger.Debugw("Skipping nonce auto-sync", "address", k) } // errorRetryCh allows retry on exponential backoff in case of timeout or @@ -306,17 +306,13 @@ func (eb *EthBroadcaster) monitorEthTxs(k ethkey.State, triggerCh chan struct{}) } // syncNonce tries to sync the key nonce, retrying indefinitely until success -func (eb *EthBroadcaster) SyncNonce(ctx context.Context, k ethkey.State) { - if k.Disabled { - eb.logger.Infow("Skipping nonce sync for disabled key", "address", k.Address) - return - } +func (eb *EthBroadcaster) SyncNonce(ctx context.Context, k gethCommon.Address) { syncer := NewNonceSyncer(eb.orm, eb.logger, eb.ethClient, eb.ks) nonceSyncRetryBackoff := eb.newNonceSyncBackoff() if err := syncer.Sync(ctx, k); err != nil { // Enter retry loop with backoff var attempt int - eb.logger.Errorw("Failed to sync with on-chain nonce", "address", k.Address, "attempt", attempt, "err", err) + eb.logger.Errorw("Failed to sync with on-chain nonce", "address", k, "attempt", attempt, "err", err) for { select { case <-eb.chStop: @@ -326,10 +322,10 @@ func (eb *EthBroadcaster) SyncNonce(ctx context.Context, k ethkey.State) { if err := syncer.Sync(ctx, k); err != nil { if attempt > 5 { - eb.logger.Criticalw("Failed to sync with on-chain nonce", "address", k.Address, "attempt", attempt, "err", err) + eb.logger.Criticalw("Failed to sync with on-chain nonce", "address", k, "attempt", attempt, "err", err) eb.SvcErrBuffer.Append(err) } else { - eb.logger.Warnw("Failed to sync with on-chain nonce", "address", k.Address, "attempt", attempt, "err", err) + eb.logger.Warnw("Failed to sync with on-chain nonce", "address", k, "attempt", attempt, "err", err) } continue } @@ -341,8 +337,8 @@ func (eb *EthBroadcaster) SyncNonce(ctx context.Context, k ethkey.State) { // ProcessUnstartedEthTxs picks up and handles all eth_txes in the queue // revive:disable:error-return -func (eb *EthBroadcaster) ProcessUnstartedEthTxs(ctx context.Context, keyState ethkey.State) (err error, retryable bool) { - return eb.processUnstartedEthTxs(ctx, keyState.Address.Address()) +func (eb *EthBroadcaster) ProcessUnstartedEthTxs(ctx context.Context, address gethCommon.Address) (err error, retryable bool) { + return eb.processUnstartedEthTxs(ctx, address) } // NOTE: This MUST NOT be run concurrently for the same address or it could @@ -749,11 +745,11 @@ func (eb *EthBroadcaster) saveFatallyErroredTransaction(lgr logger.Logger, etx * } func (eb *EthBroadcaster) getNextNonce(address gethCommon.Address) (nonce int64, err error) { - return eb.ks.GetNextNonce(address, &eb.chainID) + return eb.ks.NextSequence(address, &eb.chainID) } func (eb *EthBroadcaster) incrementNextNonce(address gethCommon.Address, currentNonce int64, qopts ...pg.QOpt) error { - return eb.ks.IncrementNextNonce(address, &eb.chainID, currentNonce, qopts...) + return eb.ks.IncrementNextSequence(address, &eb.chainID, currentNonce, qopts...) } func observeTimeUntilBroadcast(chainID big.Int, createdAt, broadcastAt time.Time) { diff --git a/core/chains/evm/txmgr/eth_broadcaster_test.go b/core/chains/evm/txmgr/eth_broadcaster_test.go index 1fb4d3268ff..a1ac3ae672a 100644 --- a/core/chains/evm/txmgr/eth_broadcaster_test.go +++ b/core/chains/evm/txmgr/eth_broadcaster_test.go @@ -39,7 +39,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" - "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" ksmocks "github.com/smartcontractkit/chainlink/v2/core/services/keystore/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/pg/datatypes" @@ -52,13 +51,13 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { cfg := configtest.NewTestGeneralConfig(t) borm := cltest.NewTxmORM(t, db, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - keyState, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) evmcfg := evmtest.NewChainScopedConfig(t, cfg) checkerFactory := &txmgr.CheckerFactory{Client: ethClient} - eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []ethkey.State{keyState}, checkerFactory, false) + eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []gethCommon.Address{fromAddress}, checkerFactory, false) toAddress := gethCommon.HexToAddress("0x6C03DDA95a2AEd917EeCc6eddD4b9D16E6380411") timeNow := time.Now() @@ -68,7 +67,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { gasLimit := uint32(242) t.Run("no eth_txes at all", func(t *testing.T) { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) }) @@ -86,7 +85,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { } require.NoError(t, borm.InsertEthTx(&etx)) - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) }) @@ -121,7 +120,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { require.NoError(t, borm.InsertEthTx(&etxUnconfirmed)) require.NoError(t, borm.InsertEthTx(&etxWithError)) - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) }) @@ -198,7 +197,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { require.NoError(t, borm.InsertEthTx(&earlierEthTx)) // Do the thing - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) @@ -265,7 +264,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { c.EVM[0].GasEstimator.PriceMax = assets.NewWeiI(rnd + 2) }) evmcfg = evmtest.NewChainScopedConfig(t, cfg) - eb = cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []ethkey.State{keyState}, checkerFactory, false) + eb = cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []gethCommon.Address{fromAddress}, checkerFactory, false) t.Run("sends transactions with type 0x2 in EIP-1559 mode", func(t *testing.T) { eipTxWithoutAl := txmgr.EthTx{ @@ -299,7 +298,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { // Do the thing { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -367,7 +366,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { require.NoError(t, borm.InsertEthTx(ðTx)) { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -401,7 +400,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { require.NoError(t, borm.InsertEthTx(ðTx)) { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -437,7 +436,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { require.NoError(t, borm.InsertEthTx(ðTx)) { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -456,13 +455,13 @@ func TestEthBroadcaster_TransmitChecking(t *testing.T) { cfg := configtest.NewTestGeneralConfig(t) borm := cltest.NewTxmORM(t, db, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - keyState, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) evmcfg := evmtest.NewChainScopedConfig(t, cfg) checkerFactory := &testCheckerFactory{} - eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []ethkey.State{keyState}, checkerFactory, false) + eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []gethCommon.Address{fromAddress}, checkerFactory, false) toAddress := gethCommon.HexToAddress("0x6C03DDA95a2AEd917EeCc6eddD4b9D16E6380411") gasLimit := uint32(242) @@ -489,7 +488,7 @@ func TestEthBroadcaster_TransmitChecking(t *testing.T) { require.NoError(t, borm.InsertEthTx(ðTx)) { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -522,7 +521,7 @@ func TestEthBroadcaster_TransmitChecking(t *testing.T) { require.NoError(t, borm.InsertEthTx(ðTx)) { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -552,7 +551,7 @@ func TestEthBroadcaster_TransmitChecking(t *testing.T) { require.NoError(t, borm.InsertEthTx(ðTx)) { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -573,7 +572,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_OptimisticLockingOnEthTx(t *testi evmcfg := evmtest.NewChainScopedConfig(t, cfg) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - keyState, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) estimator := txmgrmocks.NewFeeEstimator[*evmtypes.Head, gas.EvmFee, *assets.Wei, gethCommon.Hash](t) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ChainID(), evmcfg, ethKeyStore, estimator) @@ -591,7 +590,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_OptimisticLockingOnEthTx(t *testi evmcfg, ethKeyStore, &pg.NullEventBroadcaster{}, - []ethkey.State{keyState}, + []gethCommon.Address{fromAddress}, nil, txBuilder, logger.TestLogger(t), @@ -624,7 +623,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_OptimisticLockingOnEthTx(t *testi }() { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -640,13 +639,13 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success_WithMultiplier(t *testing borm := cltest.NewTxmORM(t, db, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - keyState, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) evmcfg := evmtest.NewChainScopedConfig(t, cfg) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []ethkey.State{keyState}, &testCheckerFactory{}, false) + eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []gethCommon.Address{fromAddress}, &testCheckerFactory{}, false) ethClient.On("SendTransaction", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { assert.Equal(t, int(1600), int(tx.Gas())) @@ -666,7 +665,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success_WithMultiplier(t *testing // Do the thing { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -723,11 +722,11 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { borm := cltest.NewTxmORM(t, db, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - keyState, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, nextNonce) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, nextNonce) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []ethkey.State{keyState}, &testCheckerFactory{}, false) + eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []gethCommon.Address{fromAddress}, &testCheckerFactory{}, false) // Crashed right after we commit the database transaction that saved // the nonce to the eth_tx so evm_key_states.next_nonce has not been @@ -740,7 +739,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { // Do the thing { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -761,11 +760,11 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { borm := cltest.NewTxmORM(t, db, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - keyState, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, nextNonce) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, nextNonce) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []ethkey.State{keyState}, &testCheckerFactory{}, false) + eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []gethCommon.Address{fromAddress}, &testCheckerFactory{}, false) // Crashed right after we commit the database transaction that saved // the nonce to the eth_tx so keys.next_nonce has not been @@ -778,7 +777,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { // Do the thing { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -799,11 +798,11 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { borm := cltest.NewTxmORM(t, db, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - keyState, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, nextNonce) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, nextNonce) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []ethkey.State{keyState}, &testCheckerFactory{}, false) + eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []gethCommon.Address{fromAddress}, &testCheckerFactory{}, false) // Crashed right after we commit the database transaction that saved // the nonce to the eth_tx so keys.next_nonce has not been @@ -816,7 +815,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { // Do the thing { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -836,11 +835,11 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { borm := cltest.NewTxmORM(t, db, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - keyState, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, nextNonce) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, nextNonce) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []ethkey.State{keyState}, &testCheckerFactory{}, false) + eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []gethCommon.Address{fromAddress}, &testCheckerFactory{}, false) // Crashed right after we commit the database transaction that saved // the nonce to the eth_tx so keys.next_nonce has not been @@ -853,7 +852,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { // Do the thing { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -875,11 +874,11 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { borm := cltest.NewTxmORM(t, db, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - keyState, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, nextNonce) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, nextNonce) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []ethkey.State{keyState}, &testCheckerFactory{}, false) + eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []gethCommon.Address{fromAddress}, &testCheckerFactory{}, false) // Crashed right after we commit the database transaction that saved // the nonce to the eth_tx so keys.next_nonce has not been @@ -891,7 +890,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { })).Return(failedToReachNodeError).Once() // Do the thing - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.Error(t, err) assert.Contains(t, err.Error(), failedToReachNodeError.Error()) assert.True(t, retryable) @@ -912,7 +911,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { borm := cltest.NewTxmORM(t, db, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - keyState, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, nextNonce) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, nextNonce) cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { // Configured gas price changed @@ -922,7 +921,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []ethkey.State{keyState}, &testCheckerFactory{}, false) + eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []gethCommon.Address{fromAddress}, &testCheckerFactory{}, false) // Crashed right after we commit the database transaction that saved // the nonce to the eth_tx so keys.next_nonce has not been @@ -940,7 +939,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { // Do the thing { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -962,7 +961,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { } func getLocalNextNonce(t *testing.T, kst keystore.Eth, fromAddress gethCommon.Address) uint64 { - n, err := kst.GetNextNonce(fromAddress, &cltest.FixtureChainID) + n, err := kst.NextSequence(fromAddress, &cltest.FixtureChainID) require.NoError(t, err) require.NotNil(t, n) return uint64(n) @@ -983,12 +982,12 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { borm := cltest.NewTxmORM(t, db, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - keyState, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) evmcfg := evmtest.NewChainScopedConfig(t, cfg) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []ethkey.State{keyState}, &testCheckerFactory{}, false) + eb := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg, []gethCommon.Address{fromAddress}, &testCheckerFactory{}, false) require.NoError(t, utils.JustError(db.Exec(`SET CONSTRAINTS pipeline_runs_pipeline_spec_id_fkey DEFERRED`))) @@ -1010,7 +1009,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // Do the thing { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -1057,7 +1056,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { })).Return(errors.New(fatalErrorExample)).Once() { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -1108,7 +1107,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { return tx.Nonce() == localNextNonce })).Return(errors.New(fatalErrorExample)).Once() - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.Error(t, err) require.Contains(t, err.Error(), "something exploded in the callback") assert.True(t, retryable) @@ -1130,7 +1129,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { })).Return(errors.New(fatalErrorExample)).Once() { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -1145,11 +1144,11 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { estimator := gas.NewWrappedEvmEstimator(gas.NewFixedPriceEstimator(evmcfg, lggr), evmcfg) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ChainID(), evmcfg, ethKeyStore, estimator) eb = txmgr.NewEthBroadcaster(borm, ethClient, evmcfg, ethKeyStore, eventBroadcaster, - []ethkey.State{keyState}, fn, txBuilder, lggr, + []gethCommon.Address{fromAddress}, fn, txBuilder, lggr, &testCheckerFactory{}, false) { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -1184,7 +1183,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(localNextNonce, nil).Once() { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.Error(t, err) assert.Contains(t, err.Error(), "tx fee (1.10 ether) exceeds the configured cap (1.00 ether)") assert.Contains(t, err.Error(), "error while sending transaction") @@ -1215,7 +1214,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(localNextNonce+1, nil).Once() { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -1254,7 +1253,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(localNextNonce, nil).Once() // Do the thing - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.Error(t, err) require.Contains(t, err.Error(), retryableErrorExample) assert.True(t, retryable) @@ -1278,7 +1277,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { })).Return(nil).Once() { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -1317,7 +1316,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), errors.New("pending nonce fetch failed")).Once() // Do the thing - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.Error(t, err) require.Contains(t, err.Error(), retryableErrorExample) require.Contains(t, err.Error(), "pending nonce fetch failed") @@ -1342,7 +1341,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { })).Return(nil).Once() { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -1382,7 +1381,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(localNextNonce+1, nil).Once() // Do the thing - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.NoError(t, err) assert.False(t, retryable) @@ -1434,7 +1433,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // Do the thing { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -1471,7 +1470,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { })).Return(failedToReachNodeError).Once() // Do the thing - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.Error(t, err) assert.Contains(t, err.Error(), "context deadline exceeded") assert.True(t, retryable) @@ -1503,7 +1502,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // Do the thing { - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -1534,7 +1533,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { c.EVM[0].GasEstimator.BumpMin = assets.NewWeiI(0) c.EVM[0].GasEstimator.BumpPercent = ptr[uint16](0) })) - eb2 := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg2, []ethkey.State{keyState}, &testCheckerFactory{}, false) + eb2 := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg2, []gethCommon.Address{fromAddress}, &testCheckerFactory{}, false) etx := txmgr.EthTx{ FromAddress: fromAddress, @@ -1552,7 +1551,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { })).Return(errors.New(underpricedError)).Once() // Do the thing - err, retryable := eb2.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb2.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.Error(t, err) require.Contains(t, err.Error(), "bumped gas price of 20 gwei is equal to original gas price of 20 gwei. ACTION REQUIRED: This is a configuration error, you must increase either EVM.GasEstimator.BumpPercent or EVM.GasEstimator.BumpMin") assert.True(t, retryable) @@ -1578,7 +1577,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { return tx.Nonce() == localNextNonce })).Return(errors.New(insufficientEthError)).Once() - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.Error(t, err) assert.Contains(t, err.Error(), "insufficient funds for transfer") assert.True(t, retryable) @@ -1617,7 +1616,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { return tx.Nonce() == localNextNonce })).Return(errors.New(nonceGapError)).Once() - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.Error(t, err) assert.Contains(t, err.Error(), nonceGapError) assert.True(t, retryable) @@ -1651,7 +1650,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { c.EVM[0].GasEstimator.BumpMin = assets.NewWeiI(0) c.EVM[0].GasEstimator.BumpPercent = ptr[uint16](0) })) - eb2 := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg2, []ethkey.State{keyState}, &testCheckerFactory{}, false) + eb2 := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg2, []gethCommon.Address{fromAddress}, &testCheckerFactory{}, false) etx := txmgr.EthTx{ FromAddress: fromAddress, @@ -1670,7 +1669,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { })).Return(errors.New(underpricedError)).Once() // Check gas tip cap verification - err, retryable := eb2.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb2.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.Error(t, err) require.Contains(t, err.Error(), "bumped gas tip cap of 1 wei is less than or equal to original gas tip cap of 1 wei") assert.True(t, retryable) @@ -1700,9 +1699,9 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { c.EVM[0].GasEstimator.EIP1559DynamicFees = ptr(true) c.EVM[0].GasEstimator.TipCapDefault = assets.NewWeiI(0) })) - eb2 := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg2, []ethkey.State{keyState}, &testCheckerFactory{}, false) + eb2 := cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg2, []gethCommon.Address{fromAddress}, &testCheckerFactory{}, false) - err, retryable := eb2.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb2.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.Error(t, err) require.Contains(t, err.Error(), "specified gas tip cap of 0 is below min configured gas tip of 1 wei for key") assert.True(t, retryable) @@ -1713,7 +1712,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { c.EVM[0].GasEstimator.EIP1559DynamicFees = ptr(true) c.EVM[0].GasEstimator.TipCapDefault = gasTipCapDefault })) - eb2 = cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg2, []ethkey.State{keyState}, &testCheckerFactory{}, false) + eb2 = cltest.NewEthBroadcaster(t, borm, ethClient, ethKeyStore, evmcfg2, []gethCommon.Address{fromAddress}, &testCheckerFactory{}, false) // Second was underpriced but above minimum ethClient.On("SendTransaction", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { return tx.Nonce() == localNextNonce && tx.GasTipCap().Cmp(gasTipCapDefault.ToInt()) == 0 @@ -1727,7 +1726,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { return tx.Nonce() == localNextNonce && tx.GasTipCap().Cmp(big.NewInt(0).Add(gasTipCapDefault.ToInt(), big.NewInt(0).Mul(evmcfg2.EvmGasBumpWei().ToInt(), big.NewInt(2)))) == 0 })).Return(nil).Once() - err, retryable = eb2.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable = eb2.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.NoError(t, err) assert.False(t, retryable) @@ -1749,13 +1748,13 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_KeystoreErrors(t *testing.T) { borm := cltest.NewTxmORM(t, db, cfg) realKeystore := cltest.NewKeyStore(t, db, cfg) - keyState, fromAddress := cltest.MustInsertRandomKeyReturningState(t, realKeystore.Eth()) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, realKeystore.Eth()) evmcfg := evmtest.NewChainScopedConfig(t, cfg) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) kst := ksmocks.NewEth(t) - eb := cltest.NewEthBroadcaster(t, borm, ethClient, kst, evmcfg, []ethkey.State{keyState}, &testCheckerFactory{}, false) + eb := cltest.NewEthBroadcaster(t, borm, ethClient, kst, evmcfg, []gethCommon.Address{fromAddress}, &testCheckerFactory{}, false) t.Run("tx signing fails", func(t *testing.T) { etx := txmgr.EthTx{ @@ -1769,9 +1768,9 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_KeystoreErrors(t *testing.T) { require.NoError(t, borm.InsertEthTx(&etx)) tx := *gethTypes.NewTx(&gethTypes.LegacyTx{}) - next, err := realKeystore.Eth().GetNextNonce(fromAddress, testutils.FixtureChainID) + next, err := realKeystore.Eth().NextSequence(fromAddress, testutils.FixtureChainID) require.NoError(t, err) - kst.On("GetNextNonce", fromAddress, testutils.FixtureChainID, mock.Anything).Return(next, nil).Once() + kst.On("NextSequence", fromAddress, testutils.FixtureChainID, mock.Anything).Return(next, nil).Once() kst.On("SignTx", fromAddress, mock.AnythingOfType("*types.Transaction"), @@ -1780,7 +1779,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_KeystoreErrors(t *testing.T) { })).Return(&tx, errors.New("could not sign transaction")).Once() // Do the thing - err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), keyState) + err, retryable := eb.ProcessUnstartedEthTxs(testutils.Context(t), fromAddress) require.Error(t, err) require.Contains(t, err.Error(), "could not sign transaction") assert.True(t, retryable) @@ -1820,9 +1819,9 @@ func TestEthBroadcaster_IncrementNextNonce(t *testing.T) { keyState, _ := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) // Cannot increment if supplied nonce doesn't match existing - require.Error(t, ethKeyStore.IncrementNextNonce(keyState.Address.Address(), &cltest.FixtureChainID, int64(42))) + require.Error(t, ethKeyStore.IncrementNextSequence(keyState.Address.Address(), &cltest.FixtureChainID, int64(42))) - require.NoError(t, ethKeyStore.IncrementNextNonce(keyState.Address.Address(), &cltest.FixtureChainID, int64(0))) + require.NoError(t, ethKeyStore.IncrementNextSequence(keyState.Address.Address(), &cltest.FixtureChainID, int64(0))) // Nonce bumped to 1 var nonce int64 @@ -1841,7 +1840,7 @@ func TestEthBroadcaster_Trigger(t *testing.T) { borm := cltest.NewTxmORM(t, db, cfg) evmcfg := evmtest.NewChainScopedConfig(t, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - eb := cltest.NewEthBroadcaster(t, borm, evmtest.NewEthClientMockWithDefaultChain(t), ethKeyStore, evmcfg, []ethkey.State{}, &testCheckerFactory{}, false) + eb := cltest.NewEthBroadcaster(t, borm, evmtest.NewEthClientMockWithDefaultChain(t), ethKeyStore, evmcfg, []gethCommon.Address{}, &testCheckerFactory{}, false) eb.Trigger(testutils.NewAddress()) eb.Trigger(testutils.NewAddress()) @@ -1883,9 +1882,9 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { borm := cltest.NewTxmORM(t, db, cfg) kst := cltest.NewKeyStore(t, db, cfg).Eth() - k1, fromAddress := cltest.MustInsertRandomKeyReturningState(t, kst, true) - k2, disabledAddress := cltest.MustInsertRandomKeyReturningState(t, kst, false) - keyStates := []ethkey.State{k1, k2} + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, kst, true) + _, disabledAddress := cltest.MustInsertRandomKeyReturningState(t, kst, false) + keys := []gethCommon.Address{fromAddress, disabledAddress} ethNodeNonce := uint64(22) @@ -1901,7 +1900,7 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ChainID(), evmcfg, kst, estimator) - eb := txmgr.NewEthBroadcaster(borm, ethClient, evmcfg, kst, eventBroadcaster, keyStates, nil, txBuilder, lggr, checkerFactory, false) + eb := txmgr.NewEthBroadcaster(borm, ethClient, evmcfg, kst, eventBroadcaster, keys, nil, txBuilder, lggr, checkerFactory, false) require.NoError(t, eb.Start(ctx)) defer func() { assert.NoError(t, eb.Close()) }() @@ -1913,7 +1912,7 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ChainID(), evmcfg, kst, estimator) - eb := txmgr.NewEthBroadcaster(borm, ethClient, evmcfg, kst, eventBroadcaster, keyStates, nil, txBuilder, lggr, checkerFactory, true) + eb := txmgr.NewEthBroadcaster(borm, ethClient, evmcfg, kst, eventBroadcaster, []gethCommon.Address{fromAddress}, nil, txBuilder, lggr, checkerFactory, true) ethClient.On("PendingNonceAt", mock.Anything, mock.MatchedBy(func(account gethCommon.Address) bool { return account.Hex() == fromAddress.Hex() @@ -1943,7 +1942,7 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ChainID(), evmcfg, kst, estimator) - eb := txmgr.NewEthBroadcaster(borm, ethClient, evmcfg, kst, eventBroadcaster, keyStates, nil, txBuilder, lggr, checkerFactory, true) + eb := txmgr.NewEthBroadcaster(borm, ethClient, evmcfg, kst, eventBroadcaster, []gethCommon.Address{fromAddress}, nil, txBuilder, lggr, checkerFactory, true) ethClient.On("PendingNonceAt", mock.Anything, mock.MatchedBy(func(account gethCommon.Address) bool { return account.Hex() == fromAddress.Hex() diff --git a/core/chains/evm/txmgr/eth_confirmer.go b/core/chains/evm/txmgr/eth_confirmer.go index d406b3ad670..939b847db0f 100644 --- a/core/chains/evm/txmgr/eth_confirmer.go +++ b/core/chains/evm/txmgr/eth_confirmer.go @@ -25,7 +25,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/label" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -122,8 +121,8 @@ type EthConfirmer struct { config Config chainID big.Int - ks KeyStore - keyStates []ethkey.State + ks txmgrtypes.KeyStore[gethCommon.Address, *big.Int, int64] + addresses []gethCommon.Address mb *utils.Mailbox[*evmtypes.Head] ctx context.Context @@ -134,8 +133,9 @@ type EthConfirmer struct { } // NewEthConfirmer instantiates a new eth confirmer -func NewEthConfirmer(orm ORM, ethClient evmclient.Client, config Config, keystore KeyStore, - keyStates []ethkey.State, resumeCallback ResumeCallback, +func NewEthConfirmer(orm ORM, ethClient evmclient.Client, config Config, + keystore txmgrtypes.KeyStore[gethCommon.Address, *big.Int, int64], + addresses []gethCommon.Address, resumeCallback ResumeCallback, txAttemptBuilder txmgrtypes.TxAttemptBuilder[*evmtypes.Head, gas.EvmFee, gethCommon.Address, gethCommon.Hash, EthTx, EthTxAttempt], lggr logger.Logger) *EthConfirmer { @@ -152,7 +152,7 @@ func NewEthConfirmer(orm ORM, ethClient evmclient.Client, config Config, keystor config, *ethClient.ChainID(), keystore, - keyStates, + addresses, utils.NewSingleMailbox[*evmtypes.Head](), ctx, cancel, @@ -579,10 +579,10 @@ func (ec *EthConfirmer) RebroadcastWhereNecessary(ctx context.Context, blockHeig // It is safe to process separate keys concurrently // NOTE: This design will block one key if another takes a really long time to execute - wg.Add(len(ec.keyStates)) + wg.Add(len(ec.addresses)) errors := []error{} var errMu sync.Mutex - for _, key := range ec.keyStates { + for _, address := range ec.addresses { go func(fromAddress gethCommon.Address) { if err := ec.rebroadcastWhereNecessary(ctx, fromAddress, blockHeight); err != nil { errMu.Lock() @@ -592,7 +592,7 @@ func (ec *EthConfirmer) RebroadcastWhereNecessary(ctx context.Context, blockHeig } wg.Done() - }(key.Address.Address()) + }(address) } wg.Wait() @@ -974,8 +974,8 @@ func (ec *EthConfirmer) EnsureConfirmedTransactionsInLongestChain(ctx context.Co var wg sync.WaitGroup errors := []error{} var errMu sync.Mutex - wg.Add(len(ec.keyStates)) - for _, key := range ec.keyStates { + wg.Add(len(ec.addresses)) + for _, address := range ec.addresses { go func(fromAddress gethCommon.Address) { if err := ec.handleAnyInProgressAttempts(ctx, fromAddress, head.BlockNumber()); err != nil { errMu.Lock() @@ -985,7 +985,7 @@ func (ec *EthConfirmer) EnsureConfirmedTransactionsInLongestChain(ctx context.Co } wg.Done() - }(key.Address.Address()) + }(address) } wg.Wait() diff --git a/core/chains/evm/txmgr/eth_confirmer_test.go b/core/chains/evm/txmgr/eth_confirmer_test.go index 7bb32d22069..a5ea616eb67 100644 --- a/core/chains/evm/txmgr/eth_confirmer_test.go +++ b/core/chains/evm/txmgr/eth_confirmer_test.go @@ -35,7 +35,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" - "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" ksmocks "github.com/smartcontractkit/chainlink/v2/core/services/keystore/mocks" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -118,10 +117,10 @@ func TestEthConfirmer_CheckForReceipts(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethKeyStore := cltest.NewKeyStore(t, db, config).Eth() - key, fromAddress := cltest.MustAddRandomKeyToKeystore(t, ethKeyStore) - state := cltest.MustGetStateForKey(t, ethKeyStore, key) + _, fromAddress := cltest.MustAddRandomKeyToKeystore(t, ethKeyStore) + // state := cltest.MustGetStateForKey(t, ethKeyStore, key) - ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []gethCommon.Address{fromAddress}, nil) nonce := int64(0) ctx := testutils.Context(t) @@ -533,13 +532,13 @@ func TestEthConfirmer_CheckForReceipts_batching(t *testing.T) { ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) evmcfg := evmtest.NewChainScopedConfig(t, cfg) - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []gethCommon.Address{fromAddress}, nil) ctx := testutils.Context(t) @@ -599,8 +598,8 @@ func TestEthConfirmer_CheckForReceipts_HandlesNonFwdTxsWithForwardingEnabled(t * ethClient := evmtest.NewEthClientMockWithDefaultChain(t) evmcfg := evmtest.NewChainScopedConfig(t, cfg) - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []ethkey.State{state}, nil) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []gethCommon.Address{fromAddress}, nil) ctx := testutils.Context(t) // tx is not forwarded and doesn't have meta set. EthConfirmer should handle nil meta values etx := cltest.MustInsertUnconfirmedEthTx(t, borm, 0, fromAddress) @@ -647,13 +646,13 @@ func TestEthConfirmer_CheckForReceipts_only_likely_confirmed(t *testing.T) { ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) evmcfg := evmtest.NewChainScopedConfig(t, cfg) - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []gethCommon.Address{fromAddress}, nil) ctx := testutils.Context(t) @@ -704,11 +703,11 @@ func TestEthConfirmer_CheckForReceipts_should_not_check_for_likely_unconfirmed(t ethKeyStore := cltest.NewKeyStore(t, db, config).Eth() - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []gethCommon.Address{fromAddress}, nil) ctx := testutils.Context(t) @@ -733,15 +732,15 @@ func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt_scoped_to_key(t ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() chainId1, chainId2 := 1, 2 - state1_1, fromAddress1_1 := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, chainId1) - state1_2, fromAddress1_2 := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, chainId1) - state2_1, fromAddress2_1 := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, chainId2) + _, fromAddress1_1 := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, chainId1) + _, fromAddress1_2 := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, chainId1) + _, fromAddress2_1 := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, chainId2) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("NonceAt", mock.Anything, mock.Anything, mock.Anything).Return(uint64(20), nil) evmcfg := evmtest.NewChainScopedConfig(t, cfg) - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []ethkey.State{state1_1, state1_2, state2_1}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []gethCommon.Address{fromAddress1_1, fromAddress1_2, fromAddress2_1}, nil) ctx := testutils.Context(t) // STATE @@ -802,13 +801,13 @@ func TestEthConfirmer_CheckForReceipts_confirmed_missing_receipt(t *testing.T) { ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) evmcfg := evmtest.NewChainScopedConfig(t, cfg) - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []gethCommon.Address{fromAddress}, nil) ctx := testutils.Context(t) @@ -1059,13 +1058,13 @@ func TestEthConfirmer_CheckConfirmedMissingReceipt(t *testing.T) { ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) evmcfg := evmtest.NewChainScopedConfig(t, cfg) - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []gethCommon.Address{fromAddress}, nil) ctx := testutils.Context(t) @@ -1138,13 +1137,13 @@ func TestEthConfirmer_CheckConfirmedMissingReceipt_batchSendTransactions_fails(t ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) evmcfg := evmtest.NewChainScopedConfig(t, cfg) - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []gethCommon.Address{fromAddress}, nil) ctx := testutils.Context(t) @@ -1202,13 +1201,13 @@ func TestEthConfirmer_CheckConfirmedMissingReceipt_smallEvmRPCBatchSize_middleBa ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) evmcfg := evmtest.NewChainScopedConfig(t, cfg) - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []gethCommon.Address{fromAddress}, nil) ctx := testutils.Context(t) @@ -1273,7 +1272,7 @@ func TestEthConfirmer_FindEthTxsRequiringRebroadcast(t *testing.T) { ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) currentHead := int64(30) gasBumpThreshold := int64(10) @@ -1289,7 +1288,7 @@ func TestEthConfirmer_FindEthTxsRequiringRebroadcast(t *testing.T) { lggr := logger.TestLogger(t) - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []gethCommon.Address{fromAddress}, nil) t.Run("returns nothing when there are no transactions", func(t *testing.T) { etxs, err := ec.FindEthTxsRequiringRebroadcast(testutils.Context(t), lggr, fromAddress, currentHead, gasBumpThreshold, 10, 0, cltest.FixtureChainID) @@ -1556,8 +1555,8 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WithConnectivityCheck(t *testing borm := cltest.NewTxmORM(t, db, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) - keys := []ethkey.State{state} + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + keys := []gethCommon.Address{fromAddress} kst := ksmocks.NewEth(t) estimator := gasmocks.NewEvmEstimator(t) @@ -1597,8 +1596,8 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WithConnectivityCheck(t *testing borm := cltest.NewTxmORM(t, db, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) - keys := []ethkey.State{state} + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + keys := []gethCommon.Address{fromAddress} kst := ksmocks.NewEth(t) estimator := gasmocks.NewEvmEstimator(t) @@ -1645,9 +1644,9 @@ func TestEthConfirmer_RebroadcastWhereNecessary(t *testing.T) { evmcfg := evmtest.NewChainScopedConfig(t, cfg) - otherKey, _ := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) - keys := []ethkey.State{state, otherKey} + _, otherAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + keys := []gethCommon.Address{otherAddress, fromAddress} kst := ksmocks.NewEth(t) // Use a mock keystore for this test @@ -2283,9 +2282,9 @@ func TestEthConfirmer_RebroadcastWhereNecessary_TerminallyUnderpriced_ThenGoesTh evmcfg := evmtest.NewChainScopedConfig(t, cfg) - otherKey, _ := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) - keys := []ethkey.State{state, otherKey} + _, otherAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + keys := []gethCommon.Address{otherAddress, fromAddress} // Use a mock keystore for this test kst := ksmocks.NewEth(t) @@ -2392,10 +2391,10 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WhenOutOfEth(t *testing.T) { _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) - keys, err := ethKeyStore.EnabledKeysForChain(testutils.FixtureChainID) - require.NoError(t, err) - keyStates, err := ethKeyStore.GetStatesForKeys(keys) + enabledAddresses, err := ethKeyStore.EnabledAddressesForChain(testutils.FixtureChainID) require.NoError(t, err) + // keyStates, err := ethKeyStore.GetStatesForKeys(keys) + // require.NoError(t, err) config := newTestChainScopedConfig(t) currentHead := int64(30) @@ -2411,7 +2410,7 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WhenOutOfEth(t *testing.T) { insufficientEthError := errors.New("insufficient funds for gas * price + value") t.Run("saves attempt with state 'insufficient_eth' if eth node returns this error", func(t *testing.T) { - ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, keyStates, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, enabledAddresses, nil) expectedBumpedGasPrice := big.NewInt(20000000000) require.Greater(t, expectedBumpedGasPrice.Int64(), attempt1_1.GasPrice.ToInt().Int64()) @@ -2437,7 +2436,7 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WhenOutOfEth(t *testing.T) { }) t.Run("does not bump gas when previous error was 'out of eth', instead resubmits existing transaction", func(t *testing.T) { - ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, keyStates, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, enabledAddresses, nil) expectedBumpedGasPrice := big.NewInt(20000000000) require.Greater(t, expectedBumpedGasPrice.Int64(), attempt1_1.GasPrice.ToInt().Int64()) @@ -2462,7 +2461,7 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WhenOutOfEth(t *testing.T) { }) t.Run("saves the attempt as broadcast after node wallet has been topped up with sufficient balance", func(t *testing.T) { - ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, keyStates, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, enabledAddresses, nil) expectedBumpedGasPrice := big.NewInt(20000000000) require.Greater(t, expectedBumpedGasPrice.Int64(), attempt1_1.GasPrice.ToInt().Int64()) @@ -2494,7 +2493,7 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WhenOutOfEth(t *testing.T) { c.EVM[0].GasEstimator.BumpTxDepth = ptr(uint16(depth)) }) evmcfg := evmtest.NewChainScopedConfig(t, cfg) - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, keyStates, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, enabledAddresses, nil) for i := 0; i < etxCount; i++ { n := nonce @@ -2523,12 +2522,12 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) config := newTestChainScopedConfig(t) - ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []gethCommon.Address{fromAddress}, nil) head := evmtypes.Head{ Hash: utils.NewHash(), @@ -2696,7 +2695,7 @@ func TestEthConfirmer_ForceRebroadcast(t *testing.T) { borm := cltest.NewTxmORM(t, db, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg).Eth() - state, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) config := newTestChainScopedConfig(t) mustInsertUnstartedEthTx(t, borm, fromAddress) @@ -2709,7 +2708,7 @@ func TestEthConfirmer_ForceRebroadcast(t *testing.T) { t.Run("rebroadcasts one eth_tx if it falls within in nonce range", func(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []gethCommon.Address{fromAddress}, nil) ethClient.On("SendTransaction", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { return tx.Nonce() == uint64(*etx1.Nonce) && @@ -2724,7 +2723,7 @@ func TestEthConfirmer_ForceRebroadcast(t *testing.T) { t.Run("uses default gas limit if overrideGasLimit is 0", func(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []gethCommon.Address{fromAddress}, nil) ethClient.On("SendTransaction", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { return tx.Nonce() == uint64(*etx1.Nonce) && @@ -2739,7 +2738,7 @@ func TestEthConfirmer_ForceRebroadcast(t *testing.T) { t.Run("rebroadcasts several eth_txes in nonce range", func(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []gethCommon.Address{fromAddress}, nil) ethClient.On("SendTransaction", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { return tx.Nonce() == uint64(*etx1.Nonce) && uint64(tx.GasPrice().Int64()) == gasPriceWei && tx.Gas() == uint64(overrideGasLimit) @@ -2753,7 +2752,7 @@ func TestEthConfirmer_ForceRebroadcast(t *testing.T) { t.Run("broadcasts zero transactions if eth_tx doesn't exist for that nonce", func(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []gethCommon.Address{fromAddress}, nil) ethClient.On("SendTransaction", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { return tx.Nonce() == uint64(1) @@ -2778,7 +2777,7 @@ func TestEthConfirmer_ForceRebroadcast(t *testing.T) { t.Run("zero transactions use default gas limit if override wasn't specified", func(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []ethkey.State{state}, nil) + ec := cltest.NewEthConfirmer(t, borm, ethClient, config, ethKeyStore, []gethCommon.Address{fromAddress}, nil) ethClient.On("SendTransaction", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { return tx.Nonce() == uint64(0) && uint64(tx.GasPrice().Int64()) == gasPriceWei && uint32(tx.Gas()) == config.EvmGasLimitDefault() @@ -2797,8 +2796,8 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { ethKeyStore := cltest.NewKeyStore(t, db, config).Eth() - key, fromAddress := cltest.MustAddRandomKeyToKeystore(t, ethKeyStore) - state := cltest.MustGetStateForKey(t, ethKeyStore, key) + _, fromAddress := cltest.MustAddRandomKeyToKeystore(t, ethKeyStore) + // state := cltest.MustGetStateForKey(t, ethKeyStore, key) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) @@ -2823,7 +2822,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { pgtest.MustExec(t, db, `SET CONSTRAINTS pipeline_runs_pipeline_spec_id_fkey DEFERRED`) t.Run("doesn't process task runs that are not suspended (possibly already previously resumed)", func(t *testing.T) { - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []ethkey.State{state}, func(uuid.UUID, interface{}, error) error { + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []gethCommon.Address{fromAddress}, func(uuid.UUID, interface{}, error) error { t.Fatal("No value expected") return nil }) @@ -2842,7 +2841,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { }) t.Run("doesn't process task runs where the receipt is younger than minConfirmations", func(t *testing.T) { - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []ethkey.State{state}, func(uuid.UUID, interface{}, error) error { + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []gethCommon.Address{fromAddress}, func(uuid.UUID, interface{}, error) error { t.Fatal("No value expected") return nil }) @@ -2864,7 +2863,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { t.Run("processes eth_txes with receipts older than minConfirmations", func(t *testing.T) { ch := make(chan interface{}) var err error - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []ethkey.State{state}, func(id uuid.UUID, value interface{}, thisErr error) error { + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []gethCommon.Address{fromAddress}, func(id uuid.UUID, value interface{}, thisErr error) error { err = thisErr ch <- value return nil @@ -2904,7 +2903,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { t.Run("processes eth_txes with receipt older than minConfirmations that reverted", func(t *testing.T) { ch := make(chan interface{}) var err error - ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []ethkey.State{state}, func(id uuid.UUID, value interface{}, thisErr error) error { + ec := cltest.NewEthConfirmer(t, borm, ethClient, evmcfg, ethKeyStore, []gethCommon.Address{fromAddress}, func(id uuid.UUID, value interface{}, thisErr error) error { err = thisErr ch <- value return nil diff --git a/core/chains/evm/txmgr/eth_resender.go b/core/chains/evm/txmgr/eth_resender.go index 254a0bb0e18..de24c3afc00 100644 --- a/core/chains/evm/txmgr/eth_resender.go +++ b/core/chains/evm/txmgr/eth_resender.go @@ -6,9 +6,11 @@ import ( "math/big" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" "github.com/pkg/errors" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/label" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -30,7 +32,7 @@ const defaultResenderPollInterval = 5 * time.Second type EthResender struct { orm ORM ethClient evmclient.Client - ks KeyStore + ks txmgrtypes.KeyStore[common.Address, *big.Int, int64] chainID big.Int interval time.Duration config Config @@ -42,7 +44,7 @@ type EthResender struct { } // NewEthResender creates a new concrete EthResender -func NewEthResender(lggr logger.Logger, orm ORM, ethClient evmclient.Client, ks KeyStore, pollInterval time.Duration, config Config) *EthResender { +func NewEthResender(lggr logger.Logger, orm ORM, ethClient evmclient.Client, ks txmgrtypes.KeyStore[common.Address, *big.Int, int64], pollInterval time.Duration, config Config) *EthResender { if config.EthTxResendAfterThreshold() == 0 { panic("EthResender requires a non-zero threshold") } @@ -96,7 +98,7 @@ func (er *EthResender) runLoop() { } func (er *EthResender) resendUnconfirmed() error { - keys, err := er.ks.EnabledKeysForChain(&er.chainID) + enabledAddresses, err := er.ks.EnabledAddressesForChain(&er.chainID) if err != nil { return errors.Wrapf(err, "EthResender failed getting enabled keys for chain %s", er.chainID.String()) } @@ -104,9 +106,9 @@ func (er *EthResender) resendUnconfirmed() error { maxInFlightTransactions := er.config.EvmMaxInFlightTransactions() olderThan := time.Now().Add(-ageThreshold) var allAttempts []EthTxAttempt - for _, k := range keys { + for _, k := range enabledAddresses { var attempts []EthTxAttempt - attempts, err = er.orm.FindEthTxAttemptsRequiringResend(olderThan, maxInFlightTransactions, er.chainID, k.Address) + attempts, err = er.orm.FindEthTxAttemptsRequiringResend(olderThan, maxInFlightTransactions, er.chainID, k) if err != nil { return errors.Wrap(err, "failed to FindEthTxAttemptsRequiringResend") } diff --git a/core/chains/evm/txmgr/nonce_syncer.go b/core/chains/evm/txmgr/nonce_syncer.go index eb4ee32694b..2cb641e3882 100644 --- a/core/chains/evm/txmgr/nonce_syncer.go +++ b/core/chains/evm/txmgr/nonce_syncer.go @@ -10,13 +10,12 @@ import ( evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) type ( NonceSyncerKeyStore interface { - GetNextNonce(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (int64, error) + NextSequence(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (int64, error) } // NonceSyncer manages the delicate task of syncing the local nonce with the // chain nonce in case of divergence. @@ -80,11 +79,8 @@ func NewNonceSyncer(orm ORM, lggr logger.Logger, ethClient evmclient.Client, kst // // This should only be called once, before the EthBroadcaster has started. // Calling it later is not safe and could lead to races. -func (s NonceSyncer) Sync(ctx context.Context, keyState ethkey.State) (err error) { - if keyState.Disabled { - return errors.Errorf("cannot sync disabled key state: %s", keyState.Address) - } - err = s.fastForwardNonceIfNecessary(ctx, keyState.Address.Address()) +func (s NonceSyncer) Sync(ctx context.Context, address common.Address) (err error) { + err = s.fastForwardNonceIfNecessary(ctx, address) return errors.Wrap(err, "NonceSyncer#fastForwardNoncesIfNecessary failed") } @@ -97,7 +93,7 @@ func (s NonceSyncer) fastForwardNonceIfNecessary(ctx context.Context, address co return nil } - keyNextNonce, err := s.kst.GetNextNonce(address, s.chainID, pg.WithParentCtx(ctx)) + keyNextNonce, err := s.kst.NextSequence(address, s.chainID, pg.WithParentCtx(ctx)) if err != nil { return err } diff --git a/core/chains/evm/txmgr/nonce_syncer_test.go b/core/chains/evm/txmgr/nonce_syncer_test.go index 82beca17023..e73f1f7bd64 100644 --- a/core/chains/evm/txmgr/nonce_syncer_test.go +++ b/core/chains/evm/txmgr/nonce_syncer_test.go @@ -38,7 +38,7 @@ func Test_NonceSyncer_Sync(t *testing.T) { ns := txmgr.NewNonceSyncer(borm, logger.TestLogger(t), ethClient, ethKeyStore) sendingKeys := cltest.MustSendingKeyStates(t, ethKeyStore, testutils.FixtureChainID) - err := ns.Sync(testutils.Context(t), sendingKeys[0]) + err := ns.Sync(testutils.Context(t), sendingKeys[0].Address.Address()) require.Error(t, err) assert.Contains(t, err.Error(), "something exploded") @@ -64,7 +64,7 @@ func Test_NonceSyncer_Sync(t *testing.T) { ns := txmgr.NewNonceSyncer(borm, logger.TestLogger(t), ethClient, ethKeyStore) sendingKeys := cltest.MustSendingKeyStates(t, ethKeyStore, testutils.FixtureChainID) - require.NoError(t, ns.Sync(testutils.Context(t), sendingKeys[0])) + require.NoError(t, ns.Sync(testutils.Context(t), sendingKeys[0].Address.Address())) cltest.AssertCount(t, db, "eth_txes", 0) cltest.AssertCount(t, db, "eth_tx_attempts", 0) @@ -89,7 +89,7 @@ func Test_NonceSyncer_Sync(t *testing.T) { ns := txmgr.NewNonceSyncer(borm, logger.TestLogger(t), ethClient, ethKeyStore) sendingKeys := cltest.MustSendingKeyStates(t, ethKeyStore, testutils.FixtureChainID) - require.NoError(t, ns.Sync(testutils.Context(t), sendingKeys[0])) + require.NoError(t, ns.Sync(testutils.Context(t), sendingKeys[0].Address.Address())) cltest.AssertCount(t, db, "eth_txes", 0) cltest.AssertCount(t, db, "eth_tx_attempts", 0) @@ -121,7 +121,7 @@ func Test_NonceSyncer_Sync(t *testing.T) { sendingKeys := cltest.MustSendingKeyStates(t, ethKeyStore, testutils.FixtureChainID) for _, k := range sendingKeys { - require.NoError(t, ns.Sync(testutils.Context(t), k)) + require.NoError(t, ns.Sync(testutils.Context(t), k.Address.Address())) } assertDatabaseNonce(t, db, key1, 5) @@ -146,7 +146,7 @@ func Test_NonceSyncer_Sync(t *testing.T) { ns := txmgr.NewNonceSyncer(borm, logger.TestLogger(t), ethClient, ethKeyStore) sendingKeys := cltest.MustSendingKeyStates(t, ethKeyStore, testutils.FixtureChainID) - require.NoError(t, ns.Sync(testutils.Context(t), sendingKeys[0])) + require.NoError(t, ns.Sync(testutils.Context(t), sendingKeys[0].Address.Address())) assertDatabaseNonce(t, db, key1, 0) ethClient = evmtest.NewEthClientMockWithDefaultChain(t) @@ -157,7 +157,7 @@ func Test_NonceSyncer_Sync(t *testing.T) { })).Return(uint64(2), nil) ns = txmgr.NewNonceSyncer(borm, logger.TestLogger(t), ethClient, ethKeyStore) - require.NoError(t, ns.Sync(testutils.Context(t), sendingKeys[0])) + require.NoError(t, ns.Sync(testutils.Context(t), sendingKeys[0].Address.Address())) assertDatabaseNonce(t, db, key1, 1) }) } diff --git a/core/chains/evm/txmgr/orm_test.go b/core/chains/evm/txmgr/orm_test.go index 3d8a0e478e8..136c0b2a227 100644 --- a/core/chains/evm/txmgr/orm_test.go +++ b/core/chains/evm/txmgr/orm_test.go @@ -1305,7 +1305,7 @@ func TestORM_UpdateEthKeyNextNonce(t *testing.T) { err := borm.UpdateEthKeyNextNonce(int64(24), int64(0), fromAddress, *ethClient.ChainID()) require.NoError(t, err) - newNextNonce, err := ethKeyStore.GetNextNonce(fromAddress, ethClient.ChainID()) + newNextNonce, err := ethKeyStore.NextSequence(fromAddress, ethClient.ChainID()) require.NoError(t, err) assert.Equal(t, int64(24), newNextNonce) }) diff --git a/core/chains/evm/txmgr/txmgr.go b/core/chains/evm/txmgr/txmgr.go index d49838cb564..dea28c29654 100644 --- a/core/chains/evm/txmgr/txmgr.go +++ b/core/chains/evm/txmgr/txmgr.go @@ -22,7 +22,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/null" "github.com/smartcontractkit/chainlink/v2/core/services" - "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -49,16 +48,6 @@ type Config interface { TriggerFallbackDBPollInterval() time.Duration } -// KeyStore encompasses the subset of keystore used by txmgr -type KeyStore interface { - CheckEnabled(address common.Address, chainID *big.Int) error - EnabledKeysForChain(chainID *big.Int) (keys []ethkey.KeyV2, err error) - GetNextNonce(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (int64, error) - GetStatesForChain(chainID *big.Int) ([]ethkey.State, error) - IncrementNextNonce(address common.Address, chainID *big.Int, currentNonce int64, qopts ...pg.QOpt) error - SubscribeToKeyChanges() (ch chan struct{}, unsub func()) -} - // For more information about the Txm architecture, see the design doc: // https://www.notion.so/chainlink/Txm-Architecture-Overview-9dc62450cd7a443ba9e7dceffa1a8d6b @@ -99,7 +88,7 @@ type Txm struct { q pg.Q ethClient evmclient.Client config Config - keyStore KeyStore + keyStore txmgrtypes.KeyStore[common.Address, *big.Int, int64] eventBroadcaster pg.EventBroadcaster chainID big.Int checkerFactory TransmitCheckerFactory @@ -126,7 +115,7 @@ func (b *Txm) RegisterResumeCallback(fn ResumeCallback) { } // NewTxm creates a new Txm with the given configuration. -func NewTxm(db *sqlx.DB, ethClient evmclient.Client, cfg Config, keyStore KeyStore, eventBroadcaster pg.EventBroadcaster, lggr logger.Logger, checkerFactory TransmitCheckerFactory, +func NewTxm(db *sqlx.DB, ethClient evmclient.Client, cfg Config, keyStore txmgrtypes.KeyStore[common.Address, *big.Int, int64], eventBroadcaster pg.EventBroadcaster, lggr logger.Logger, checkerFactory TransmitCheckerFactory, fwdMgr txmgrtypes.ForwarderManager[common.Address], txAttemptBuilder txmgrtypes.TxAttemptBuilder[*evmtypes.Head, gas.EvmFee, common.Address, common.Hash, EthTx, EthTxAttempt], ) *Txm { @@ -169,20 +158,20 @@ func NewTxm(db *sqlx.DB, ethClient evmclient.Client, cfg Config, keyStore KeySto // The provided context can be used to terminate Start sequence. func (b *Txm) Start(ctx context.Context) (merr error) { return b.StartOnce("Txm", func() error { - keyStates, err := b.keyStore.GetStatesForChain(&b.chainID) + enabledAddresses, err := b.keyStore.EnabledAddressesForChain(&b.chainID) if err != nil { return errors.Wrap(err, "Txm: failed to load key states") } - if len(keyStates) > 0 { - b.logger.Debugw(fmt.Sprintf("Booting with %d keys", len(keyStates)), "keys", keyStates) + if len(enabledAddresses) > 0 { + b.logger.Debugw(fmt.Sprintf("Booting with %d keys", len(enabledAddresses)), "keys", enabledAddresses) } else { b.logger.Warnf("Chain %s does not have any eth keys, no transactions will be sent on this chain", b.chainID.String()) } var ms services.MultiStart - b.ethBroadcaster = NewEthBroadcaster(b.orm, b.ethClient, b.config, b.keyStore, b.eventBroadcaster, keyStates, b.resumeCallback, b.txAttemptBuilder, b.logger, b.checkerFactory, b.config.EvmNonceAutoSync()) - b.ethConfirmer = NewEthConfirmer(b.orm, b.ethClient, b.config, b.keyStore, keyStates, b.resumeCallback, b.txAttemptBuilder, b.logger) + b.ethBroadcaster = NewEthBroadcaster(b.orm, b.ethClient, b.config, b.keyStore, b.eventBroadcaster, enabledAddresses, b.resumeCallback, b.txAttemptBuilder, b.logger, b.checkerFactory, b.config.EvmNonceAutoSync()) + b.ethConfirmer = NewEthConfirmer(b.orm, b.ethClient, b.config, b.keyStore, enabledAddresses, b.resumeCallback, b.txAttemptBuilder, b.logger) if err = ms.Start(ctx, b.ethBroadcaster); err != nil { return errors.Wrap(err, "Txm: EthBroadcaster failed to start") } @@ -195,7 +184,7 @@ func (b *Txm) Start(ctx context.Context) (merr error) { } b.wg.Add(1) - go b.runLoop(b.ethBroadcaster, b.ethConfirmer, keyStates) + go b.runLoop(b.ethBroadcaster, b.ethConfirmer, enabledAddresses) <-b.chSubbed if b.reaper != nil { @@ -291,7 +280,7 @@ func (b *Txm) HealthReport() map[string]error { return report } -func (b *Txm) runLoop(eb *EthBroadcaster, ec *EthConfirmer, keyStates []ethkey.State) { +func (b *Txm) runLoop(eb *EthBroadcaster, ec *EthConfirmer, enabledAddresses []common.Address) { // eb, ec and keyStates can all be modified by the runloop. // This is concurrent-safe because the runloop ensures serial access. defer b.wg.Done() @@ -324,8 +313,8 @@ func (b *Txm) runLoop(eb *EthBroadcaster, ec *EthConfirmer, keyStates []ethkey.S close(r.done) } - eb = NewEthBroadcaster(b.orm, b.ethClient, b.config, b.keyStore, b.eventBroadcaster, keyStates, b.resumeCallback, b.txAttemptBuilder, b.logger, b.checkerFactory, false) - ec = NewEthConfirmer(b.orm, b.ethClient, b.config, b.keyStore, keyStates, b.resumeCallback, b.txAttemptBuilder, b.logger) + eb = NewEthBroadcaster(b.orm, b.ethClient, b.config, b.keyStore, b.eventBroadcaster, enabledAddresses, b.resumeCallback, b.txAttemptBuilder, b.logger, b.checkerFactory, false) + ec = NewEthConfirmer(b.orm, b.ethClient, b.config, b.keyStore, enabledAddresses, b.resumeCallback, b.txAttemptBuilder, b.logger) var wg sync.WaitGroup // two goroutines to handle independent backoff retries starting: @@ -422,13 +411,13 @@ func (b *Txm) runLoop(eb *EthBroadcaster, ec *EthConfirmer, keyStates []ethkey.S continue } var err error - keyStates, err = b.keyStore.GetStatesForChain(&b.chainID) + enabledAddresses, err = b.keyStore.EnabledAddressesForChain(&b.chainID) if err != nil { b.logger.Criticalf("Failed to reload key states after key change") b.SvcErrBuffer.Append(err) continue } - b.logger.Debugw("Keys changed, reloading", "keyStates", keyStates) + b.logger.Debugw("Keys changed, reloading", "keyStates", enabledAddresses) execReset(nil) } diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 5d05682a3f2..0b5d0c64f4a 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -31,7 +31,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" - "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" ksmocks "github.com/smartcontractkit/chainlink/v2/core/services/keystore/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/pg" pgmocks "github.com/smartcontractkit/chainlink/v2/core/services/pg/mocks" @@ -478,8 +477,7 @@ func TestTxm_Lifecycle(t *testing.T) { config.On("GasEstimatorMode").Return("FixedPrice") config.On("LogSQL").Return(false).Maybe() config.On("EvmRPCDefaultBatchSize").Return(uint32(4)).Maybe() - kst.On("GetStatesForChain", &cltest.FixtureChainID).Return([]ethkey.State{}, nil).Once() - kst.On("EnabledKeysForChain", &cltest.FixtureChainID).Return([]ethkey.KeyV2{}, nil) + kst.On("EnabledAddressesForChain", &cltest.FixtureChainID).Return([]gethcommon.Address{}, nil).Twice() keyChangeCh := make(chan struct{}) unsub := cltest.NewAwaiter() @@ -505,7 +503,7 @@ func TestTxm_Lifecycle(t *testing.T) { keyState := cltest.MustGenerateRandomKeyState(t) - kst.On("GetStatesForChain", &cltest.FixtureChainID).Return([]ethkey.State{keyState}, nil).Once() + kst.On("EnabledAddressesForChain", &cltest.FixtureChainID).Return([]gethcommon.Address{keyState.Address.Address()}, nil) sub.On("Close").Return() ethClient.On("PendingNonceAt", mock.AnythingOfType("*context.cancelCtx"), keyState.Address.Address()).Return(uint64(0), nil).Maybe() config.On("TriggerFallbackDBPollInterval").Return(1 * time.Hour).Maybe() diff --git a/core/cmd/local_client.go b/core/cmd/local_client.go index 8abc0b47d28..1d042d58b42 100644 --- a/core/cmd/local_client.go +++ b/core/cmd/local_client.go @@ -585,14 +585,14 @@ func (cli *Client) RebroadcastTransactions(c *clipkg.Context) (err error) { cli.Logger.Infof("Rebroadcasting transactions from %v to %v", beginningNonce, endingNonce) - keyStates, err := keyStore.Eth().GetStatesForChain(chain.ID()) + enabledAddresses, err := keyStore.Eth().EnabledAddressesForChain(chain.ID()) if err != nil { return cli.errorOut(err) } orm := txmgr.NewORM(app.GetSqlxDB(), lggr, cli.Config) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ChainID(), chain.Config(), keyStore.Eth(), nil) - ec := txmgr.NewEthConfirmer(orm, ethClient, chain.Config(), keyStore.Eth(), keyStates, nil, txBuilder, chain.Logger()) + ec := txmgr.NewEthConfirmer(orm, ethClient, chain.Config(), keyStore.Eth(), enabledAddresses, nil, txBuilder, chain.Logger()) err = ec.ForceRebroadcast(beginningNonce, endingNonce, gasPriceWei, address, uint32(overrideGasLimit)) return cli.errorOut(err) } diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 4d2e474cb73..34c0196ccb7 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -200,7 +200,7 @@ func NewJobPipelineV2(t testing.TB, cfg config.BasicConfig, cc evm.ChainSet, db } // NewEthBroadcaster creates a new txmgr.EthBroadcaster for use in testing. -func NewEthBroadcaster(t testing.TB, orm txmgr.ORM, ethClient evmclient.Client, keyStore keystore.Eth, config evmconfig.ChainScopedConfig, keyStates []ethkey.State, checkerFactory txmgr.TransmitCheckerFactory, nonceAutoSync bool) *txmgr.EthBroadcaster { +func NewEthBroadcaster(t testing.TB, orm txmgr.ORM, ethClient evmclient.Client, keyStore keystore.Eth, config evmconfig.ChainScopedConfig, addresses []common.Address, checkerFactory txmgr.TransmitCheckerFactory, nonceAutoSync bool) *txmgr.EthBroadcaster { t.Helper() eventBroadcaster := NewEventBroadcaster(t, config.DatabaseURL()) err := eventBroadcaster.Start(testutils.Context(t.(*testing.T))) @@ -210,7 +210,7 @@ func NewEthBroadcaster(t testing.TB, orm txmgr.ORM, ethClient evmclient.Client, estimator := gas.NewWrappedEvmEstimator(gas.NewFixedPriceEstimator(config, lggr), config) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ChainID(), config, keyStore, estimator) return txmgr.NewEthBroadcaster(orm, ethClient, config, keyStore, eventBroadcaster, - keyStates, nil, txBuilder, lggr, + addresses, nil, txBuilder, lggr, checkerFactory, nonceAutoSync) } @@ -219,12 +219,12 @@ func NewEventBroadcaster(t testing.TB, dbURL url.URL) pg.EventBroadcaster { return pg.NewEventBroadcaster(dbURL, 0, 0, lggr, uuid.NewV4()) } -func NewEthConfirmer(t testing.TB, orm txmgr.ORM, ethClient evmclient.Client, config evmconfig.ChainScopedConfig, ks keystore.Eth, keyStates []ethkey.State, fn txmgr.ResumeCallback) *txmgr.EthConfirmer { +func NewEthConfirmer(t testing.TB, orm txmgr.ORM, ethClient evmclient.Client, config evmconfig.ChainScopedConfig, ks keystore.Eth, addresses []common.Address, fn txmgr.ResumeCallback) *txmgr.EthConfirmer { t.Helper() lggr := logger.TestLogger(t) estimator := gas.NewWrappedEvmEstimator(gas.NewFixedPriceEstimator(config, lggr), config) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ChainID(), config, ks, estimator) - ec := txmgr.NewEthConfirmer(orm, ethClient, config, ks, keyStates, fn, txBuilder, lggr) + ec := txmgr.NewEthConfirmer(orm, ethClient, config, ks, addresses, fn, txBuilder, lggr) return ec } diff --git a/core/services/keystore/eth.go b/core/services/keystore/eth.go index fd9ad65ac3f..7864b8d760f 100644 --- a/core/services/keystore/eth.go +++ b/core/services/keystore/eth.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/pkg/errors" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) @@ -31,8 +32,8 @@ type Eth interface { Disable(address common.Address, chainID *big.Int, qopts ...pg.QOpt) error Reset(address common.Address, chainID *big.Int, nonce int64, qopts ...pg.QOpt) error - GetNextNonce(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (int64, error) - IncrementNextNonce(address common.Address, chainID *big.Int, currentNonce int64, qopts ...pg.QOpt) error + NextSequence(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (int64, error) + IncrementNextSequence(address common.Address, chainID *big.Int, currentNonce int64, qopts ...pg.QOpt) error EnsureKeys(chainIDs ...*big.Int) error SubscribeToKeyChanges() (ch chan struct{}, unsub func()) @@ -46,6 +47,7 @@ type Eth interface { GetState(id string, chainID *big.Int) (ethkey.State, error) GetStatesForKeys([]ethkey.KeyV2) ([]ethkey.State, error) GetStatesForChain(chainID *big.Int) ([]ethkey.State, error) + EnabledAddressesForChain(chainID *big.Int) (addresses []common.Address, err error) XXXTestingOnlySetState(ethkey.State) XXXTestingOnlyAdd(key ethkey.KeyV2) @@ -59,6 +61,8 @@ type eth struct { var _ Eth = ð{} +var _ txmgrtypes.KeyStore[common.Address, *big.Int, int64] = (*eth)(nil) + func newEthKeyStore(km *keyManager) *eth { return ð{ keyManager: km, @@ -178,13 +182,13 @@ func (ks *eth) Export(id string, password string) ([]byte, error) { } // Get the next nonce for the given key and chain. It is safest to always to go the DB for this -func (ks *eth) GetNextNonce(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (nonce int64, err error) { +func (ks *eth) NextSequence(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (nonce int64, err error) { if !ks.exists(address) { return 0, errors.Errorf("key with address %s does not exist", address.Hex()) } nonce, err = ks.orm.getNextNonce(address, chainID, qopts...) if err != nil { - return 0, errors.Wrap(err, "GetNextNonce failed") + return 0, errors.Wrap(err, "NextSequence failed") } ks.lock.Lock() defer ks.lock.Unlock() @@ -201,7 +205,7 @@ func (ks *eth) GetNextNonce(address common.Address, chainID *big.Int, qopts ...p } // IncrementNextNonce increments keys.next_nonce by 1 -func (ks *eth) IncrementNextNonce(address common.Address, chainID *big.Int, currentNonce int64, qopts ...pg.QOpt) error { +func (ks *eth) IncrementNextSequence(address common.Address, chainID *big.Int, currentNonce int64, qopts ...pg.QOpt) error { if !ks.exists(address) { return errors.Errorf("key with address %s does not exist", address.Hex()) } @@ -489,6 +493,23 @@ func (ks *eth) GetStatesForChain(chainID *big.Int) (states []ethkey.State, err e return } +func (ks *eth) EnabledAddressesForChain(chainID *big.Int) (addresses []common.Address, err error) { + if chainID == nil { + return nil, errors.New("chainID must be non-nil") + } + ks.lock.RLock() + defer ks.lock.RUnlock() + if ks.isLocked() { + return nil, ErrLocked + } + for _, s := range ks.keyStates.ChainIDKeyID[chainID.String()] { + if !s.Disabled { + addresses = append(addresses, s.Address.Address()) + } + } + return +} + func (ks *eth) getV1KeysAsV2() (keys []ethkey.KeyV2, nonces []int64, fundings []bool, _ error) { v1Keys, err := ks.orm.GetEncryptedV1EthKeys() if err != nil { diff --git a/core/services/keystore/eth_test.go b/core/services/keystore/eth_test.go index 71bef6b0cb4..06856ace0b4 100644 --- a/core/services/keystore/eth_test.go +++ b/core/services/keystore/eth_test.go @@ -162,6 +162,47 @@ func Test_EthKeyStore(t *testing.T) { assert.Error(t, err) assert.EqualError(t, err, "chainID must be non-nil") }) + + t.Run("EnabledAddressesForChain with specified chain ID", func(t *testing.T) { + defer reset() + key, err := ethKeyStore.Create(testutils.FixtureChainID) + require.NoError(t, err) + key2, err := ethKeyStore.Create(big.NewInt(1337)) + require.NoError(t, err) + testutils.AssertCount(t, db, "evm_key_states", 2) + keys, err := ethKeyStore.GetAll() + require.NoError(t, err) + assert.Len(t, keys, 2) + + //get enabled addresses for FixtureChainID + enabledAddresses, err := ethKeyStore.EnabledAddressesForChain(testutils.FixtureChainID) + require.NoError(t, err) + require.Len(t, enabledAddresses, 1) + require.Equal(t, key.Address, enabledAddresses[0]) + + //get enabled addresses for chain 1337 + enabledAddresses, err = ethKeyStore.EnabledAddressesForChain(big.NewInt(1337)) + require.NoError(t, err) + require.Len(t, enabledAddresses, 1) + require.Equal(t, key2.Address, enabledAddresses[0]) + + // /get enabled addresses for nil chain ID + _, err = ethKeyStore.EnabledAddressesForChain(nil) + assert.Error(t, err) + assert.EqualError(t, err, "chainID must be non-nil") + + // disable the key for chain FixtureChainID + err = ethKeyStore.Disable(key.Address, testutils.FixtureChainID) + require.NoError(t, err) + + enabledAddresses, err = ethKeyStore.EnabledAddressesForChain(testutils.FixtureChainID) + require.NoError(t, err) + assert.Len(t, enabledAddresses, 0) + enabledAddresses, err = ethKeyStore.EnabledAddressesForChain(big.NewInt(1337)) + require.NoError(t, err) + assert.Len(t, enabledAddresses, 1) + require.Equal(t, key2.Address, enabledAddresses[0]) + }) } func Test_EthKeyStore_GetRoundRobinAddress(t *testing.T) { @@ -602,7 +643,7 @@ func Test_EthKeyStore_Reset(t *testing.T) { err := ks.Reset(k1.Address, testutils.FixtureChainID, newNonce) assert.NoError(t, err) - nonce, err := ks.GetNextNonce(k1.Address, testutils.FixtureChainID) + nonce, err := ks.NextSequence(k1.Address, testutils.FixtureChainID) require.NoError(t, err) assert.Equal(t, nonce, newNonce) @@ -628,7 +669,7 @@ func Test_EthKeyStore_Reset(t *testing.T) { }) } -func Test_GetNextNonce(t *testing.T) { +func Test_NextSequence(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) @@ -640,26 +681,26 @@ func Test_GetNextNonce(t *testing.T) { _, addr1 := cltest.MustInsertRandomKey(t, ks, testutils.FixtureChainID, randNonce) cltest.MustInsertRandomKey(t, ks, testutils.FixtureChainID) - nonce, err := ks.GetNextNonce(addr1, testutils.FixtureChainID) + nonce, err := ks.NextSequence(addr1, testutils.FixtureChainID) require.NoError(t, err) assert.Equal(t, randNonce, nonce) - _, err = ks.GetNextNonce(addr1, testutils.SimulatedChainID) + _, err = ks.NextSequence(addr1, testutils.SimulatedChainID) require.Error(t, err) - assert.Contains(t, err.Error(), fmt.Sprintf("GetNextNonce failed: key with address %s is not enabled for chain %s: sql: no rows in result set", addr1.Hex(), testutils.SimulatedChainID.String())) + assert.Contains(t, err.Error(), fmt.Sprintf("NextSequence failed: key with address %s is not enabled for chain %s: sql: no rows in result set", addr1.Hex(), testutils.SimulatedChainID.String())) randAddr1 := utils.RandomAddress() - _, err = ks.GetNextNonce(randAddr1, testutils.FixtureChainID) + _, err = ks.NextSequence(randAddr1, testutils.FixtureChainID) require.Error(t, err) assert.Contains(t, err.Error(), fmt.Sprintf("key with address %s does not exist", randAddr1.Hex())) randAddr2 := utils.RandomAddress() - _, err = ks.GetNextNonce(randAddr2, testutils.NewRandomEVMChainID()) + _, err = ks.NextSequence(randAddr2, testutils.NewRandomEVMChainID()) require.Error(t, err) assert.Contains(t, err.Error(), fmt.Sprintf("key with address %s does not exist", randAddr2.Hex())) } -func Test_IncrementNextNonce(t *testing.T) { +func Test_IncrementNextSequence(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) @@ -671,25 +712,25 @@ func Test_IncrementNextNonce(t *testing.T) { _, addr1 := cltest.MustInsertRandomKey(t, ks, testutils.FixtureChainID, randNonce) cltest.MustInsertRandomKey(t, ks, testutils.FixtureChainID) - err := ks.IncrementNextNonce(addr1, testutils.FixtureChainID, randNonce-1) + err := ks.IncrementNextSequence(addr1, testutils.FixtureChainID, randNonce-1) assert.ErrorIs(t, err, sql.ErrNoRows) - err = ks.IncrementNextNonce(addr1, testutils.FixtureChainID, randNonce) + err = ks.IncrementNextSequence(addr1, testutils.FixtureChainID, randNonce) require.NoError(t, err) var nonce int64 require.NoError(t, db.Get(&nonce, `SELECT next_nonce FROM evm_key_states WHERE address = $1 AND evm_chain_id = $2`, addr1, testutils.FixtureChainID.String())) assert.Equal(t, randNonce+1, nonce) - err = ks.IncrementNextNonce(addr1, testutils.SimulatedChainID, randNonce+1) + err = ks.IncrementNextSequence(addr1, testutils.SimulatedChainID, randNonce+1) assert.ErrorIs(t, err, sql.ErrNoRows) randAddr1 := utils.RandomAddress() - err = ks.IncrementNextNonce(randAddr1, testutils.FixtureChainID, randNonce+1) + err = ks.IncrementNextSequence(randAddr1, testutils.FixtureChainID, randNonce+1) require.Error(t, err) assert.Contains(t, err.Error(), fmt.Sprintf("key with address %s does not exist", randAddr1.Hex())) randAddr2 := utils.RandomAddress() - err = ks.IncrementNextNonce(randAddr2, testutils.NewRandomEVMChainID(), randNonce+1) + err = ks.IncrementNextSequence(randAddr2, testutils.NewRandomEVMChainID(), randNonce+1) require.Error(t, err) assert.Contains(t, err.Error(), fmt.Sprintf("key with address %s does not exist", randAddr2.Hex())) diff --git a/core/services/keystore/mocks/eth.go b/core/services/keystore/mocks/eth.go index 3c485a536c1..0b84d2e6706 100644 --- a/core/services/keystore/mocks/eth.go +++ b/core/services/keystore/mocks/eth.go @@ -130,6 +130,32 @@ func (_m *Eth) Enable(address common.Address, chainID *big.Int, qopts ...pg.QOpt return r0 } +// EnabledAddressesForChain provides a mock function with given fields: chainID +func (_m *Eth) EnabledAddressesForChain(chainID *big.Int) ([]common.Address, error) { + ret := _m.Called(chainID) + + var r0 []common.Address + var r1 error + if rf, ok := ret.Get(0).(func(*big.Int) ([]common.Address, error)); ok { + return rf(chainID) + } + if rf, ok := ret.Get(0).(func(*big.Int) []common.Address); ok { + r0 = rf(chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]common.Address) + } + } + + if rf, ok := ret.Get(1).(func(*big.Int) error); ok { + r1 = rf(chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // EnabledKeysForChain provides a mock function with given fields: chainID func (_m *Eth) EnabledKeysForChain(chainID *big.Int) ([]ethkey.KeyV2, error) { ret := _m.Called(chainID) @@ -252,37 +278,6 @@ func (_m *Eth) GetAll() ([]ethkey.KeyV2, error) { return r0, r1 } -// GetNextNonce provides a mock function with given fields: address, chainID, qopts -func (_m *Eth) GetNextNonce(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (int64, error) { - _va := make([]interface{}, len(qopts)) - for _i := range qopts { - _va[_i] = qopts[_i] - } - var _ca []interface{} - _ca = append(_ca, address, chainID) - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - var r0 int64 - var r1 error - if rf, ok := ret.Get(0).(func(common.Address, *big.Int, ...pg.QOpt) (int64, error)); ok { - return rf(address, chainID, qopts...) - } - if rf, ok := ret.Get(0).(func(common.Address, *big.Int, ...pg.QOpt) int64); ok { - r0 = rf(address, chainID, qopts...) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(common.Address, *big.Int, ...pg.QOpt) error); ok { - r1 = rf(address, chainID, qopts...) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // GetRoundRobinAddress provides a mock function with given fields: chainID, addresses func (_m *Eth) GetRoundRobinAddress(chainID *big.Int, addresses ...common.Address) (common.Address, error) { _va := make([]interface{}, len(addresses)) @@ -423,8 +418,8 @@ func (_m *Eth) Import(keyJSON []byte, password string, chainIDs ...*big.Int) (et return r0, r1 } -// IncrementNextNonce provides a mock function with given fields: address, chainID, currentNonce, qopts -func (_m *Eth) IncrementNextNonce(address common.Address, chainID *big.Int, currentNonce int64, qopts ...pg.QOpt) error { +// IncrementNextSequence provides a mock function with given fields: address, chainID, currentNonce, qopts +func (_m *Eth) IncrementNextSequence(address common.Address, chainID *big.Int, currentNonce int64, qopts ...pg.QOpt) error { _va := make([]interface{}, len(qopts)) for _i := range qopts { _va[_i] = qopts[_i] @@ -444,6 +439,37 @@ func (_m *Eth) IncrementNextNonce(address common.Address, chainID *big.Int, curr return r0 } +// NextSequence provides a mock function with given fields: address, chainID, qopts +func (_m *Eth) NextSequence(address common.Address, chainID *big.Int, qopts ...pg.QOpt) (int64, error) { + _va := make([]interface{}, len(qopts)) + for _i := range qopts { + _va[_i] = qopts[_i] + } + var _ca []interface{} + _ca = append(_ca, address, chainID) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(common.Address, *big.Int, ...pg.QOpt) (int64, error)); ok { + return rf(address, chainID, qopts...) + } + if rf, ok := ret.Get(0).(func(common.Address, *big.Int, ...pg.QOpt) int64); ok { + r0 = rf(address, chainID, qopts...) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(common.Address, *big.Int, ...pg.QOpt) error); ok { + r1 = rf(address, chainID, qopts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // Reset provides a mock function with given fields: address, chainID, nonce, qopts func (_m *Eth) Reset(address common.Address, chainID *big.Int, nonce int64, qopts ...pg.QOpt) error { _va := make([]interface{}, len(qopts))