From 01fbf8e445b2ae4db616d0fc8e10fa4d44895e0b Mon Sep 17 00:00:00 2001 From: amit-momin <108959691+amit-momin@users.noreply.github.com> Date: Thu, 16 Nov 2023 10:34:08 -0600 Subject: [PATCH] Update loading next sequence map to avoid startup failure (#11307) * Updated loading next sequence map to avoid startup failure * Moved logic to populate next sequence map during runtime * Added changelog * Addressed feedback * Addressed feedback --- common/txmgr/broadcaster.go | 77 ++++++---- core/chains/evm/txmgr/broadcaster_test.go | 175 +++++++++++++++------- docs/CHANGELOG.md | 6 + 3 files changed, 176 insertions(+), 82 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 00522abf229..1e3b2fa0a95 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "slices" "sync" "time" @@ -228,10 +229,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star eb.sequenceLock.Lock() defer eb.sequenceLock.Unlock() - eb.nextSequenceMap, err = eb.loadNextSequenceMap(eb.enabledAddresses) - if err != nil { - return errors.Wrap(err, "Broadcaster: failed to load next sequence map") - } + eb.nextSequenceMap = eb.loadNextSequenceMap(eb.enabledAddresses) eb.isStarted = true return nil @@ -287,30 +285,38 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trig } // Load the next sequence map using the tx table or on-chain (if not found in tx table) -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) (map[ADDR]SEQ, error) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) map[ADDR]SEQ { ctx, cancel := eb.chStop.NewCtx() defer cancel() nextSequenceMap := make(map[ADDR]SEQ) for _, address := range addresses { - // Get the highest sequence from the tx table - // Will need to be incremented since this sequence is already used - seq, err := eb.txStore.FindLatestSequence(ctx, address, eb.chainID) - if err != nil { - // Look for nonce on-chain if no tx found for address in TxStore or if error occurred - // Returns the nonce that should be used for the next transaction so no need to increment - seq, err = eb.client.PendingSequenceAt(ctx, address) - if err != nil { - return nil, errors.New("failed to retrieve next sequence from on-chain causing failure to load next sequence map on broadcaster startup") - } - + seq, err := eb.getSequenceForAddr(ctx, address) + if err == nil { nextSequenceMap[address] = seq - } else { - nextSequenceMap[address] = eb.generateNextSequence(seq) } } - return nextSequenceMap, nil + return nextSequenceMap +} + +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) getSequenceForAddr(ctx context.Context, address ADDR) (seq SEQ, err error) { + // Get the highest sequence from the tx table + // Will need to be incremented since this sequence is already used + seq, err = eb.txStore.FindLatestSequence(ctx, address, eb.chainID) + if err == nil { + seq = eb.generateNextSequence(seq) + return seq, nil + } + // Look for nonce on-chain if no tx found for address in TxStore or if error occurred + // Returns the nonce that should be used for the next transaction so no need to increment + seq, err = eb.client.PendingSequenceAt(ctx, address) + if err == nil { + return seq, nil + } + eb.logger.Criticalw("failed to retrieve next sequence from on-chain for address: ", "address", address.String()) + return seq, err + } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) newSequenceSyncBackoff() backoff.Backoff { @@ -393,7 +399,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) moni // syncSequence tries to sync the key sequence, retrying indefinitely until success or stop signal is sent func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SyncSequence(ctx context.Context, addr ADDR) { sequenceSyncRetryBackoff := eb.newSequenceSyncBackoff() - localSequence, err := eb.GetNextSequence(addr) + localSequence, err := eb.GetNextSequence(ctx, addr) // Address not found in map so skip sync if err != nil { eb.logger.Criticalw("Failed to retrieve local next sequence for address", "address", addr.String(), "err", err) @@ -607,7 +613,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand observeTimeUntilBroadcast(eb.chainID, etx.CreatedAt, time.Now()) // Check if from_address exists in map to ensure it is valid before broadcasting var sequence SEQ - sequence, err = eb.GetNextSequence(etx.FromAddress) + sequence, err = eb.GetNextSequence(ctx, etx.FromAddress) if err != nil { return err, true } @@ -665,7 +671,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand // Check if from_address exists in map to ensure it is valid before broadcasting var sequence SEQ - sequence, err = eb.GetNextSequence(etx.FromAddress) + sequence, err = eb.GetNextSequence(ctx, etx.FromAddress) if err != nil { return err, true } @@ -702,7 +708,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next return nil, errors.Wrap(err, "findNextUnstartedTransactionFromAddress failed") } - sequence, err := eb.GetNextSequence(etx.FromAddress) + sequence, err := eb.GetNextSequence(ctx, etx.FromAddress) if err != nil { return nil, err } @@ -792,15 +798,32 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save } // Used to get the next usable sequence for a transaction -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(address ADDR) (seq SEQ, err error) { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(ctx context.Context, address ADDR) (seq SEQ, err error) { eb.sequenceLock.Lock() defer eb.sequenceLock.Unlock() // Get next sequence from map seq, exists := eb.nextSequenceMap[address] - if !exists { - return seq, errors.New(fmt.Sprint("address not found in next sequence map: ", address)) + if exists { + return seq, nil + } + + eb.logger.Infow("address not found in local next sequence map. Attempting to search and populate sequence.", "address", address.String()) + // Check if address is in the enabled address list + if !slices.Contains(eb.enabledAddresses, address) { + return seq, fmt.Errorf("address disabled: %s", address) } - return seq, nil + + // Try to retrieve next sequence from tx table or on-chain to load the map + // A scenario could exist where loading the map during startup failed (e.g. All configured RPC's are unreachable at start) + // The expectation is that the node does not fail startup so sequences need to be loaded during runtime + foundSeq, err := eb.getSequenceForAddr(ctx, address) + if err != nil { + return seq, fmt.Errorf("failed to find next sequence for address: %s", address) + } + + // Set sequence in map + eb.nextSequenceMap[address] = foundSeq + return foundSeq, nil } // Used to increment the sequence in the mapping to have the next usable one available for the next transaction diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 48b68f9b55c..7967478e624 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -130,6 +130,38 @@ func TestEthBroadcaster_Lifecycle(t *testing.T) { require.NoError(t, eb.XXXTestCloseInternal()) } +// Failure to load next sequnce map should not fail Broadcaster startup +func TestEthBroadcaster_LoadNextSequenceMapFailure_StartupSuccess(t *testing.T) { + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewTestGeneralConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + evmcfg := evmtest.NewChainScopedConfig(t, cfg) + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + estimator := gasmocks.NewEvmFeeEstimator(t) + txBuilder := txmgr.NewEvmTxAttemptBuilder(*ethClient.ConfiguredChainID(), evmcfg.EVM().GasEstimator(), ethKeyStore, estimator) + ethClient.On("PendingNonceAt", mock.Anything, mock.Anything).Return(uint64(0), errors.New("Getting on-chain nonce failed")) + eb := txmgr.NewEvmBroadcaster( + txStore, + txmgr.NewEvmTxmClient(ethClient), + txmgr.NewEvmTxmConfig(evmcfg.EVM()), + txmgr.NewEvmTxmFeeConfig(evmcfg.EVM().GasEstimator()), + evmcfg.EVM().Transactions(), + evmcfg.Database().Listener(), + ethKeyStore, + txBuilder, + nil, + logger.TestLogger(t), + &testCheckerFactory{}, + false, + ) + + // Instance starts without error even if loading next sequence map fails + err := eb.Start(testutils.Context(t)) + require.NoError(t, err) +} + func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { db := pgtest.NewSqlxDB(t) cfg := configtest.NewTestGeneralConfig(t) @@ -946,7 +978,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_ResumingFromCrash(t *testing.T) { } func getLocalNextNonce(t *testing.T, eb *txmgr.Broadcaster, fromAddress gethCommon.Address) uint64 { - n, err := eb.GetNextSequence(fromAddress) + n, err := eb.GetNextSequence(testutils.Context(t), fromAddress) require.NoError(t, err) require.NotNil(t, n) return uint64(n) @@ -972,6 +1004,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() eb := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg, &testCheckerFactory{}, false) + ctx := testutils.Context(t) require.NoError(t, utils.JustError(db.Exec(`SET CONSTRAINTS pipeline_runs_pipeline_spec_id_fkey DEFERRED`))) @@ -983,7 +1016,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { }), fromAddress).Return(commonclient.Successful, errors.New("replacement transaction underpriced")).Once() // Do the thing - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) assert.NoError(t, err) assert.False(t, retryable) @@ -1019,7 +1052,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { return tx.Nonce() == localNextNonce }), fromAddress).Return(commonclient.Fatal, errors.New(fatalErrorExample)).Once() - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) assert.NoError(t, err) assert.False(t, retryable) @@ -1036,7 +1069,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // Check that the key had its nonce reset var nonce evmtypes.Nonce - nonce, err = eb.GetNextSequence(fromAddress) + nonce, err = eb.GetNextSequence(ctx, fromAddress) require.NoError(t, err) // Saved NextNonce must be the same as before because this transaction // was not accepted by the eth node and never can be @@ -1070,7 +1103,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { return tx.Nonce() == localNextNonce }), fromAddress).Return(commonclient.Fatal, errors.New(fatalErrorExample)).Once() - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) require.Contains(t, err.Error(), "something exploded in the callback") assert.True(t, retryable) @@ -1092,7 +1125,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { }), fromAddress).Return(commonclient.Fatal, errors.New(fatalErrorExample)).Once() { - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) assert.NoError(t, err) assert.False(t, retryable) } @@ -1105,7 +1138,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { localNextNonce = getLocalNextNonce(t, eb, fromAddress) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(localNextNonce), nil).Once() eb2 := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), txmgr.NewEvmTxmConfig(evmcfg.EVM()), txmgr.NewEvmTxmFeeConfig(evmcfg.EVM().GasEstimator()), evmcfg.EVM().Transactions(), evmcfg.Database().Listener(), ethKeyStore, txBuilder, nil, lggr, &testCheckerFactory{}, false) - retryable, err := eb2.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb2.ProcessUnstartedTxs(ctx, fromAddress) assert.NoError(t, err) assert.False(t, retryable) }) @@ -1127,7 +1160,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // another node even if the primary one returns "exceeds the configured // cap" - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) assert.Contains(t, err.Error(), "tx fee (1.10 ether) exceeds the configured cap (1.00 ether)") assert.Contains(t, err.Error(), "error while sending transaction") @@ -1147,7 +1180,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // Check that the key had its nonce reset var nonce evmtypes.Nonce - nonce, err = eb.GetNextSequence(fromAddress) + nonce, err = eb.GetNextSequence(ctx, fromAddress) require.NoError(t, err) // Saved NextNonce must be the same as before because this transaction // was not accepted by the eth node and never can be @@ -1156,7 +1189,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { // On the second try, the tx has been accepted into the mempool ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(localNextNonce+1), nil).Once() - retryable, err = eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err = eb.ProcessUnstartedTxs(ctx, fromAddress) assert.NoError(t, err) assert.False(t, retryable) @@ -1184,7 +1217,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(localNextNonce), nil).Once() // Do the thing - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) require.Contains(t, err.Error(), retryableErrorExample) assert.True(t, retryable) @@ -1207,7 +1240,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { return tx.Nonce() == localNextNonce }), fromAddress).Return(commonclient.Successful, nil).Once() - retryable, err = eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err = eb.ProcessUnstartedTxs(ctx, fromAddress) assert.NoError(t, err) assert.False(t, retryable) @@ -1235,7 +1268,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), errors.New("pending nonce fetch failed")).Once() // Do the thing - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) require.Contains(t, err.Error(), retryableErrorExample) require.Contains(t, err.Error(), "pending nonce fetch failed") @@ -1259,7 +1292,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { return tx.Nonce() == localNextNonce }), fromAddress).Return(commonclient.Successful, nil).Once() - retryable, err = eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err = eb.ProcessUnstartedTxs(ctx, fromAddress) assert.NoError(t, err) assert.False(t, retryable) @@ -1288,7 +1321,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(localNextNonce+1), nil).Once() // Do the thing - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) require.NoError(t, err) assert.False(t, retryable) @@ -1330,7 +1363,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { }), fromAddress).Return(commonclient.Successful, nil).Once() // Do the thing - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) require.NoError(t, err) assert.False(t, retryable) @@ -1366,7 +1399,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { }), fromAddress).Return(commonclient.Retryable, failedToReachNodeError).Once() // Do the thing - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) assert.Contains(t, err.Error(), "context deadline exceeded") assert.True(t, retryable) @@ -1397,7 +1430,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { }), fromAddress).Return(commonclient.Successful, errors.New(temporarilyUnderpricedError)).Once() // Do the thing - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) assert.NoError(t, err) assert.False(t, retryable) @@ -1437,7 +1470,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { }), fromAddress).Return(commonclient.Underpriced, errors.New(underpricedError)).Once() // Do the thing - retryable, err := eb2.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb2.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) require.Contains(t, err.Error(), "bumped fee price of 20 gwei is equal to original fee price of 20 gwei. ACTION REQUIRED: This is a configuration error, you must increase either FeeEstimator.BumpPercent or FeeEstimator.BumpMin") assert.True(t, retryable) @@ -1454,7 +1487,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { return tx.Nonce() == localNextNonce }), fromAddress).Return(commonclient.InsufficientFunds, errors.New(insufficientEthError)).Once() - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) assert.Contains(t, err.Error(), "insufficient funds for transfer") assert.True(t, retryable) @@ -1484,7 +1517,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { return tx.Nonce() == localNextNonce }), fromAddress).Return(commonclient.Retryable, errors.New(nonceGapError)).Once() - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) assert.Contains(t, err.Error(), nonceGapError) assert.True(t, retryable) @@ -1529,7 +1562,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { }), fromAddress).Return(commonclient.Underpriced, errors.New(underpricedError)).Once() // Check gas tip cap verification - retryable, err := eb2.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb2.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) require.Contains(t, err.Error(), "bumped gas tip cap of 1 wei is less than or equal to original gas tip cap of 1 wei") assert.True(t, retryable) @@ -1553,7 +1586,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(localNextNonce, nil).Once() eb2 := NewTestEthBroadcaster(t, txStore, ethClient, ethKeyStore, evmcfg2, &testCheckerFactory{}, false) - retryable, err := eb2.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb2.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) require.Contains(t, err.Error(), "specified gas tip cap of 0 is below min configured gas tip of 1 wei for key") assert.True(t, retryable) @@ -1580,7 +1613,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Errors(t *testing.T) { return tx.Nonce() == localNextNonce && tx.GasTipCap().Cmp(big.NewInt(0).Add(gasTipCapDefault.ToInt(), big.NewInt(0).Mul(evmcfg2.EVM().GasEstimator().BumpMin().ToInt(), big.NewInt(2)))) == 0 }), fromAddress).Return(commonclient.Successful, nil).Once() - retryable, err = eb2.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err = eb2.ProcessUnstartedTxs(ctx, fromAddress) require.NoError(t, err) assert.False(t, retryable) @@ -1612,7 +1645,8 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_KeystoreErrors(t *testing.T) { kst.On("EnabledAddressesForChain", &cltest.FixtureChainID).Return(addresses, nil).Once() ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() eb := NewTestEthBroadcaster(t, txStore, ethClient, kst, evmcfg, &testCheckerFactory{}, false) - _, err := eb.GetNextSequence(fromAddress) + ctx := testutils.Context(t) + _, err := eb.GetNextSequence(ctx, fromAddress) require.NoError(t, err) t.Run("tx signing fails", func(t *testing.T) { @@ -1626,7 +1660,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_KeystoreErrors(t *testing.T) { })).Return(&tx, errors.New("could not sign transaction")) // Do the thing - retryable, err := eb.ProcessUnstartedTxs(testutils.Context(t), fromAddress) + retryable, err := eb.ProcessUnstartedTxs(ctx, fromAddress) require.Error(t, err) require.Contains(t, err.Error(), "could not sign transaction") assert.True(t, retryable) @@ -1640,7 +1674,7 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_KeystoreErrors(t *testing.T) { // Check that the key did not have its nonce incremented var nonce types.Nonce - nonce, err = eb.GetNextSequence(fromAddress) + nonce, err = eb.GetNextSequence(ctx, fromAddress) require.NoError(t, err) require.Equal(t, int64(localNonce), int64(nonce)) }) @@ -1678,12 +1712,13 @@ func TestEthBroadcaster_IncrementNextNonce(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() eb := NewTestEthBroadcaster(t, txStore, ethClient, kst, evmcfg, &testCheckerFactory{}, false) - nonce, err := eb.GetNextSequence(fromAddress) + ctx := testutils.Context(t) + nonce, err := eb.GetNextSequence(ctx, fromAddress) require.NoError(t, err) eb.IncrementNextSequence(fromAddress, nonce) // Nonce bumped to 1 - nonce, err = eb.GetNextSequence(fromAddress) + nonce, err = eb.GetNextSequence(ctx, fromAddress) require.NoError(t, err) require.Equal(t, int64(1), int64(nonce)) } @@ -1737,7 +1772,7 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { kst.On("EnabledAddressesForChain", &cltest.FixtureChainID).Return(addresses, nil).Once() ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() eb := txmgr.NewEvmBroadcaster(txStore, txmgr.NewEvmTxmClient(ethClient), evmTxmCfg, txmgr.NewEvmTxmFeeConfig(ge), evmcfg.EVM().Transactions(), cfg.Database().Listener(), kst, txBuilder, nil, lggr, checkerFactory, false) - err := eb.Start(testutils.Context(t)) + err := eb.Start(ctx) assert.NoError(t, err) defer func() { assert.NoError(t, eb.Close()) }() @@ -1763,12 +1798,12 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { testutils.WaitForLogMessage(t, observed, "Fast-forward sequence") // Check nextSequenceMap to make sure it has correct nonce assigned - nonce, err := eb.GetNextSequence(fromAddress) + nonce, err := eb.GetNextSequence(ctx, fromAddress) require.NoError(t, err) - assert.Equal(t, strconv.FormatUint(ethNodeNonce, 10), nonce.String()) + require.Equal(t, strconv.FormatUint(ethNodeNonce, 10), nonce.String()) // The disabled key did not get updated - _, err = eb.GetNextSequence(disabledAddress) + _, err = eb.GetNextSequence(ctx, disabledAddress) require.Error(t, err) }) @@ -1797,19 +1832,19 @@ func TestEthBroadcaster_SyncNonce(t *testing.T) { testutils.WaitForLogMessage(t, observed, "Fast-forward sequence") // Check keyState to make sure it has correct nonce assigned - nonce, err := eb.GetNextSequence(fromAddress) + nonce, err := eb.GetNextSequence(ctx, fromAddress) require.NoError(t, err) assert.Equal(t, int64(ethNodeNonce), int64(nonce)) // The disabled key did not get updated - _, err = eb.GetNextSequence(disabledAddress) + _, err = eb.GetNextSequence(ctx, disabledAddress) require.Error(t, err) }) - } func Test_LoadSequenceMap(t *testing.T) { t.Parallel() + ctx := testutils.Context(t) t.Run("set next nonce using entries from tx table", func(t *testing.T) { db := pgtest.NewSqlxDB(t) cfg := configtest.NewTestGeneralConfig(t) @@ -1824,9 +1859,9 @@ func Test_LoadSequenceMap(t *testing.T) { cltest.MustInsertUnconfirmedEthTx(t, txStore, int64(1), fromAddress) eb := NewTestEthBroadcaster(t, txStore, ethClient, ks, evmcfg, checkerFactory, false) - nonce, err := eb.GetNextSequence(fromAddress) + nonce, err := eb.GetNextSequence(ctx, fromAddress) require.NoError(t, err) - assert.Equal(t, int64(2), int64(nonce)) + require.Equal(t, int64(2), int64(nonce)) }) t.Run("set next nonce using client when not found in tx table", func(t *testing.T) { @@ -1842,9 +1877,9 @@ func Test_LoadSequenceMap(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(10), nil).Once() eb := NewTestEthBroadcaster(t, txStore, ethClient, ks, evmcfg, checkerFactory, false) - nonce, err := eb.GetNextSequence(fromAddress) + nonce, err := eb.GetNextSequence(ctx, fromAddress) require.NoError(t, err) - assert.Equal(t, int64(10), int64(nonce)) + require.Equal(t, int64(10), int64(nonce)) }) } @@ -1863,25 +1898,53 @@ func Test_NextNonce(t *testing.T) { _, addr1 := cltest.MustInsertRandomKey(t, ks) ethClient.On("PendingNonceAt", mock.Anything, addr1).Return(uint64(randNonce), nil).Once() eb := NewTestEthBroadcaster(t, txStore, ethClient, ks, evmcfg, checkerFactory, false) - + ctx := testutils.Context(t) cltest.MustInsertRandomKey(t, ks, *utils.NewBig(testutils.FixtureChainID)) - nonce, err := eb.GetNextSequence(addr1) + nonce, err := eb.GetNextSequence(ctx, addr1) require.NoError(t, err) - assert.Equal(t, randNonce, int64(nonce)) + require.Equal(t, randNonce, int64(nonce)) randAddr1 := utils.RandomAddress() - _, err = eb.GetNextSequence(randAddr1) + _, err = eb.GetNextSequence(ctx, randAddr1) require.Error(t, err) - assert.Contains(t, err.Error(), fmt.Sprintf("address not found in next sequence map: %s", randAddr1.Hex())) + require.Contains(t, err.Error(), fmt.Sprintf("address disabled: %s", randAddr1.Hex())) randAddr2 := utils.RandomAddress() - _, err = eb.GetNextSequence(randAddr2) + _, err = eb.GetNextSequence(ctx, randAddr2) require.Error(t, err) - assert.Contains(t, err.Error(), fmt.Sprintf("address not found in next sequence map: %s", randAddr2.Hex())) + require.Contains(t, err.Error(), fmt.Sprintf("address disabled: %s", randAddr2.Hex())) } +func Test_SetNonceAfterInit(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewTestGeneralConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ks := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + evmcfg := evmtest.NewChainScopedConfig(t, cfg) + checkerFactory := &txmgr.CheckerFactory{Client: ethClient} + randNonce := testutils.NewRandomPositiveInt64() + _, addr1 := cltest.MustInsertRandomKey(t, ks) + ethClient.On("PendingNonceAt", mock.Anything, addr1).Return(uint64(0), errors.New("failed to retrieve nonce at startup")).Once() + ethClient.On("PendingNonceAt", mock.Anything, addr1).Return(uint64(randNonce), nil).Once() + eb := NewTestEthBroadcaster(t, txStore, ethClient, ks, evmcfg, checkerFactory, false) + + ctx := testutils.Context(t) + nonce, err := eb.GetNextSequence(ctx, addr1) + require.NoError(t, err) + require.Equal(t, randNonce, int64(nonce)) + + // Test that the new nonce is set in the map and does not need a client call to retrieve on subsequent calls + nonce, err = eb.GetNextSequence(ctx, addr1) + require.NoError(t, err) + require.Equal(t, randNonce, int64(nonce)) +} + func Test_IncrementNextNonce(t *testing.T) { t.Parallel() @@ -1898,26 +1961,27 @@ func Test_IncrementNextNonce(t *testing.T) { ethClient.On("PendingNonceAt", mock.Anything, addr1).Return(uint64(randNonce), nil).Once() eb := NewTestEthBroadcaster(t, txStore, ethClient, ks, evmcfg, checkerFactory, false) - nonce, err := eb.GetNextSequence(addr1) + ctx := testutils.Context(t) + nonce, err := eb.GetNextSequence(ctx, addr1) require.NoError(t, err) eb.IncrementNextSequence(addr1, nonce) - nonce, err = eb.GetNextSequence(addr1) + nonce, err = eb.GetNextSequence(ctx, addr1) require.NoError(t, err) assert.Equal(t, randNonce+1, int64(nonce)) eb.IncrementNextSequence(addr1, nonce) - nonce, err = eb.GetNextSequence(addr1) + nonce, err = eb.GetNextSequence(ctx, addr1) require.NoError(t, err) assert.Equal(t, randNonce+2, int64(nonce)) randAddr1 := utils.RandomAddress() - _, err = eb.GetNextSequence(randAddr1) + _, err = eb.GetNextSequence(ctx, randAddr1) require.Error(t, err) - assert.Contains(t, err.Error(), fmt.Sprintf("address not found in next sequence map: %s", randAddr1.Hex())) + assert.Contains(t, err.Error(), fmt.Sprintf("address disabled: %s", randAddr1.Hex())) // verify it didnt get changed by any erroring calls - nonce, err = eb.GetNextSequence(addr1) + nonce, err = eb.GetNextSequence(ctx, addr1) require.NoError(t, err) assert.Equal(t, randNonce+2, int64(nonce)) } @@ -1936,14 +2000,15 @@ func Test_SetNextNonce(t *testing.T) { _, fromAddress := cltest.MustInsertRandomKey(t, ks) ethClient.On("PendingNonceAt", mock.Anything, fromAddress).Return(uint64(0), nil).Once() eb := NewTestEthBroadcaster(t, txStore, ethClient, ks, evmcfg, checkerFactory, false) + ctx := testutils.Context(t) t.Run("update next nonce", func(t *testing.T) { - nonce, err := eb.GetNextSequence(fromAddress) + nonce, err := eb.GetNextSequence(ctx, fromAddress) require.NoError(t, err) assert.Equal(t, int64(0), int64(nonce)) eb.SetNextSequence(fromAddress, evmtypes.Nonce(24)) - newNextNonce, err := eb.GetNextSequence(fromAddress) + newNextNonce, err := eb.GetNextSequence(ctx, fromAddress) require.NoError(t, err) assert.Equal(t, int64(24), int64(newNextNonce)) }) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a10f9dd1c6d..1351c421340 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -36,6 +36,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ... +## 2.7.1 - UNRELEASED + +### Fixed + +- Fixed a bug that causes the node to shutdown if all configured RPC's are unreachable during startup. + ## 2.7.0 - UNRELEASED ### Added