From 07c9f6cadd449989b21977af461305ded8e5b2f0 Mon Sep 17 00:00:00 2001 From: amit-momin <108959691+amit-momin@users.noreply.github.com> Date: Mon, 18 Mar 2024 20:07:42 -0500 Subject: [PATCH] Extract sequence tracking from the Broadcaster (#12353) * Extracted sequence tracking from the Broadcaster into a separate component * Fixed test and linting * Updated NonceTracker to use TXM client * Fixed tests * Moved NonceTracker initialization into the EVM Broadcaster builder * Fixed issue with in-progress tx during startup * Fixed linting * Added changeset * Updated changeset message --------- Co-authored-by: Prashant Yadav <34992934+prashantkumar1982@users.noreply.github.com> --- .changeset/silver-months-glow.md | 5 + common/txmgr/broadcaster.go | 172 +-------- common/txmgr/sequence_syncer.go | 11 - common/txmgr/txmgr.go | 3 - common/txmgr/types/sequence_tracker.go | 26 ++ core/chains/evm/txmgr/broadcaster_test.go | 404 +++++--------------- core/chains/evm/txmgr/builder.go | 12 +- core/chains/evm/txmgr/evm_tx_store.go | 20 - core/chains/evm/txmgr/evm_tx_store_test.go | 2 +- core/chains/evm/txmgr/models.go | 2 +- core/chains/evm/txmgr/nonce_syncer.go | 106 ----- core/chains/evm/txmgr/nonce_syncer_test.go | 114 ------ core/chains/evm/txmgr/nonce_tracker.go | 186 +++++++++ core/chains/evm/txmgr/nonce_tracker_test.go | 293 ++++++++++++++ core/chains/evm/types/nonce.go | 4 - core/services/vrf/v2/integration_v2_test.go | 2 +- core/services/vrf/v2/listener_v2_test.go | 2 +- 17 files changed, 625 insertions(+), 739 deletions(-) create mode 100644 .changeset/silver-months-glow.md delete mode 100644 common/txmgr/sequence_syncer.go create mode 100644 common/txmgr/types/sequence_tracker.go delete mode 100644 core/chains/evm/txmgr/nonce_syncer.go delete mode 100644 core/chains/evm/txmgr/nonce_syncer_test.go create mode 100644 core/chains/evm/txmgr/nonce_tracker.go create mode 100644 core/chains/evm/txmgr/nonce_tracker_test.go diff --git a/.changeset/silver-months-glow.md b/.changeset/silver-months-glow.md new file mode 100644 index 00000000000..195525353fc --- /dev/null +++ b/.changeset/silver-months-glow.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Fixed a race condition bug around EVM nonce management, which could cause the Node to skip a nonce and get stuck. diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index f56e6b0368c..5c9d9ae55bb 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -5,7 +5,6 @@ import ( "database/sql" "errors" "fmt" - "slices" "sync" "time" @@ -112,13 +111,13 @@ type Broadcaster[ txStore txmgrtypes.TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE] client txmgrtypes.TransactionClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ] - resumeCallback ResumeCallback - chainID CHAIN_ID - config txmgrtypes.BroadcasterChainConfig - feeConfig txmgrtypes.BroadcasterFeeConfig - txConfig txmgrtypes.BroadcasterTransactionsConfig - listenerConfig txmgrtypes.BroadcasterListenerConfig + sequenceTracker txmgrtypes.SequenceTracker[ADDR, SEQ] + resumeCallback ResumeCallback + chainID CHAIN_ID + config txmgrtypes.BroadcasterChainConfig + feeConfig txmgrtypes.BroadcasterFeeConfig + txConfig txmgrtypes.BroadcasterTransactionsConfig + listenerConfig txmgrtypes.BroadcasterListenerConfig // autoSyncSequence, if set, will cause Broadcaster to fast-forward the sequence // when Start is called @@ -141,10 +140,6 @@ type Broadcaster[ initSync sync.Mutex isStarted bool - - sequenceLock sync.RWMutex - nextSequenceMap map[ADDR]SEQ - generateNextSequence types.GenerateNextSequenceFunc[SEQ] } func NewBroadcaster[ @@ -164,11 +159,10 @@ func NewBroadcaster[ listenerConfig txmgrtypes.BroadcasterListenerConfig, keystore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ], txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], - sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ], + sequenceTracker txmgrtypes.SequenceTracker[ADDR, SEQ], lggr logger.Logger, checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], autoSyncSequence bool, - generateNextSequence types.GenerateNextSequenceFunc[SEQ], ) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { lggr = logger.Named(lggr, "Broadcaster") b := &Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{ @@ -176,7 +170,6 @@ func NewBroadcaster[ txStore: txStore, client: client, TxAttemptBuilder: txAttemptBuilder, - sequenceSyncer: sequenceSyncer, chainID: client.ConfiguredChainID(), config: config, feeConfig: feeConfig, @@ -185,10 +178,10 @@ func NewBroadcaster[ ks: keystore, checkerFactory: checkerFactory, autoSyncSequence: autoSyncSequence, + sequenceTracker: sequenceTracker, } b.processUnstartedTxsImpl = b.processUnstartedTxs - b.generateNextSequence = generateNextSequence return b } @@ -222,9 +215,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star eb.wg = sync.WaitGroup{} eb.wg.Add(len(eb.enabledAddresses)) eb.triggers = make(map[ADDR]chan struct{}) - eb.sequenceLock.Lock() - eb.nextSequenceMap = eb.loadNextSequenceMap(ctx, eb.enabledAddresses) - eb.sequenceLock.Unlock() + eb.sequenceTracker.LoadNextSequences(ctx, eb.enabledAddresses) for _, addr := range eb.enabledAddresses { triggerCh := make(chan struct{}, 1) eb.triggers[addr] = triggerCh @@ -284,46 +275,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trig } } -// Load the next sequence map using the tx table or on-chain (if not found in tx table) -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(ctx context.Context, addresses []ADDR) map[ADDR]SEQ { - nextSequenceMap := make(map[ADDR]SEQ) - for _, address := range addresses { - seq, err := eb.getSequenceForAddr(ctx, address) - if err == nil { - nextSequenceMap[address] = seq - } - } - - return nextSequenceMap -} - -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) getSequenceForAddr(ctx context.Context, address ADDR) (seq SEQ, err error) { - // Get the highest sequence from the tx table - // Will need to be incremented since this sequence is already used - seq, err = eb.txStore.FindLatestSequence(ctx, address, eb.chainID) - if err == nil { - seq = eb.generateNextSequence(seq) - return seq, nil - } - // Look for nonce on-chain if no tx found for address in TxStore or if error occurred - // Returns the nonce that should be used for the next transaction so no need to increment - seq, err = eb.client.PendingSequenceAt(ctx, address) - if err == nil { - return seq, nil - } - eb.lggr.Criticalw("failed to retrieve next sequence from on-chain for address: ", "address", address.String()) - return seq, err - -} - -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) newSequenceSyncBackoff() backoff.Backoff { - return backoff.Backoff{ - Min: 100 * time.Millisecond, - Max: 5 * time.Second, - Jitter: true, - } -} - func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) newResendBackoff() backoff.Backoff { return backoff.Backoff{ Min: 1 * time.Second, @@ -340,7 +291,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) moni if eb.autoSyncSequence { eb.lggr.Debugw("Auto-syncing sequence", "address", addr.String()) - eb.SyncSequence(ctx, addr) + eb.sequenceTracker.SyncSequence(ctx, addr, eb.chStop) if ctx.Err() != nil { return } @@ -393,46 +344,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) moni } } -// syncSequence tries to sync the key sequence, retrying indefinitely until success or stop signal is sent -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SyncSequence(ctx context.Context, addr ADDR) { - sequenceSyncRetryBackoff := eb.newSequenceSyncBackoff() - localSequence, err := eb.GetNextSequence(ctx, addr) - // Address not found in map so skip sync - if err != nil { - eb.lggr.Criticalw("Failed to retrieve local next sequence for address", "address", addr.String(), "err", err) - return - } - - // Enter loop with retries - var attempt int - for { - select { - case <-eb.chStop: - return - case <-time.After(sequenceSyncRetryBackoff.Duration()): - attempt++ - newNextSequence, err := eb.sequenceSyncer.Sync(ctx, addr, localSequence) - if err != nil { - if attempt > 5 { - eb.lggr.Criticalw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err) - eb.SvcErrBuffer.Append(err) - } else { - eb.lggr.Warnw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err) - } - continue - } - // Found new sequence to use from on-chain - if localSequence.String() != newNextSequence.String() { - eb.lggr.Infow("Fast-forward sequence", "address", addr, "newNextSequence", newNextSequence, "oldNextSequence", localSequence) - // Set new sequence in the map - eb.SetNextSequence(addr, newNextSequence) - } - return - - } - } -} - // ProcessUnstartedTxs picks up and handles all txes in the queue // revive:disable:error-return func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) ProcessUnstartedTxs(ctx context.Context, addr ADDR) (retryable bool, err error) { @@ -619,18 +530,12 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand // and hand off to the confirmer to get the receipt (or mark as // failed). observeTimeUntilBroadcast(eb.chainID, etx.CreatedAt, time.Now()) - // Check if from_address exists in map to ensure it is valid before broadcasting - var sequence SEQ - sequence, err = eb.GetNextSequence(ctx, etx.FromAddress) - if err != nil { - return err, true - } err = eb.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, &etx, attempt, txmgrtypes.TxAttemptBroadcast) if err != nil { return err, true } // Increment sequence if successfully broadcasted - eb.IncrementNextSequence(etx.FromAddress, sequence) + eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence) return err, true case client.Underpriced: return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt) @@ -677,18 +582,12 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand // transaction to have been accepted. In this case, the right thing to // do is assume success and hand off to Confirmer - // Check if from_address exists in map to ensure it is valid before broadcasting - var sequence SEQ - sequence, err = eb.GetNextSequence(ctx, etx.FromAddress) - if err != nil { - return err, true - } err = eb.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, &etx, attempt, txmgrtypes.TxAttemptBroadcast) if err != nil { return err, true } // Increment sequence if successfully broadcasted - eb.IncrementNextSequence(etx.FromAddress, sequence) + eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence) return err, true } // Either the unknown error prevented the transaction from being mined, or @@ -716,7 +615,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next return nil, fmt.Errorf("findNextUnstartedTransactionFromAddress failed: %w", err) } - sequence, err := eb.GetNextSequence(ctx, etx.FromAddress) + sequence, err := eb.sequenceTracker.GetNextSequence(ctx, etx.FromAddress) if err != nil { return nil, err } @@ -805,49 +704,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save return eb.txStore.UpdateTxFatalError(ctx, etx) } -// Used to get the next usable sequence for a transaction -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(ctx context.Context, address ADDR) (seq SEQ, err error) { - eb.sequenceLock.Lock() - defer eb.sequenceLock.Unlock() - // Get next sequence from map - seq, exists := eb.nextSequenceMap[address] - if exists { - return seq, nil - } - - eb.lggr.Infow("address not found in local next sequence map. Attempting to search and populate sequence.", "address", address.String()) - // Check if address is in the enabled address list - if !slices.Contains(eb.enabledAddresses, address) { - return seq, fmt.Errorf("address disabled: %s", address) - } - - // Try to retrieve next sequence from tx table or on-chain to load the map - // A scenario could exist where loading the map during startup failed (e.g. All configured RPC's are unreachable at start) - // The expectation is that the node does not fail startup so sequences need to be loaded during runtime - foundSeq, err := eb.getSequenceForAddr(ctx, address) - if err != nil { - return seq, fmt.Errorf("failed to find next sequence for address: %s", address) - } - - // Set sequence in map - eb.nextSequenceMap[address] = foundSeq - return foundSeq, nil -} - -// Used to increment the sequence in the mapping to have the next usable one available for the next transaction -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) IncrementNextSequence(address ADDR, seq SEQ) { - eb.sequenceLock.Lock() - defer eb.sequenceLock.Unlock() - eb.nextSequenceMap[address] = eb.generateNextSequence(seq) -} - -// Used to set the next sequence explicitly to a certain value -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SetNextSequence(address ADDR, seq SEQ) { - eb.sequenceLock.Lock() - defer eb.sequenceLock.Unlock() - eb.nextSequenceMap[address] = seq -} - func observeTimeUntilBroadcast[CHAIN_ID types.ID](chainID CHAIN_ID, createdAt, broadcastAt time.Time) { duration := float64(broadcastAt.Sub(createdAt)) promTimeUntilBroadcast.WithLabelValues(chainID.String()).Observe(duration) diff --git a/common/txmgr/sequence_syncer.go b/common/txmgr/sequence_syncer.go deleted file mode 100644 index dd4d458dd74..00000000000 --- a/common/txmgr/sequence_syncer.go +++ /dev/null @@ -1,11 +0,0 @@ -package txmgr - -import ( - "context" - - "github.com/smartcontractkit/chainlink/v2/common/types" -) - -type SequenceSyncer[ADDR types.Hashable, TX_HASH types.Hashable, BLOCK_HASH types.Hashable, SEQ types.Sequence] interface { - Sync(ctx context.Context, addr ADDR, localSequence SEQ) (SEQ, error) -} diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 2bd9ed4d2d2..74d218915d9 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -107,7 +107,6 @@ type Txm[ tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] fwdMgr txmgrtypes.ForwarderManager[ADDR] txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ] } func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) { @@ -136,7 +135,6 @@ func NewTxm[ fwdMgr txmgrtypes.ForwarderManager[ADDR], txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE], - sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ], broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], @@ -157,7 +155,6 @@ func NewTxm[ reset: make(chan reset), fwdMgr: fwdMgr, txAttemptBuilder: txAttemptBuilder, - sequenceSyncer: sequenceSyncer, broadcaster: broadcaster, confirmer: confirmer, resender: resender, diff --git a/common/txmgr/types/sequence_tracker.go b/common/txmgr/types/sequence_tracker.go new file mode 100644 index 00000000000..7e824aa38cd --- /dev/null +++ b/common/txmgr/types/sequence_tracker.go @@ -0,0 +1,26 @@ +package types + +import ( + "context" + + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/common/types" +) + +type SequenceTracker[ + // Represents an account address, in native chain format. + ADDR types.Hashable, + // Represents the sequence type for a chain. For example, nonce for EVM. + SEQ types.Sequence, +] interface { + // Load the next sequence needed for transactions for all enabled addresses + LoadNextSequences(context.Context, []ADDR) + // Get the next sequence to assign to a transaction + GetNextSequence(context.Context, ADDR) (SEQ, error) + // Signals the existing sequence has been used so generates and stores the next sequence + // Can be a no-op depending on the chain + GenerateNextSequence(ADDR, SEQ) + // Syncs the local sequence with the one on-chain in case the address as been used externally + // Can be a no-op depending on the chain + SyncSequence(context.Context, ADDR, services.StopChan) +} diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 8c51c557fb5..cb40fecc55f 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -39,8 +39,6 @@ import ( ksmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/keystore/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" - ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" @@ -59,6 +57,7 @@ func NewTestEthBroadcaster( config evmconfig.ChainScopedConfig, checkerFactory txmgr.TransmitCheckerFactory, nonceAutoSync bool, + nonceTracker txmgr.NonceTracker, ) *txmgr.Broadcaster { t.Helper() @@ -68,8 +67,7 @@ func NewTestEthBroadcaster( return gas.NewFixedPriceEstimator(config.EVM().GasEstimator(), ge.BlockHistory(), lggr) }, ge.EIP1559DynamicFees(), nil) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ConfiguredChainID(), ge, keyStore, estimator) - txNonceSyncer := txmgr.NewNonceSyncer(txStore, lggr, ethClient) - ethBroadcaster := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), txmgr.NewEvmTxmConfig(config.EVM()), txmgr.NewEvmTxmFeeConfig(config.EVM().GasEstimator()), config.EVM().Transactions(), config.Database().Listener(), keyStore, txBuilder, txNonceSyncer, lggr, checkerFactory, nonceAutoSync) + ethBroadcaster := txmgrcommon.NewBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), txmgr.NewEvmTxmConfig(config.EVM()), txmgr.NewEvmTxmFeeConfig(config.EVM().GasEstimator()), config.EVM().Transactions(), config.Database().Listener(), keyStore, txBuilder, nonceTracker, lggr, checkerFactory, nonceAutoSync) // Mark instance as test ethBroadcaster.XXXTestDisableUnstartedTxAutoProcessing() @@ -86,17 +84,17 @@ func TestEthBroadcaster_Lifecycle(t *testing.T) { cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) estimator := gasmocks.NewEvmFeeEstimator(t) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ConfiguredChainID(), evmcfg.EVM().GasEstimator(), ethKeyStore, estimator) + txmClient := txmgr.NewEvmTxmClient(ethClient) ethClient.On("PendingNonceAt", mock.Anything, mock.Anything).Return(uint64(0), nil) eb := txmgr.NewEvmBroadcaster( txStore, - txmgr.NewEvmTxmClient(ethClient), + txmClient, txmgr.NewEvmTxmConfig(evmcfg.EVM()), txmgr.NewEvmTxmFeeConfig(evmcfg.EVM().GasEstimator()), evmcfg.EVM().Transactions(), evmcfg.Database().Listener(), ethKeyStore, txBuilder, - nil, logger.Test(t), &testCheckerFactory{}, false, @@ -145,16 +143,16 @@ func TestEthBroadcaster_LoadNextSequenceMapFailure_StartupSuccess(t *testing.T) estimator := gasmocks.NewEvmFeeEstimator(t) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ConfiguredChainID(), evmcfg.EVM().GasEstimator(), ethKeyStore, estimator) ethClient.On("PendingNonceAt", mock.Anything, mock.Anything).Return(uint64(0), errors.New("Getting on-chain nonce failed")) + txmClient := txmgr.NewEvmTxmClient(ethClient) eb := txmgr.NewEvmBroadcaster( txStore, - txmgr.NewEvmTxmClient(ethClient), + txmClient, txmgr.NewEvmTxmConfig(evmcfg.EVM()), txmgr.NewEvmTxmFeeConfig(evmcfg.EVM().GasEstimator()), evmcfg.EVM().Transactions(), evmcfg.Database().Listener(), ethKeyStore, txBuilder, - nil, logger.Test(t), &testCheckerFactory{}, false, @@ -180,7 +178,9 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() ethClient.On("PendingNonceAt", mock.Anything, otherAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, checkerFactory, false) + lggr := logger.Test(t) + nonceTracker := txmgr.NewNonceTracker(lggr, txStore, txmgr.NewEvmTxmClient(ethClient)) + eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, checkerFactory, false, nonceTracker) toAddress := gethCommon.HexToAddress("0x6C03DDA95a2AEd917EeCc6eddD4b9D16E6380411") timeNow := time.Now() @@ -380,7 +380,8 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { }) evmcfg = evmtest.NewChainScopedConfig(t, cfg) ethClient.On("PendingNonceAt", mock.Anything, otherAddress).Return(uint64(1), nil).Once() - eb = NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, checkerFactory, false) + nonceTracker = txmgr.NewNonceTracker(lggr, txStore, txmgr.NewEvmTxmClient(ethClient)) + eb = NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, checkerFactory, false, nonceTracker) t.Run("sends transactions with type 0x2 in EIP-1559 mode", func(t *testing.T) { ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { @@ -528,7 +529,8 @@ func TestEthBroadcaster_TransmitChecking(t *testing.T) { evmcfg := evmtest.NewChainScopedConfig(t, cfg) checkerFactory := &testCheckerFactory{} ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, checkerFactory, false) + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(ethClient)) + eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, checkerFactory, false, nonceTracker) checker := txmgr.TransmitCheckerSpec{ CheckerType: txmgr.TransmitCheckerTypeSimulate, @@ -619,16 +621,16 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_OptimisticLockingOnEthTx(t *testi <-chBlock }).Once() ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil) + txmClient := txmgr.NewEvmTxmClient(ethClient) eb := txmgr.NewEvmBroadcaster( txStore, - txmgr.NewEvmTxmClient(ethClient), + txmClient, evmcfg, txmgr.NewEvmTxmFeeConfig(ccfg.EVM().GasEstimator()), ccfg.EVM().Transactions(), cfg.Database().Listener(), ethKeyStore, txBuilder, - nil, logger.Test(t), &testCheckerFactory{}, false, @@ -676,7 +678,8 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success_WithMultiplier(t *testing ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false) + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(ethClient)) + eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false, nonceTracker) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { assert.Equal(t, int(1600), int(tx.Gas())) @@ -756,7 +759,8 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false) + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(ethClient)) + eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false, nonceTracker) // Crashed right after we commit the database transaction that saved // the nonce to the eth_tx so evm.key_states.next_nonce has not been @@ -794,7 +798,8 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false) + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(ethClient)) + eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false, nonceTracker) // Crashed right after we commit the database transaction that saved the nonce to the eth_tx inProgressEthTx := mustInsertInProgressEthTxWithAttempt(t, txStore, firstNonce, fromAddress) @@ -830,7 +835,8 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false) + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(ethClient)) + eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false, nonceTracker) // Crashed right after we commit the database transaction that saved the nonce to the eth_tx inProgressEthTx := mustInsertInProgressEthTxWithAttempt(t, txStore, firstNonce, fromAddress) @@ -865,7 +871,8 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false) + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(ethClient)) + eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false, nonceTracker) // Crashed right after we commit the database transaction that saved the nonce to the eth_tx inProgressEthTx := mustInsertInProgressEthTxWithAttempt(t, txStore, firstNonce, fromAddress) @@ -902,7 +909,8 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false) + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(ethClient)) + eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false, nonceTracker) // Crashed right after we commit the database transaction that saved the nonce to the eth_tx inProgressEthTx := mustInsertInProgressEthTxWithAttempt(t, txStore, firstNonce, fromAddress) @@ -943,7 +951,8 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false) + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(ethClient)) + eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false, nonceTracker) // Crashed right after we commit the database transaction that saved the nonce to the eth_tx inProgressEthTx := mustInsertInProgressEthTxWithAttempt(t, txStore, firstNonce, fromAddress) @@ -980,8 +989,8 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { }) } -func getLocalNextNonce(t *testing.T, eb *txmgr.Broadcaster, fromAddress gethCommon.Address) uint64 { - n, err := eb.GetNextSequence(testutils.Context(t), fromAddress) +func getLocalNextNonce(t *testing.T, nonceTracker txmgr.NonceTracker, fromAddress gethCommon.Address) uint64 { + n, err := nonceTracker.GetNextSequence(testutils.Context(t), fromAddress) require.NoError(t, err) require.NotNil(t, n) return uint64(n) @@ -1006,7 +1015,10 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { evmcfg := evmtest.NewChainScopedConfig(t, cfg) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false) + lggr := logger.Test(t) + txmClient := txmgr.NewEvmTxmClient(ethClient) + nonceTracker := txmgr.NewNonceTracker(lggr, txStore, txmClient) + eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false, nonceTracker) ctx := testutils.Context(t) require.NoError(t, commonutils.JustError(db.Exec(`SET CONSTRAINTS pipeline_runs_pipeline_spec_id_fkey DEFERRED`))) @@ -1039,7 +1051,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { assert.Len(t, etx1.TxAttempts, 1) // Check that the local nonce was incremented by one - finalNextNonce := getLocalNextNonce(t, eb, fromAddress) + finalNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) require.NoError(t, err) require.NotNil(t, finalNextNonce) require.Equal(t, int64(1), int64(finalNextNonce)) @@ -1047,7 +1059,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { t.Run("geth Client returns an error in the fatal errors category", func(t *testing.T) { fatalErrorExample := "exceeds block gas limit" - localNextNonce := getLocalNextNonce(t, eb, fromAddress) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) t.Run("without callback", func(t *testing.T) { etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, &cltest.FixtureChainID) @@ -1072,7 +1084,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // Check that the key had its nonce reset var nonce evmtypes.Nonce - nonce, err = eb.GetNextSequence(ctx, fromAddress) + nonce, err = nonceTracker.GetNextSequence(ctx, fromAddress) require.NoError(t, err) // Saved NextNonce must be the same as before because this transaction // was not accepted by the eth node and never can be @@ -1135,14 +1147,12 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // same as the parent test, but callback is set by ctor t.Run("callback set by ctor", func(t *testing.T) { - lggr := logger.Test(t) estimator := gas.NewWrappedEvmEstimator(lggr, func(lggr logger.Logger) gas.EvmEstimator { return gas.NewFixedPriceEstimator(evmcfg.EVM().GasEstimator(), evmcfg.EVM().GasEstimator().BlockHistory(), lggr) }, evmcfg.EVM().GasEstimator().EIP1559DynamicFees(), nil) txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ConfiguredChainID(), evmcfg.EVM().GasEstimator(), ethKeyStore, estimator) - localNextNonce = getLocalNextNonce(t, eb, fromAddress) - ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(localNextNonce, nil).Once() - eb2 := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), txmgr.NewEvmTxmConfig(evmcfg.EVM()), txmgr.NewEvmTxmFeeConfig(evmcfg.EVM().GasEstimator()), evmcfg.EVM().Transactions(), evmcfg.Database().Listener(), ethKeyStore, txBuilder, nil, lggr, &testCheckerFactory{}, false) + localNextNonce = getLocalNextNonce(t, nonceTracker, fromAddress) + eb2 := txmgr.NewEvmBroadcaster(txStore, txmClient, txmgr.NewEvmTxmConfig(evmcfg.EVM()), txmgr.NewEvmTxmFeeConfig(evmcfg.EVM().GasEstimator()), evmcfg.EVM().Transactions(), evmcfg.Database().Listener(), ethKeyStore, txBuilder, lggr, &testCheckerFactory{}, false) retryable, err := eb2.ProcessUnstartedTxs(ctx, fromAddress) assert.NoError(t, err) assert.False(t, retryable) @@ -1155,7 +1165,8 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { t.Run("geth Client fails with error indicating that the transaction was too expensive", func(t *testing.T) { TxFeeExceedsCapError := "tx fee (1.10 ether) exceeds the configured cap (1.00 ether)" - localNextNonce := getLocalNextNonce(t, eb, fromAddress) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) + ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(localNextNonce, nil).Once() etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, &cltest.FixtureChainID) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { return tx.Nonce() == localNextNonce @@ -1185,7 +1196,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // Check that the key had its nonce reset var nonce evmtypes.Nonce - nonce, err = eb.GetNextSequence(ctx, fromAddress) + nonce, err = nonceTracker.GetNextSequence(ctx, fromAddress) require.NoError(t, err) // Saved NextNonce must be the same as before because this transaction // was not accepted by the eth node and never can be @@ -1213,7 +1224,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { t.Run("eth Client call fails with an unexpected random error, and transaction was not accepted into mempool", func(t *testing.T) { retryableErrorExample := "some unknown error" - localNextNonce := getLocalNextNonce(t, eb, fromAddress) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, &cltest.FixtureChainID) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { return tx.Nonce() == localNextNonce @@ -1265,7 +1276,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { t.Run("eth client call fails with an unexpected random error, and the nonce check also subsequently fails", func(t *testing.T) { retryableErrorExample := "some unknown error" - localNextNonce := getLocalNextNonce(t, eb, fromAddress) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, &cltest.FixtureChainID) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { return tx.Nonce() == localNextNonce @@ -1317,7 +1328,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { t.Run("eth Client call fails with an unexpected random error, and transaction was accepted into mempool", func(t *testing.T) { retryableErrorExample := "some strange RPC returns an unexpected thing" - localNextNonce := getLocalNextNonce(t, eb, fromAddress) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, &cltest.FixtureChainID) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { return tx.Nonce() == localNextNonce @@ -1349,7 +1360,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // configured for the transaction pool. // This is a configuration error by the node operator, since it means they set the base gas level too low. underpricedError := "transaction underpriced" - localNextNonce := getLocalNextNonce(t, eb, fromAddress) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, &cltest.FixtureChainID) // First was underpriced @@ -1397,7 +1408,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { t.Run("failed to reach node for some reason", func(t *testing.T) { failedToReachNodeError := context.DeadlineExceeded - localNextNonce := getLocalNextNonce(t, eb, fromAddress) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { return tx.Nonce() == localNextNonce @@ -1426,7 +1437,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // This happens if parity is rejecting transactions that are not priced high enough to even get into the mempool at all // It should pretend it was accepted into the mempool and hand off to ethConfirmer to bump gas as normal temporarilyUnderpricedError := "There are too many transactions in the queue. Your transaction was dropped due to limit. Try increasing the fee." - localNextNonce := getLocalNextNonce(t, eb, fromAddress) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) // Re-use the previously unfinished transaction, no need to insert new @@ -1457,7 +1468,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // configured for the transaction pool. // This is a configuration error by the node operator, since it means they set the base gas level too low. underpricedError := "transaction underpriced" - localNextNonce := getLocalNextNonce(t, eb, fromAddress) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) // In this scenario the node operator REALLY fucked up and set the bump // to zero (even though that should not be possible due to config // validation) @@ -1465,8 +1476,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { c.EVM[0].GasEstimator.BumpMin = assets.NewWeiI(0) c.EVM[0].GasEstimator.BumpPercent = ptr[uint16](0) })) - ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(localNextNonce, nil).Once() - eb2 := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg2, &testCheckerFactory{}, false) + eb2 := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg2, &testCheckerFactory{}, false, nonceTracker) mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, &cltest.FixtureChainID) // First was underpriced @@ -1486,7 +1496,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { t.Run("eth tx is left in progress if eth node returns insufficient eth", func(t *testing.T) { insufficientEthError := "insufficient funds for transfer" - localNextNonce := getLocalNextNonce(t, eb, fromAddress) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, &cltest.FixtureChainID) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { return tx.Nonce() == localNextNonce @@ -1515,7 +1525,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { pgtest.MustExec(t, db, `DELETE FROM evm.txes`) t.Run("eth tx is left in progress if nonce is too high", func(t *testing.T) { - localNextNonce := getLocalNextNonce(t, eb, fromAddress) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) nonceGapError := "NonceGap, Future nonce. Expected nonce: " + strconv.FormatUint(localNextNonce, 10) etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, &cltest.FixtureChainID) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { @@ -1556,12 +1566,12 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { c.EVM[0].GasEstimator.BumpMin = assets.NewWeiI(0) c.EVM[0].GasEstimator.BumpPercent = ptr[uint16](0) })) - localNextNonce := getLocalNextNonce(t, eb, fromAddress) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(localNextNonce, nil).Once() - eb2 := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg2, &testCheckerFactory{}, false) + eb2 := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg2, &testCheckerFactory{}, false, nonceTracker) mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, &cltest.FixtureChainID) underpricedError := "transaction underpriced" - localNextNonce = getLocalNextNonce(t, eb, fromAddress) + localNextNonce = getLocalNextNonce(t, nonceTracker, fromAddress) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { return tx.Nonce() == localNextNonce && tx.GasTipCap().Cmp(big.NewInt(1)) == 0 }), fromAddress).Return(commonclient.Underpriced, errors.New(underpricedError)).Once() @@ -1580,7 +1590,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // configured for the transaction pool. // This is a configuration error by the node operator, since it means they set the base gas level too low. underpricedError := "transaction underpriced" - localNextNonce := getLocalNextNonce(t, eb, fromAddress) + localNextNonce := getLocalNextNonce(t, nonceTracker, fromAddress) mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, encodedPayload, gasLimit, value, &cltest.FixtureChainID) // Check gas tip cap verification @@ -1589,7 +1599,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { c.EVM[0].GasEstimator.TipCapDefault = assets.NewWeiI(0) })) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(localNextNonce, nil).Once() - eb2 := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg2, &testCheckerFactory{}, false) + eb2 := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg2, &testCheckerFactory{}, false, nonceTracker) retryable, err := eb2.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) @@ -1602,8 +1612,9 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { c.EVM[0].GasEstimator.EIP1559DynamicFees = ptr(true) c.EVM[0].GasEstimator.TipCapDefault = gasTipCapDefault })) - localNextNonce = getLocalNextNonce(t, eb, fromAddress) - eb2 = NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg2, &testCheckerFactory{}, false) + localNextNonce = getLocalNextNonce(t, nonceTracker, fromAddress) + ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(localNextNonce, nil).Once() + eb2 = NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg2, &testCheckerFactory{}, false, nonceTracker) // Second was underpriced but above minimum ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { @@ -1649,9 +1660,11 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_KeystoreErrors(t *testing.T) { addresses := []gethCommon.Address{fromAddress} kst.On("EnabledAddressesForChain", mock.Anything, &cltest.FixtureChainID).Return(addresses, nil).Once() ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, kst, evmcfg, &testCheckerFactory{}, false) + lggr := logger.Test(t) + nonceTracker := txmgr.NewNonceTracker(lggr, txStore, txmgr.NewEvmTxmClient(ethClient)) + eb := NewTestEthBroadcaster(t, txStore, ethClient, kst, evmcfg, &testCheckerFactory{}, false, nonceTracker) ctx := testutils.Context(t) - _, err := eb.GetNextSequence(ctx, fromAddress) + _, err := nonceTracker.GetNextSequence(ctx, fromAddress) require.NoError(t, err) t.Run("tx signing fails", func(t *testing.T) { @@ -1679,55 +1692,12 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_KeystoreErrors(t *testing.T) { // Check that the key did not have its nonce incremented var nonce evmtypes.Nonce - nonce, err = eb.GetNextSequence(ctx, fromAddress) + nonce, err = nonceTracker.GetNextSequence(ctx, fromAddress) require.NoError(t, err) require.Equal(t, int64(localNonce), int64(nonce)) }) } -func TestEthBroadcaster_GetNextNonce(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, nil) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - fromAddress := testutils.NewAddress() - evmcfg := evmtest.NewChainScopedConfig(t, cfg) - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - - kst := ksmocks.NewEth(t) - addresses := []gethCommon.Address{fromAddress} - kst.On("EnabledAddressesForChain", mock.Anything, &cltest.FixtureChainID).Return(addresses, nil).Once() - ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, kst, evmcfg, &testCheckerFactory{}, false) - nonce := getLocalNextNonce(t, eb, fromAddress) - require.NotNil(t, nonce) - assert.Equal(t, int64(0), int64(nonce)) -} - -func TestEthBroadcaster_IncrementNextNonce(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, nil) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - kst := ksmocks.NewEth(t) - fromAddress := testutils.NewAddress() - evmcfg := evmtest.NewChainScopedConfig(t, cfg) - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - - addresses := []gethCommon.Address{fromAddress} - kst.On("EnabledAddressesForChain", mock.Anything, &cltest.FixtureChainID).Return(addresses, nil).Once() - ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, kst, evmcfg, &testCheckerFactory{}, false) - - ctx := testutils.Context(t) - nonce, err := eb.GetNextSequence(ctx, fromAddress) - require.NoError(t, err) - eb.IncrementNextSequence(fromAddress, nonce) - - // Nonce bumped to 1 - nonce, err = eb.GetNextSequence(ctx, fromAddress) - require.NoError(t, err) - require.Equal(t, int64(1), int64(nonce)) -} - func TestEthBroadcaster_Trigger(t *testing.T) { t.Parallel() @@ -1738,7 +1708,10 @@ func TestEthBroadcaster_Trigger(t *testing.T) { txStore := cltest.NewTestTxStore(t, db, cfg.Database()) evmcfg := evmtest.NewChainScopedConfig(t, cfg) ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() - eb := NewTestEthBroadcaster(t, txStore, evmtest.NewEthClientMockWithDefaultChain(t), ethKeyStore, evmcfg, &testCheckerFactory{}, false) + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.Test(t) + nonceTracker := txmgr.NewNonceTracker(lggr, txStore, txmgr.NewEvmTxmClient(ethClient)) + eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false, nonceTracker) eb.Trigger(testutils.NewAddress()) eb.Trigger(testutils.NewAddress()) @@ -1759,9 +1732,6 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { kst := cltest.NewKeyStore(t, db, cfg.Database()).Eth() _, fromAddress := cltest.RandomKey{Disabled: false}.MustInsertWithState(t, kst) - _, disabledAddress := cltest.RandomKey{Disabled: true}.MustInsertWithState(t, kst) - - ethNodeNonce := uint64(22) estimator := gas.NewWrappedEvmEstimator(lggr, func(lggr logger.Logger) gas.EvmEstimator { return gas.NewFixedPriceEstimator(evmcfg.EVM().GasEstimator(), evmcfg.EVM().GasEstimator().BlockHistory(), lggr) @@ -1778,7 +1748,8 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { addresses := []gethCommon.Address{fromAddress} kst.On("EnabledAddressesForChain", mock.Anything, &cltest.FixtureChainID).Return(addresses, nil).Once() ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), evmTxmCfg, txmgr.NewEvmTxmFeeConfig(ge), evmcfg.EVM().Transactions(), cfg.Database().Listener(), kst, txBuilder, nil, lggr, checkerFactory, false) + txmClient := txmgr.NewEvmTxmClient(ethClient) + eb := txmgr.NewEvmBroadcaster(txStore, txmClient, evmTxmCfg, txmgr.NewEvmTxmFeeConfig(ge), evmcfg.EVM().Transactions(), cfg.Database().Listener(), kst, txBuilder, lggr, checkerFactory, false) err := eb.Start(ctx) assert.NoError(t, err) @@ -1786,236 +1757,45 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { testutils.WaitForLogMessage(t, observed, "Skipping sequence auto-sync") }) - - t.Run("when nonce syncer returns new nonce, successfully sets nonce", func(t *testing.T) { - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ConfiguredChainID(), ge, kst, estimator) - - txNonceSyncer := txmgr.NewNonceSyncer(txStore, lggr, ethClient) - kst := ksmocks.NewEth(t) - addresses := []gethCommon.Address{fromAddress} - kst.On("EnabledAddressesForChain", mock.Anything, &cltest.FixtureChainID).Return(addresses, nil).Once() - ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), evmTxmCfg, txmgr.NewEvmTxmFeeConfig(ge), evmcfg.EVM().Transactions(), cfg.Database().Listener(), kst, txBuilder, txNonceSyncer, lggr, checkerFactory, true) - - ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(ethNodeNonce, nil).Once() - servicetest.Run(t, eb) - - testutils.WaitForLogMessage(t, observed, "Fast-forward sequence") - - // Check nextSequenceMap to make sure it has correct nonce assigned - nonce, err := eb.GetNextSequence(ctx, fromAddress) - require.NoError(t, err) - require.Equal(t, strconv.FormatUint(ethNodeNonce, 10), nonce.String()) - - // The disabled key did not get updated - _, err = eb.GetNextSequence(ctx, disabledAddress) - require.Error(t, err) - }) - - ethNodeNonce++ - observed.TakeAll() - - t.Run("when nonce syncer returns error, retries and successfully sets nonce", func(t *testing.T) { - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ConfiguredChainID(), ge, kst, estimator) - txNonceSyncer := txmgr.NewNonceSyncer(txStore, lggr, ethClient) - - kst := ksmocks.NewEth(t) - addresses := []gethCommon.Address{fromAddress} - kst.On("EnabledAddressesForChain", mock.Anything, &cltest.FixtureChainID).Return(addresses, nil).Once() - ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - - eb := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), evmTxmCfg, txmgr.NewEvmTxmFeeConfig(evmcfg.EVM().GasEstimator()), evmcfg.EVM().Transactions(), cfg.Database().Listener(), kst, txBuilder, txNonceSyncer, lggr, checkerFactory, true) - eb.XXXTestDisableUnstartedTxAutoProcessing() - - ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), errors.New("something exploded")).Once() - ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(ethNodeNonce, nil) - - servicetest.Run(t, eb) - - testutils.WaitForLogMessage(t, observed, "Fast-forward sequence") - - // Check keyState to make sure it has correct nonce assigned - nonce, err := eb.GetNextSequence(ctx, fromAddress) - require.NoError(t, err) - assert.Equal(t, int64(ethNodeNonce), int64(nonce)) - - // The disabled key did not get updated - _, err = eb.GetNextSequence(ctx, disabledAddress) - require.Error(t, err) - }) -} - -func Test_LoadSequenceMap(t *testing.T) { - t.Parallel() - ctx := testutils.Context(t) - t.Run("set next nonce using entries from tx table", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewTestGeneralConfig(t) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - ks := cltest.NewKeyStore(t, db, cfg.Database()).Eth() - - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - evmcfg := evmtest.NewChainScopedConfig(t, cfg) - checkerFactory := &txmgr.CheckerFactory{Client: ethClient} - _, fromAddress := cltest.MustInsertRandomKey(t, ks) - cltest.MustInsertUnconfirmedEthTx(t, txStore, int64(0), fromAddress) - cltest.MustInsertUnconfirmedEthTx(t, txStore, int64(1), fromAddress) - eb := NewTestEthBroadcaster(t, txStore, ethClient, ks, evmcfg, checkerFactory, false) - - nonce, err := eb.GetNextSequence(ctx, fromAddress) - require.NoError(t, err) - require.Equal(t, int64(2), int64(nonce)) - }) - - t.Run("set next nonce using client when not found in tx table", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewTestGeneralConfig(t) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - ks := cltest.NewKeyStore(t, db, cfg.Database()).Eth() - - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - evmcfg := evmtest.NewChainScopedConfig(t, cfg) - checkerFactory := &txmgr.CheckerFactory{Client: ethClient} - _, fromAddress := cltest.MustInsertRandomKey(t, ks) - ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(10), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ks, evmcfg, checkerFactory, false) - - nonce, err := eb.GetNextSequence(ctx, fromAddress) - require.NoError(t, err) - require.Equal(t, int64(10), int64(nonce)) - }) -} - -func Test_NextNonce(t *testing.T) { - t.Parallel() - - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewTestGeneralConfig(t) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - ks := cltest.NewKeyStore(t, db, cfg.Database()).Eth() - - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - evmcfg := evmtest.NewChainScopedConfig(t, cfg) - checkerFactory := &txmgr.CheckerFactory{Client: ethClient} - randNonce := testutils.NewRandomPositiveInt64() - _, addr1 := cltest.MustInsertRandomKey(t, ks) - ethClient.On("PendingNonceAt", mock.Anything, addr1).Return(uint64(randNonce), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ks, evmcfg, checkerFactory, false) - ctx := testutils.Context(t) - cltest.MustInsertRandomKey(t, ks, *ubig.New(testutils.FixtureChainID)) - - nonce, err := eb.GetNextSequence(ctx, addr1) - require.NoError(t, err) - require.Equal(t, randNonce, int64(nonce)) - - randAddr1 := utils.RandomAddress() - _, err = eb.GetNextSequence(ctx, randAddr1) - require.Error(t, err) - require.Contains(t, err.Error(), fmt.Sprintf("address disabled: %s", randAddr1.Hex())) - - randAddr2 := utils.RandomAddress() - _, err = eb.GetNextSequence(ctx, randAddr2) - require.Error(t, err) - require.Contains(t, err.Error(), fmt.Sprintf("address disabled: %s", randAddr2.Hex())) - -} - -func Test_SetNonceAfterInit(t *testing.T) { - t.Parallel() - - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewTestGeneralConfig(t) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - ks := cltest.NewKeyStore(t, db, cfg.Database()).Eth() - - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - evmcfg := evmtest.NewChainScopedConfig(t, cfg) - checkerFactory := &txmgr.CheckerFactory{Client: ethClient} - randNonce := testutils.NewRandomPositiveInt64() - _, addr1 := cltest.MustInsertRandomKey(t, ks) - ethClient.On("PendingNonceAt", mock.Anything, addr1).Return(uint64(0), errors.New("failed to retrieve nonce at startup")).Once() - ethClient.On("PendingNonceAt", mock.Anything, addr1).Return(uint64(randNonce), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ks, evmcfg, checkerFactory, false) - - ctx := testutils.Context(t) - nonce, err := eb.GetNextSequence(ctx, addr1) - require.NoError(t, err) - require.Equal(t, randNonce, int64(nonce)) - - // Test that the new nonce is set in the map and does not need a client call to retrieve on subsequent calls - nonce, err = eb.GetNextSequence(ctx, addr1) - require.NoError(t, err) - require.Equal(t, randNonce, int64(nonce)) } -func Test_IncrementNextNonce(t *testing.T) { +func TestEthBroadcaster_NonceTracker_InProgressTx(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) cfg := configtest.NewTestGeneralConfig(t) txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - ks := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) evmcfg := evmtest.NewChainScopedConfig(t, cfg) checkerFactory := &txmgr.CheckerFactory{Client: ethClient} - randNonce := testutils.NewRandomPositiveInt64() - _, addr1 := cltest.MustInsertRandomKey(t, ks) - ethClient.On("PendingNonceAt", mock.Anything, addr1).Return(uint64(randNonce), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ks, evmcfg, checkerFactory, false) + lggr := logger.Test(t) ctx := testutils.Context(t) - nonce, err := eb.GetNextSequence(ctx, addr1) - require.NoError(t, err) - eb.IncrementNextSequence(addr1, nonce) - - nonce, err = eb.GetNextSequence(ctx, addr1) - require.NoError(t, err) - assert.Equal(t, randNonce+1, int64(nonce)) - eb.IncrementNextSequence(addr1, nonce) - nonce, err = eb.GetNextSequence(ctx, addr1) - require.NoError(t, err) - assert.Equal(t, randNonce+2, int64(nonce)) - - randAddr1 := utils.RandomAddress() - _, err = eb.GetNextSequence(ctx, randAddr1) - require.Error(t, err) - assert.Contains(t, err.Error(), fmt.Sprintf("address disabled: %s", randAddr1.Hex())) + t.Run("maintains the proper nonce if there is an in-progress tx during startup", func(t *testing.T) { + inProgressTxNonce := uint64(0) + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { + return tx.Nonce() == inProgressTxNonce + }), fromAddress).Return(commonclient.Successful, nil).Once() - // verify it didnt get changed by any erroring calls - nonce, err = eb.GetNextSequence(ctx, addr1) - require.NoError(t, err) - assert.Equal(t, randNonce+2, int64(nonce)) -} + // Tx with nonce 0 in DB will set local nonce map to value to 1 + mustInsertInProgressEthTxWithAttempt(t, txStore, evmtypes.Nonce(inProgressTxNonce), fromAddress) + nonceTracker := txmgr.NewNonceTracker(lggr, txStore, txmgr.NewEvmTxmClient(ethClient)) + eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, checkerFactory, false, nonceTracker) -func Test_SetNextNonce(t *testing.T) { - t.Parallel() + // Check the local nonce map was set to 1 higher than in-progress tx nonce + nonce := getLocalNextNonce(t, nonceTracker, fromAddress) + require.Equal(t, inProgressTxNonce+1, nonce) - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewTestGeneralConfig(t) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - ks := cltest.NewKeyStore(t, db, cfg.Database()).Eth() - - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - evmcfg := evmtest.NewChainScopedConfig(t, cfg) - checkerFactory := &txmgr.CheckerFactory{Client: ethClient} - _, fromAddress := cltest.MustInsertRandomKey(t, ks) - ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() - eb := NewTestEthBroadcaster(t, txStore, ethClient, ks, evmcfg, checkerFactory, false) - ctx := testutils.Context(t) - - t.Run("update next nonce", func(t *testing.T) { - nonce, err := eb.GetNextSequence(ctx, fromAddress) + _, err := eb.ProcessUnstartedTxs(ctx, fromAddress) require.NoError(t, err) - assert.Equal(t, int64(0), int64(nonce)) - eb.SetNextSequence(fromAddress, evmtypes.Nonce(24)) - newNextNonce, err := eb.GetNextSequence(ctx, fromAddress) - require.NoError(t, err) - assert.Equal(t, int64(24), int64(newNextNonce)) + // Check the local nonce map maintained nonce 1 higher than in-progress tx nonce + nonce = getLocalNextNonce(t, nonceTracker, fromAddress) + require.Equal(t, inProgressTxNonce+1, nonce) }) } diff --git a/core/chains/evm/txmgr/builder.go b/core/chains/evm/txmgr/builder.go index cd8f5af884a..ba5b54ad29f 100644 --- a/core/chains/evm/txmgr/builder.go +++ b/core/chains/evm/txmgr/builder.go @@ -47,20 +47,19 @@ func NewTxm( // create tx attempt builder txAttemptBuilder := NewEvmTxAttemptBuilder(*client.ConfiguredChainID(), fCfg, keyStore, estimator) txStore := NewTxStore(sqlxDB, lggr, dbConfig) - txNonceSyncer := NewNonceSyncer(txStore, lggr, client) txmCfg := NewEvmTxmConfig(chainConfig) // wrap Evm specific config feeCfg := NewEvmTxmFeeConfig(fCfg) // wrap Evm specific config txmClient := NewEvmTxmClient(client) // wrap Evm specific client chainID := txmClient.ConfiguredChainID() - evmBroadcaster := NewEvmBroadcaster(txStore, txmClient, txmCfg, feeCfg, txConfig, listenerConfig, keyStore, txAttemptBuilder, txNonceSyncer, lggr, checker, chainConfig.NonceAutoSync()) + evmBroadcaster := NewEvmBroadcaster(txStore, txmClient, txmCfg, feeCfg, txConfig, listenerConfig, keyStore, txAttemptBuilder, lggr, checker, chainConfig.NonceAutoSync()) evmTracker := NewEvmTracker(txStore, keyStore, chainID, lggr) evmConfirmer := NewEvmConfirmer(txStore, txmClient, txmCfg, feeCfg, txConfig, dbConfig, keyStore, txAttemptBuilder, lggr) var evmResender *Resender if txConfig.ResendAfterThreshold() > 0 { evmResender = NewEvmResender(lggr, txStore, txmClient, evmTracker, keyStore, txmgr.DefaultResenderPollInterval, chainConfig, txConfig) } - txm = NewEvmTxm(chainID, txmCfg, txConfig, keyStore, lggr, checker, fwdMgr, txAttemptBuilder, txStore, txNonceSyncer, evmBroadcaster, evmConfirmer, evmResender, evmTracker) + txm = NewEvmTxm(chainID, txmCfg, txConfig, keyStore, lggr, checker, fwdMgr, txAttemptBuilder, txStore, evmBroadcaster, evmConfirmer, evmResender, evmTracker) return txm, nil } @@ -75,13 +74,12 @@ func NewEvmTxm( fwdMgr FwdMgr, txAttemptBuilder TxAttemptBuilder, txStore TxStore, - nonceSyncer NonceSyncer, broadcaster *Broadcaster, confirmer *Confirmer, resender *Resender, tracker *Tracker, ) *Txm { - return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, nonceSyncer, broadcaster, confirmer, resender, tracker) + return txmgr.NewTxm(chainId, cfg, txCfg, keyStore, lggr, checkerFactory, fwdMgr, txAttemptBuilder, txStore, broadcaster, confirmer, resender, tracker) } // NewEvmResender creates a new concrete EvmResender @@ -138,10 +136,10 @@ func NewEvmBroadcaster( listenerConfig txmgrtypes.BroadcasterListenerConfig, keystore KeyStore, txAttemptBuilder TxAttemptBuilder, - nonceSyncer NonceSyncer, logger logger.Logger, checkerFactory TransmitCheckerFactory, autoSyncNonce bool, ) *Broadcaster { - return txmgr.NewBroadcaster(txStore, client, chainConfig, feeConfig, txConfig, listenerConfig, keystore, txAttemptBuilder, nonceSyncer, logger, checkerFactory, autoSyncNonce, evmtypes.GenerateNextNonce) + nonceTracker := NewNonceTracker(logger, txStore, client) + return txmgr.NewBroadcaster(txStore, client, chainConfig, feeConfig, txConfig, listenerConfig, keystore, txAttemptBuilder, nonceTracker, logger, checkerFactory, autoSyncNonce) } diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 8187a390878..9fe347b7a9f 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1742,26 +1742,6 @@ func (o *evmTxStore) HasInProgressTransaction(ctx context.Context, account commo return exists, pkgerrors.Wrap(err, "hasInProgressTransaction failed") } -func (o *evmTxStore) UpdateKeyNextSequence(newNextNonce, currentNextNonce evmtypes.Nonce, address common.Address, chainID *big.Int, qopts ...pg.QOpt) error { - qq := o.q.WithOpts(qopts...) - return qq.Transaction(func(tx pg.Queryer) error { - // We filter by next_nonce here as an optimistic lock to make sure it - // didn't get changed out from under us. Shouldn't happen but can't hurt. - res, err := tx.Exec(`UPDATE evm.key_states SET next_nonce = $1, updated_at = $2 WHERE address = $3 AND next_nonce = $4 AND evm_chain_id = $5`, newNextNonce.Int64(), time.Now(), address, currentNextNonce.Int64(), chainID.String()) - if err != nil { - return pkgerrors.Wrap(err, "NonceSyncer#fastForwardNonceIfNecessary failed to update keys.next_nonce") - } - rowsAffected, err := res.RowsAffected() - if err != nil { - return pkgerrors.Wrap(err, "NonceSyncer#fastForwardNonceIfNecessary failed to get RowsAffected") - } - if rowsAffected == 0 { - return ErrKeyNotUpdated - } - return nil - }) -} - func (o *evmTxStore) countTransactionsWithState(ctx context.Context, fromAddress common.Address, state txmgrtypes.TxState, chainID *big.Int) (count uint32, err error) { var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index 83d2381d007..9d0143d2eda 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -1389,7 +1389,7 @@ func TestORM_UpdateTxUnstartedToInProgress(t *testing.T) { evmTxmCfg := txmgr.NewEvmTxmConfig(ccfg.EVM()) ec := evmtest.NewEthClientMockWithDefaultChain(t) txMgr := txmgr.NewEvmTxm(ec.ConfiguredChainID(), evmTxmCfg, ccfg.EVM().Transactions(), nil, logger.Test(t), nil, nil, - nil, txStore, nil, nil, nil, nil, nil) + nil, txStore, nil, nil, nil, nil) err := txMgr.XXXTestAbandon(fromAddress) // mark transaction as abandoned require.NoError(t, err) diff --git a/core/chains/evm/txmgr/models.go b/core/chains/evm/txmgr/models.go index 6633841f40b..be06f5dd5e9 100644 --- a/core/chains/evm/txmgr/models.go +++ b/core/chains/evm/txmgr/models.go @@ -26,7 +26,7 @@ type ( TransactionStore = txmgrtypes.TransactionStore[common.Address, *big.Int, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] KeyStore = txmgrtypes.KeyStore[common.Address, *big.Int, evmtypes.Nonce] TxAttemptBuilder = txmgrtypes.TxAttemptBuilder[*big.Int, *evmtypes.Head, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] - NonceSyncer = txmgr.SequenceSyncer[common.Address, common.Hash, common.Hash, evmtypes.Nonce] + NonceTracker = txmgrtypes.SequenceTracker[common.Address, evmtypes.Nonce] TransmitCheckerFactory = txmgr.TransmitCheckerFactory[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] Txm = txmgr.Txm[*big.Int, *evmtypes.Head, common.Address, common.Hash, common.Hash, *evmtypes.Receipt, evmtypes.Nonce, gas.EvmFee] TxManager = txmgr.TxManager[*big.Int, *evmtypes.Head, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] diff --git a/core/chains/evm/txmgr/nonce_syncer.go b/core/chains/evm/txmgr/nonce_syncer.go deleted file mode 100644 index 0cb52a1321e..00000000000 --- a/core/chains/evm/txmgr/nonce_syncer.go +++ /dev/null @@ -1,106 +0,0 @@ -package txmgr - -import ( - "context" - "fmt" - "math/big" - - "github.com/ethereum/go-ethereum/common" - pkgerrors "github.com/pkg/errors" - - "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink/v2/common/txmgr" - evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" -) - -// NonceSyncer manages the delicate task of syncing the local nonce with the -// chain nonce in case of divergence. -// -// On startup, we check each key for the nonce value on chain and compare -// it to our local value. -// -// Usually the on-chain nonce will be the same as (or lower than) the -// highest sequence in the DB, in which case we do nothing. -// -// If we are restoring from a backup however, or another wallet has used the -// account, the chain nonce might be higher than our local one. In this -// scenario, we must fastforward the local nonce to match the chain nonce. -// -// The problem with doing this is that now Chainlink does not have any -// ownership or control over potentially pending transactions with nonces -// between our local highest nonce and the chain nonce. If one of those -// transactions is pushed out of the mempool or re-org'd out of the chain, -// we run the risk of being stuck with a gap in the nonce sequence that -// will never be filled. -// -// The solution is to query the chain for our own transactions and take -// ownership of them by writing them to the database and letting the -// EthConfirmer handle them as it would any other transaction. -// -// This is not quite as straightforward as one might expect. We cannot -// query transactions from our account to infinite depth (geth does not -// support this). The best we can do is to query for all transactions sent -// within the past EVM.FinalityDepth blocks and find the ones sent by our -// address(es). -// -// This gives us re-org protection up to EVM.FinalityDepth deep in the -// worst case, which is in line with our other guarantees. -var _ txmgr.SequenceSyncer[common.Address, common.Hash, common.Hash, types.Nonce] = &nonceSyncerImpl{} - -type nonceSyncerImpl struct { - txStore EvmTxStore - client TxmClient - chainID *big.Int - logger logger.Logger -} - -// NewNonceSyncer returns a new syncer -func NewNonceSyncer( - txStore EvmTxStore, - lggr logger.Logger, - ethClient evmclient.Client, -) NonceSyncer { - lggr = logger.Named(lggr, "NonceSyncer") - return &nonceSyncerImpl{ - txStore: txStore, - client: NewEvmTxmClient(ethClient), - chainID: ethClient.ConfiguredChainID(), - logger: lggr, - } -} - -// SyncAll syncs nonces for all enabled keys in parallel -// -// This should only be called once, before the EthBroadcaster has started. -// Calling it later is not safe and could lead to races. -func (s nonceSyncerImpl) Sync(ctx context.Context, addr common.Address, localNonce types.Nonce) (nonce types.Nonce, err error) { - nonce, err = s.fastForwardNonceIfNecessary(ctx, addr, localNonce) - return nonce, pkgerrors.Wrap(err, "NonceSyncer#fastForwardNoncesIfNecessary failed") -} - -func (s nonceSyncerImpl) fastForwardNonceIfNecessary(ctx context.Context, address common.Address, localNonce types.Nonce) (types.Nonce, error) { - chainNonce, err := s.pendingNonceFromEthClient(ctx, address) - if err != nil { - return localNonce, pkgerrors.Wrap(err, "GetNextNonce failed to loadInitialNonceFromEthClient") - } - if chainNonce == 0 { - return localNonce, nil - } - if chainNonce <= localNonce { - return localNonce, nil - } - s.logger.Warnw(fmt.Sprintf("address %s has been used before, either by an external wallet or a different Chainlink node. "+ - "Local nonce is %v but the on-chain nonce for this account was %v. "+ - "It's possible that this node was restored from a backup. If so, transactions sent by the previous node will NOT be re-org protected and in rare cases may need to be manually bumped/resubmitted. "+ - "Please note that using the chainlink keys with an external wallet is NOT SUPPORTED and can lead to missed or stuck transactions. ", - address, localNonce, chainNonce), - "address", address.String(), "localNonce", localNonce, "chainNonce", chainNonce) - - return chainNonce, nil -} - -func (s nonceSyncerImpl) pendingNonceFromEthClient(ctx context.Context, account common.Address) (types.Nonce, error) { - nextNonce, err := s.client.PendingSequenceAt(ctx, account) - return nextNonce, pkgerrors.WithStack(err) -} diff --git a/core/chains/evm/txmgr/nonce_syncer_test.go b/core/chains/evm/txmgr/nonce_syncer_test.go deleted file mode 100644 index d9a741fb3c5..00000000000 --- a/core/chains/evm/txmgr/nonce_syncer_test.go +++ /dev/null @@ -1,114 +0,0 @@ -package txmgr_test - -import ( - "testing" - - "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - - pkgerrors "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" -) - -func Test_NonceSyncer_Sync(t *testing.T) { - t.Parallel() - - t.Run("returns error if PendingNonceAt fails", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewTestGeneralConfig(t) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() - - _, from := cltest.MustInsertRandomKey(t, ethKeyStore) - - ns := txmgr.NewNonceSyncer(txStore, logger.Test(t), ethClient) - - ethClient.On("PendingNonceAt", mock.Anything, from).Return(uint64(0), pkgerrors.New("something exploded")) - _, err := ns.Sync(testutils.Context(t), from, types.Nonce(0)) - require.Error(t, err) - assert.Contains(t, err.Error(), "something exploded") - - cltest.AssertCount(t, db, "evm.txes", 0) - cltest.AssertCount(t, db, "evm.tx_attempts", 0) - }) - - t.Run("does nothing if chain nonce reflects local nonce", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewTestGeneralConfig(t) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() - - _, from := cltest.MustInsertRandomKey(t, ethKeyStore) - - ns := txmgr.NewNonceSyncer(txStore, logger.Test(t), ethClient) - - ethClient.On("PendingNonceAt", mock.Anything, from).Return(uint64(0), nil) - - nonce, err := ns.Sync(testutils.Context(t), from, 0) - require.Equal(t, nonce.Int64(), int64(0)) - require.NoError(t, err) - - cltest.AssertCount(t, db, "evm.txes", 0) - cltest.AssertCount(t, db, "evm.tx_attempts", 0) - }) - - t.Run("does nothing if chain nonce is behind local nonce", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewTestGeneralConfig(t) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - ks := cltest.NewKeyStore(t, db, cfg.Database()).Eth() - - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - - _, fromAddress := cltest.RandomKey{Nonce: 32}.MustInsert(t, ks) - - ns := txmgr.NewNonceSyncer(txStore, logger.Test(t), ethClient) - - // Used to mock the chain nonce - ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(5), nil) - nonce, err := ns.Sync(testutils.Context(t), fromAddress, types.Nonce(32)) - require.Equal(t, nonce.Int64(), int64(32)) - require.NoError(t, err) - - cltest.AssertCount(t, db, "evm.txes", 0) - cltest.AssertCount(t, db, "evm.tx_attempts", 0) - }) - - t.Run("fast forwards if chain nonce is ahead of local nonce", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewTestGeneralConfig(t) - txStore := cltest.NewTestTxStore(t, db, cfg.Database()) - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() - - _, key1 := cltest.MustInsertRandomKey(t, ethKeyStore) - _, key2 := cltest.RandomKey{Nonce: 32}.MustInsert(t, ethKeyStore) - - key1LocalNonce := types.Nonce(0) - key2LocalNonce := types.Nonce(32) - - ns := txmgr.NewNonceSyncer(txStore, logger.Test(t), ethClient) - - // Used to mock the chain nonce - ethClient.On("PendingNonceAt", mock.Anything, key1).Return(uint64(5), nil).Once() - ethClient.On("PendingNonceAt", mock.Anything, key2).Return(uint64(32), nil).Once() - - syncerNonce, err := ns.Sync(testutils.Context(t), key1, key1LocalNonce) - require.NoError(t, err) - require.Greater(t, syncerNonce, key1LocalNonce) - - syncerNonce, err = ns.Sync(testutils.Context(t), key2, key2LocalNonce) - require.NoError(t, err) - require.Equal(t, syncerNonce, key2LocalNonce) - }) -} diff --git a/core/chains/evm/txmgr/nonce_tracker.go b/core/chains/evm/txmgr/nonce_tracker.go new file mode 100644 index 00000000000..6fb708ed876 --- /dev/null +++ b/core/chains/evm/txmgr/nonce_tracker.go @@ -0,0 +1,186 @@ +package txmgr + +import ( + "context" + "fmt" + "math/big" + "slices" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/jpillora/backoff" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" +) + +type NonceTrackerTxStore interface { + FindLatestSequence(context.Context, common.Address, *big.Int) (evmtypes.Nonce, error) +} + +type NonceTrackerClient interface { + ConfiguredChainID() *big.Int + PendingSequenceAt(context.Context, common.Address) (evmtypes.Nonce, error) +} + +type nonceTracker struct { + lggr logger.SugaredLogger + nextSequenceMap map[common.Address]evmtypes.Nonce + txStore NonceTrackerTxStore + chainID *big.Int + client NonceTrackerClient + enabledAddresses []common.Address + + sequenceLock sync.RWMutex +} + +func NewNonceTracker(lggr logger.Logger, txStore NonceTrackerTxStore, client NonceTrackerClient) *nonceTracker { + lggr = logger.Named(lggr, "NonceTracker") + return &nonceTracker{ + lggr: logger.Sugared(lggr), + txStore: txStore, + chainID: client.ConfiguredChainID(), + client: client, + } +} + +func (s *nonceTracker) LoadNextSequences(ctx context.Context, addresses []common.Address) { + s.sequenceLock.Lock() + defer s.sequenceLock.Unlock() + + s.enabledAddresses = addresses + + s.nextSequenceMap = make(map[common.Address]evmtypes.Nonce) + for _, address := range addresses { + seq, err := s.getSequenceForAddr(ctx, address) + if err == nil { + s.nextSequenceMap[address] = seq + } + } +} + +func (s *nonceTracker) getSequenceForAddr(ctx context.Context, address common.Address) (seq evmtypes.Nonce, err error) { + // Get the highest sequence from the tx table + // Will need to be incremented since this sequence is already used + seq, err = s.txStore.FindLatestSequence(ctx, address, s.chainID) + if err == nil { + seq++ + return seq, nil + } + // Look for nonce on-chain if no tx found for address in TxStore or if error occurred + // Returns the nonce that should be used for the next transaction so no need to increment + nonce, err := s.client.PendingSequenceAt(ctx, address) + if err == nil { + return nonce, nil + } + s.lggr.Criticalw("failed to retrieve next sequence from on-chain for address: ", "address", address.String()) + return seq, err + +} + +// syncSequence tries to sync the key sequence, retrying indefinitely until success or stop signal is sent +func (s *nonceTracker) SyncSequence(ctx context.Context, addr common.Address, chStop services.StopChan) { + sequenceSyncRetryBackoff := backoff.Backoff{ + Min: 100 * time.Millisecond, + Max: 5 * time.Second, + Jitter: true, + } + + localSequence, err := s.GetNextSequence(ctx, addr) + // Address not found in map so skip sync + if err != nil { + s.lggr.Criticalw("Failed to retrieve local next sequence for address", "address", addr.String(), "err", err) + return + } + + // Enter loop with retries + var attempt int + for { + select { + case <-chStop: + return + case <-time.After(sequenceSyncRetryBackoff.Duration()): + attempt++ + err := s.SyncOnChain(ctx, addr, localSequence) + if err != nil { + if attempt > 5 { + s.lggr.Criticalw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err) + } else { + s.lggr.Warnw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err) + } + continue + } + return + } + } +} + +func (s *nonceTracker) SyncOnChain(ctx context.Context, addr common.Address, localSequence evmtypes.Nonce) error { + nonce, err := s.client.PendingSequenceAt(ctx, addr) + if err != nil { + return err + } + if nonce > localSequence { + s.lggr.Warnw(fmt.Sprintf("address %s has been used before, either by an external wallet or a different Chainlink node. "+ + "Local nonce is %v but the on-chain nonce for this account was %v. "+ + "It's possible that this node was restored from a backup. If so, transactions sent by the previous node will NOT be re-org protected and in rare cases may need to be manually bumped/resubmitted. "+ + "Please note that using the chainlink keys with an external wallet is NOT SUPPORTED and can lead to missed or stuck transactions. ", + addr, localSequence, nonce), + "address", addr.String(), "localNonce", localSequence, "chainNonce", nonce) + + s.lggr.Infow("Fast-forward sequence", "address", addr, "newNextSequence", nonce, "oldNextSequence", localSequence) + } + + s.sequenceLock.Lock() + defer s.sequenceLock.Unlock() + s.nextSequenceMap[addr] = max(localSequence, nonce) + return nil +} + +func (s *nonceTracker) GetNextSequence(ctx context.Context, address common.Address) (seq evmtypes.Nonce, err error) { + s.sequenceLock.Lock() + defer s.sequenceLock.Unlock() + // Get next sequence from map + seq, exists := s.nextSequenceMap[address] + if exists { + return seq, nil + } + + s.lggr.Infow("address not found in local next sequence map. Attempting to search and populate sequence.", "address", address.String()) + // Check if address is in the enabled address list + if !slices.Contains(s.enabledAddresses, address) { + return seq, fmt.Errorf("address disabled: %s", address) + } + + // Try to retrieve next sequence from tx table or on-chain to load the map + // A scenario could exist where loading the map during startup failed (e.g. All configured RPC's are unreachable at start) + // The expectation is that the node does not fail startup so sequences need to be loaded during runtime + foundSeq, err := s.getSequenceForAddr(ctx, address) + if err != nil { + return seq, fmt.Errorf("failed to find next sequence for address: %s", address) + } + + // Set sequence in map + s.nextSequenceMap[address] = foundSeq + return foundSeq, nil +} + +func (s *nonceTracker) GenerateNextSequence(address common.Address, nonceUsed evmtypes.Nonce) { + s.sequenceLock.Lock() + defer s.sequenceLock.Unlock() + currentNonce := s.nextSequenceMap[address] + + // In most cases, currentNonce would equal nonceUsed + // There is a chance currentNonce is 1 ahead of nonceUsed if the DB contains an in-progress tx during startup + // Incrementing currentNonce, which is already set to the next usable nonce, could lead to a nonce gap. Set the map to the incremented nonceUsed instead. + if currentNonce == nonceUsed || currentNonce == nonceUsed+1 { + s.nextSequenceMap[address] = nonceUsed + 1 + return + } + + // If currentNonce is ahead of even the incremented nonceUsed, maintain the unchanged currentNonce in the map + // This scenario should never occur but logging this discrepancy for visibility + s.lggr.Warnf("Local nonce map value %d for address %s is ahead of the nonce transmitted %d. Maintaining the existing value in the map without incrementing.", currentNonce, address.String(), nonceUsed) +} diff --git a/core/chains/evm/txmgr/nonce_tracker_test.go b/core/chains/evm/txmgr/nonce_tracker_test.go new file mode 100644 index 00000000000..95347c2d580 --- /dev/null +++ b/core/chains/evm/txmgr/nonce_tracker_test.go @@ -0,0 +1,293 @@ +package txmgr_test + +import ( + "errors" + "fmt" + "math/big" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/common" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + + clientmock "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + txstoremock "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr/mocks" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" +) + +func TestNonceTracker_LoadSequenceMap(t *testing.T) { + t.Parallel() + + ctx := testutils.Context(t) + chainID := big.NewInt(0) + txStore := txstoremock.NewEvmTxStore(t) + + client := clientmock.NewClient(t) + client.On("ConfiguredChainID").Return(chainID) + + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(client)) + + addr1 := common.HexToAddress("0xd5e099c71b797516c10ed0f0d895f429c2781142") + addr2 := common.HexToAddress("0xd5e099c71b797516c10ed0f0d895f429c2781140") + enabledAddresses := []common.Address{addr1, addr2} + + t.Run("set next nonce using entries from tx table", func(t *testing.T) { + randNonce1 := testutils.NewRandomPositiveInt64() + randNonce2 := testutils.NewRandomPositiveInt64() + txStore.On("FindLatestSequence", mock.Anything, addr1, chainID).Return(types.Nonce(randNonce1), nil).Once() + txStore.On("FindLatestSequence", mock.Anything, addr2, chainID).Return(types.Nonce(randNonce2), nil).Once() + + nonceTracker.LoadNextSequences(ctx, enabledAddresses) + seq, err := nonceTracker.GetNextSequence(ctx, addr1) + require.NoError(t, err) + require.Equal(t, types.Nonce(randNonce1+1), seq) + seq, err = nonceTracker.GetNextSequence(ctx, addr2) + require.NoError(t, err) + require.Equal(t, types.Nonce(randNonce2+1), seq) + }) + + t.Run("set next nonce using client when not found in tx table", func(t *testing.T) { + var emptyNonce types.Nonce + txStore.On("FindLatestSequence", mock.Anything, addr1, chainID).Return(emptyNonce, errors.New("no rows")).Once() + txStore.On("FindLatestSequence", mock.Anything, addr2, chainID).Return(emptyNonce, errors.New("no rows")).Once() + + randNonce1 := testutils.NewRandomPositiveInt64() + randNonce2 := testutils.NewRandomPositiveInt64() + client.On("PendingNonceAt", mock.Anything, addr1).Return(uint64(randNonce1), nil).Once() + client.On("PendingNonceAt", mock.Anything, addr2).Return(uint64(randNonce2), nil).Once() + + nonceTracker.LoadNextSequences(ctx, enabledAddresses) + seq, err := nonceTracker.GetNextSequence(ctx, addr1) + require.NoError(t, err) + require.Equal(t, types.Nonce(randNonce1), seq) + seq, err = nonceTracker.GetNextSequence(ctx, addr2) + require.NoError(t, err) + require.Equal(t, types.Nonce(randNonce2), seq) + }) + +} + +func TestNonceTracker_syncOnChain(t *testing.T) { + t.Parallel() + + ctx := testutils.Context(t) + chainID := big.NewInt(0) + txStore := txstoremock.NewEvmTxStore(t) + + client := clientmock.NewClient(t) + client.On("ConfiguredChainID").Return(chainID) + + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(client)) + + addr := common.HexToAddress("0xd5e099c71b797516c10ed0f0d895f429c2781142") + + t.Run("throws error if RPC call fails", func(t *testing.T) { + client.On("PendingNonceAt", mock.Anything, addr).Return(uint64(0), errors.New("RPC unavailable")).Once() + + err := nonceTracker.SyncOnChain(ctx, addr, types.Nonce(2)) + require.Error(t, err) + }) + + t.Run("uses local nonce instead of on-chain nonce if on-chain nonce is lower", func(t *testing.T) { + nonce := 2 + newNonce := 5 + client.On("PendingNonceAt", mock.Anything, addr).Return(uint64(nonce), nil).Once() + + enabledAddresses := []common.Address{} + nonceTracker.LoadNextSequences(ctx, enabledAddresses) + + // syncOnChain will set the next sequence even if the address is not present in the map + err := nonceTracker.SyncOnChain(ctx, addr, types.Nonce(newNonce)) + require.NoError(t, err) + + seq, err := nonceTracker.GetNextSequence(ctx, addr) + require.NoError(t, err) + require.Equal(t, types.Nonce(newNonce), seq) + }) + + t.Run("fast forwards nonce if on-chain nonce is higher than local nonce", func(t *testing.T) { + nonce := 10 + onChainNonce := 5 + client.On("PendingNonceAt", mock.Anything, addr).Return(uint64(nonce), nil).Once() + + enabledAddresses := []common.Address{} + nonceTracker.LoadNextSequences(ctx, enabledAddresses) + + // syncOnChain will set the next sequence even if the address is not present in the map + err := nonceTracker.SyncOnChain(ctx, addr, types.Nonce(onChainNonce)) + require.NoError(t, err) + + seq, err := nonceTracker.GetNextSequence(ctx, addr) + require.NoError(t, err) + require.Equal(t, types.Nonce(nonce), seq) + }) + +} + +func TestNonceTracker_SyncSequence(t *testing.T) { + t.Parallel() + + ctx := testutils.Context(t) + chainID := big.NewInt(0) + txStore := txstoremock.NewEvmTxStore(t) + + client := clientmock.NewClient(t) + client.On("ConfiguredChainID").Return(chainID) + + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(client)) + + addr := common.HexToAddress("0xd5e099c71b797516c10ed0f0d895f429c2781142") + enabledAddresses := []common.Address{addr} + + t.Run("syncs sequence successfully", func(t *testing.T) { + txStoreNonce := 2 + onChainNonce := 3 + txStore.On("FindLatestSequence", mock.Anything, addr, chainID).Return(types.Nonce(txStoreNonce), nil).Once() + nonceTracker.LoadNextSequences(ctx, enabledAddresses) + + var chStop services.StopChan + client.On("PendingNonceAt", mock.Anything, addr).Return(uint64(onChainNonce), nil).Once() + nonceTracker.SyncSequence(ctx, addr, chStop) + + seq, err := nonceTracker.GetNextSequence(ctx, addr) + require.NoError(t, err) + require.Equal(t, types.Nonce(onChainNonce), seq) + }) + + t.Run("retries if on-chain syncing fails", func(t *testing.T) { + txStoreNonce := 2 + onChainNonce := 3 + txStore.On("FindLatestSequence", mock.Anything, addr, chainID).Return(types.Nonce(txStoreNonce), nil).Once() + nonceTracker.LoadNextSequences(ctx, enabledAddresses) + + var chStop services.StopChan + client.On("PendingNonceAt", mock.Anything, addr).Return(uint64(0), errors.New("RPC unavailable")).Once() + client.On("PendingNonceAt", mock.Anything, addr).Return(uint64(onChainNonce), nil).Once() + nonceTracker.SyncSequence(ctx, addr, chStop) + + seq, err := nonceTracker.GetNextSequence(ctx, addr) + require.NoError(t, err) + require.Equal(t, types.Nonce(onChainNonce), seq) + }) +} + +func TestNonceTracker_GetNextSequence(t *testing.T) { + t.Parallel() + + ctx := testutils.Context(t) + chainID := big.NewInt(0) + txStore := txstoremock.NewEvmTxStore(t) + + client := clientmock.NewClient(t) + client.On("ConfiguredChainID").Return(chainID) + + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(client)) + + addr := common.HexToAddress("0xd5e099c71b797516c10ed0f0d895f429c2781142") + + t.Run("fails to get sequence if address doesn't exist in map", func(t *testing.T) { + _, err := nonceTracker.GetNextSequence(ctx, addr) + require.Error(t, err) + + }) + + t.Run("fails to get sequence if address doesn't exist in map and is disabled", func(t *testing.T) { + _, err := nonceTracker.GetNextSequence(ctx, addr) + require.Error(t, err) + require.Contains(t, err.Error(), fmt.Sprintf("address disabled: %s", addr.Hex())) + }) + + t.Run("fails to get sequence if address is enabled, doesn't exist in map, and getSequenceForAddr fails", func(t *testing.T) { + enabledAddresses := []common.Address{addr} + txStore.On("FindLatestSequence", mock.Anything, addr, chainID).Return(types.Nonce(0), errors.New("no rows")).Twice() + client.On("PendingNonceAt", mock.Anything, addr).Return(uint64(0), errors.New("RPC unavailable")).Twice() + nonceTracker.LoadNextSequences(ctx, enabledAddresses) + + _, err := nonceTracker.GetNextSequence(ctx, addr) + require.Error(t, err) + require.Contains(t, err.Error(), fmt.Sprintf("failed to find next sequence for address: %s", addr.Hex())) + }) + + t.Run("gets next sequence successfully if there is no entry in map but address is enabled and getSequenceForAddr is successful", func(t *testing.T) { + txStoreNonce := 4 + enabledAddresses := []common.Address{addr} + txStore.On("FindLatestSequence", mock.Anything, addr, chainID).Return(types.Nonce(0), errors.New("no rows")).Once() + client.On("PendingNonceAt", mock.Anything, addr).Return(uint64(0), errors.New("RPC unavailable")).Once() + nonceTracker.LoadNextSequences(ctx, enabledAddresses) + + txStore.On("FindLatestSequence", mock.Anything, addr, chainID).Return(types.Nonce(txStoreNonce), nil).Once() + seq, err := nonceTracker.GetNextSequence(ctx, addr) + require.NoError(t, err) + require.Equal(t, types.Nonce(txStoreNonce+1), seq) + + }) +} + +func TestNonceTracker_GenerateNextSequence(t *testing.T) { + t.Parallel() + + ctx := testutils.Context(t) + chainID := big.NewInt(0) + txStore := txstoremock.NewEvmTxStore(t) + + client := clientmock.NewClient(t) + client.On("ConfiguredChainID").Return(chainID) + + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(client)) + + addr := common.HexToAddress("0xd5e099c71b797516c10ed0f0d895f429c2781142") + enabledAddresses := []common.Address{addr} + + randNonce := testutils.NewRandomPositiveInt64() + txStore.On("FindLatestSequence", mock.Anything, addr, chainID).Return(types.Nonce(randNonce), nil).Once() + nonceTracker.LoadNextSequences(ctx, enabledAddresses) + seq, err := nonceTracker.GetNextSequence(ctx, addr) + require.NoError(t, err) + require.Equal(t, types.Nonce(randNonce+1), seq) // Local nonce should be highest nonce in DB + 1 + + nonceTracker.GenerateNextSequence(addr, types.Nonce(randNonce+1)) + + seq, err = nonceTracker.GetNextSequence(ctx, addr) + require.NoError(t, err) + require.Equal(t, types.Nonce(randNonce+2), seq) // GenerateNextSequence increases local nonce by 1 +} + +func Test_SetNonceAfterInit(t *testing.T) { + t.Parallel() + + ctx := testutils.Context(t) + chainID := big.NewInt(0) + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewTestGeneralConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + + client := clientmock.NewClient(t) + client.On("ConfiguredChainID").Return(chainID) + + nonceTracker := txmgr.NewNonceTracker(logger.Test(t), txStore, txmgr.NewEvmTxmClient(client)) + + addr := common.HexToAddress("0xd5e099c71b797516c10ed0f0d895f429c2781142") + enabledAddresses := []common.Address{addr} + randNonce := testutils.NewRandomPositiveInt64() + client.On("PendingNonceAt", mock.Anything, addr).Return(uint64(0), errors.New("failed to retrieve nonce at startup")).Once() + client.On("PendingNonceAt", mock.Anything, addr).Return(uint64(randNonce), nil).Once() + nonceTracker.LoadNextSequences(ctx, enabledAddresses) + + nonce, err := nonceTracker.GetNextSequence(ctx, addr) + require.NoError(t, err) + require.Equal(t, randNonce, int64(nonce)) + + // Test that the new nonce is set in the map and does not need a client call to retrieve on subsequent calls + nonce, err = nonceTracker.GetNextSequence(ctx, addr) + require.NoError(t, err) + require.Equal(t, randNonce, int64(nonce)) +} diff --git a/core/chains/evm/types/nonce.go b/core/chains/evm/types/nonce.go index be295bdd2a9..0c3256dc545 100644 --- a/core/chains/evm/types/nonce.go +++ b/core/chains/evm/types/nonce.go @@ -17,7 +17,3 @@ func (n Nonce) Int64() int64 { func (n Nonce) String() string { return strconv.FormatInt(n.Int64(), 10) } - -func GenerateNextNonce(prev Nonce) Nonce { - return prev + 1 -} diff --git a/core/services/vrf/v2/integration_v2_test.go b/core/services/vrf/v2/integration_v2_test.go index 3b7cdfa0a62..5812ee675cd 100644 --- a/core/services/vrf/v2/integration_v2_test.go +++ b/core/services/vrf/v2/integration_v2_test.go @@ -144,7 +144,7 @@ func makeTestTxm(t *testing.T, txStore txmgr.TestEvmTxStore, keyStore keystore.M _, _, evmConfig := txmgr.MakeTestConfigs(t) txmConfig := txmgr.NewEvmTxmConfig(evmConfig) txm := txmgr.NewEvmTxm(ec.ConfiguredChainID(), txmConfig, evmConfig.Transactions(), keyStore.Eth(), logger.TestLogger(t), nil, nil, - nil, txStore, nil, nil, nil, nil, nil) + nil, txStore, nil, nil, nil, nil) return txm } diff --git a/core/services/vrf/v2/listener_v2_test.go b/core/services/vrf/v2/listener_v2_test.go index 465e3dcaca9..661772a823b 100644 --- a/core/services/vrf/v2/listener_v2_test.go +++ b/core/services/vrf/v2/listener_v2_test.go @@ -40,7 +40,7 @@ func makeTestTxm(t *testing.T, txStore txmgr.TestEvmTxStore, keyStore keystore.M ec := evmtest.NewEthClientMockWithDefaultChain(t) txmConfig := txmgr.NewEvmTxmConfig(evmConfig) txm := txmgr.NewEvmTxm(ec.ConfiguredChainID(), txmConfig, evmConfig.Transactions(), keyStore.Eth(), logger.TestLogger(t), nil, nil, - nil, txStore, nil, nil, nil, nil, nil) + nil, txStore, nil, nil, nil, nil) return txm }