Skip to content

Commit

Permalink
Latest TXMv2 updates
Browse files Browse the repository at this point in the history
  • Loading branch information
dimriou committed Dec 4, 2024
1 parent 1584d28 commit c6e1f2e
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 56 deletions.
30 changes: 15 additions & 15 deletions core/chains/evm/txm/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 22 additions & 8 deletions core/chains/evm/txm/storage/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ import (
)

const (
// maxQueuedTransactions is the max limit of UnstartedTransactions and ConfirmedTransactions structures.
maxQueuedTransactions = 250
pruneSubset = 3
// pruneSubset controls the subset of confirmed transactions to prune when the structure reaches its max limit.
// i.e. if the value is 3 and the limit is 90, 30 transactions will be pruned.
pruneSubset = 3
)

type InMemoryStore struct {
Expand Down Expand Up @@ -111,6 +114,10 @@ func (m *InMemoryStore) CreateEmptyUnconfirmedTransaction(nonce uint64, gasLimit
return nil, fmt.Errorf("an unconfirmed tx with the same nonce already exists: %v", m.UnconfirmedTransactions[nonce])
}

if _, exists := m.Transactions[nonce]; exists {
return nil, fmt.Errorf("a tx with the same nonce already exists: %v", m.Transactions[nonce])
}

m.UnconfirmedTransactions[nonce] = emptyTx
m.Transactions[emptyTx.ID] = emptyTx

Expand Down Expand Up @@ -165,18 +172,18 @@ func (m *InMemoryStore) FetchUnconfirmedTransactionAtNonceWithCount(latestNonce
return
}

func (m *InMemoryStore) MarkTransactionsConfirmed(latestNonce uint64) ([]uint64, []uint64, error) {
func (m *InMemoryStore) MarkConfirmedAndReorgedTransactions(latestNonce uint64) ([]*types.Transaction, []uint64, error) {
m.Lock()
defer m.Unlock()

var confirmedTransactionIDs []uint64
var confirmedTransactions []*types.Transaction
for _, tx := range m.UnconfirmedTransactions {
if tx.Nonce == nil {
return nil, nil, fmt.Errorf("nonce for txID: %v is empty", tx.ID)
}
if *tx.Nonce < latestNonce {
tx.State = types.TxConfirmed
confirmedTransactionIDs = append(confirmedTransactionIDs, tx.ID)
confirmedTransactions = append(confirmedTransactions, tx.DeepCopy())
m.ConfirmedTransactions[*tx.Nonce] = tx
delete(m.UnconfirmedTransactions, *tx.Nonce)
}
Expand All @@ -201,9 +208,9 @@ func (m *InMemoryStore) MarkTransactionsConfirmed(latestNonce uint64) ([]uint64,
m.lggr.Debugf("Confirmed transactions map for address: %v reached max limit of: %d. Pruned 1/3 of the oldest confirmed transactions. TxIDs: %v",
m.address, maxQueuedTransactions, prunedTxIDs)
}
sort.Slice(confirmedTransactionIDs, func(i, j int) bool { return confirmedTransactionIDs[i] < confirmedTransactionIDs[j] })
sort.Slice(confirmedTransactions, func(i, j int) bool { return confirmedTransactions[i].ID < confirmedTransactions[j].ID })
sort.Slice(unconfirmedTransactionIDs, func(i, j int) bool { return unconfirmedTransactionIDs[i] < unconfirmedTransactionIDs[j] })
return confirmedTransactionIDs, unconfirmedTransactionIDs, nil
return confirmedTransactions, unconfirmedTransactionIDs, nil
}

func (m *InMemoryStore) MarkUnconfirmedTransactionPurgeable(nonce uint64) error {
Expand Down Expand Up @@ -232,6 +239,9 @@ func (m *InMemoryStore) UpdateTransactionBroadcast(txID uint64, txNonce uint64,
// Set the same time for both the tx and its attempt
now := time.Now()
unconfirmedTx.LastBroadcastAt = now
if unconfirmedTx.InitialBroadcastAt.IsZero() {
unconfirmedTx.InitialBroadcastAt = now
}
a, err := unconfirmedTx.FindAttemptByHash(attemptHash)
if err != nil {
return err
Expand All @@ -254,6 +264,10 @@ func (m *InMemoryStore) UpdateUnstartedTransactionWithNonce(nonce uint64) (*type
return nil, fmt.Errorf("an unconfirmed tx with the same nonce already exists: %v", m.UnconfirmedTransactions[nonce])
}

if _, exists := m.Transactions[nonce]; exists {
return nil, fmt.Errorf("a tx with the same nonce already exists: %v", m.Transactions[nonce])
}

tx := m.UnstartedTransactions[0]
tx.Nonce = &nonce
tx.State = types.TxUnconfirmed
Expand Down Expand Up @@ -315,8 +329,8 @@ func (m *InMemoryStore) MarkTxFatal(*types.Transaction) error {

// Orchestrator
func (m *InMemoryStore) FindTxWithIdempotencyKey(idempotencyKey *string) *types.Transaction {
m.Lock()
defer m.Unlock()
m.RLock()
defer m.RUnlock()

if idempotencyKey != nil {
for _, tx := range m.Transactions {
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/txm/storage/inmemory_store_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ func (m *InMemoryStoreManager) FetchUnconfirmedTransactionAtNonceWithCount(_ con
return nil, 0, fmt.Errorf(StoreNotFoundForAddress, fromAddress)
}

func (m *InMemoryStoreManager) MarkTransactionsConfirmed(_ context.Context, nonce uint64, fromAddress common.Address) (confirmedTxIDs []uint64, unconfirmedTxIDs []uint64, err error) {
func (m *InMemoryStoreManager) MarkConfirmedAndReorgedTransactions(_ context.Context, nonce uint64, fromAddress common.Address) (confirmedTxs []*types.Transaction, unconfirmedTxIDs []uint64, err error) {
if store, exists := m.InMemoryStoreMap[fromAddress]; exists {
confirmedTxIDs, unconfirmedTxIDs, err = store.MarkTransactionsConfirmed(nonce)
confirmedTxs, unconfirmedTxIDs, err = store.MarkConfirmedAndReorgedTransactions(nonce)
return
}
return nil, nil, fmt.Errorf(StoreNotFoundForAddress, fromAddress)
Expand Down
30 changes: 24 additions & 6 deletions core/chains/evm/txm/storage/inmemory_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestMarkTransactionsConfirmed(t *testing.T) {

t.Run("returns 0 if there are no transactions", func(t *testing.T) {
m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID)
un, cn, err := m.MarkTransactionsConfirmed(100)
un, cn, err := m.MarkConfirmedAndReorgedTransactions(100)
require.NoError(t, err)
assert.Empty(t, un)
assert.Empty(t, cn)
Expand All @@ -206,11 +206,27 @@ func TestMarkTransactionsConfirmed(t *testing.T) {
ctx2, err := insertUnconfirmedTransaction(m, 1)
require.NoError(t, err)

ctxs, utxs, err := m.MarkTransactionsConfirmed(1)
ctxs, utxs, err := m.MarkConfirmedAndReorgedTransactions(1)
require.NoError(t, err)
assert.Equal(t, types.TxConfirmed, ctx1.State)
assert.Equal(t, types.TxUnconfirmed, ctx2.State)
assert.Equal(t, ctxs[0], ctx1.ID)
assert.Equal(t, ctxs[0].ID, ctx1.ID) // Ensure order
assert.Empty(t, utxs)
})

t.Run("state remains the same if nonce didn't change", func(t *testing.T) {
m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID)
ctx1, err := insertConfirmedTransaction(m, 0)
require.NoError(t, err)

ctx2, err := insertUnconfirmedTransaction(m, 1)
require.NoError(t, err)

ctxs, utxs, err := m.MarkConfirmedAndReorgedTransactions(1)
require.NoError(t, err)
assert.Equal(t, types.TxConfirmed, ctx1.State)
assert.Equal(t, types.TxUnconfirmed, ctx2.State)
assert.Empty(t, ctxs)
assert.Empty(t, utxs)
})

Expand All @@ -222,7 +238,7 @@ func TestMarkTransactionsConfirmed(t *testing.T) {
ctx2, err := insertConfirmedTransaction(m, 1)
require.NoError(t, err)

ctxs, utxs, err := m.MarkTransactionsConfirmed(1)
ctxs, utxs, err := m.MarkConfirmedAndReorgedTransactions(1)
require.NoError(t, err)
assert.Equal(t, types.TxConfirmed, ctx1.State)
assert.Equal(t, types.TxUnconfirmed, ctx2.State)
Expand All @@ -237,7 +253,7 @@ func TestMarkTransactionsConfirmed(t *testing.T) {
require.NoError(t, err)
}
assert.Len(t, m.ConfirmedTransactions, maxQueuedTransactions)
_, _, err := m.MarkTransactionsConfirmed(maxQueuedTransactions)
_, _, err := m.MarkConfirmedAndReorgedTransactions(maxQueuedTransactions)
require.NoError(t, err)
assert.Len(t, m.ConfirmedTransactions, (maxQueuedTransactions - maxQueuedTransactions/pruneSubset))
})
Expand Down Expand Up @@ -294,6 +310,7 @@ func TestUpdateTransactionBroadcast(t *testing.T) {
require.NoError(t, m.UpdateTransactionBroadcast(0, nonce, hash))
assert.False(t, tx.LastBroadcastAt.IsZero())
assert.False(t, attempt.BroadcastAt.IsZero())
assert.False(t, tx.InitialBroadcastAt.IsZero())
})
}

Expand All @@ -308,7 +325,7 @@ func TestUpdateUnstartedTransactionWithNonce(t *testing.T) {
assert.Nil(t, tx)
})

t.Run("fails if there is already another unstarted transaction with the same nonce", func(t *testing.T) {
t.Run("fails if there is already another unconfirmed transaction with the same nonce", func(t *testing.T) {
var nonce uint64
m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID)
insertUnstartedTransaction(m)
Expand All @@ -328,6 +345,7 @@ func TestUpdateUnstartedTransactionWithNonce(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, nonce, *tx.Nonce)
assert.Equal(t, types.TxUnconfirmed, tx.State)
assert.Empty(t, m.UnstartedTransactions)
})
}

Expand Down
39 changes: 27 additions & 12 deletions core/chains/evm/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type TxStore interface {
CreateEmptyUnconfirmedTransaction(context.Context, common.Address, uint64, uint64) (*types.Transaction, error)
CreateTransaction(context.Context, *types.TxRequest) (*types.Transaction, error)
FetchUnconfirmedTransactionAtNonceWithCount(context.Context, uint64, common.Address) (*types.Transaction, int, error)
MarkTransactionsConfirmed(context.Context, uint64, common.Address) ([]uint64, []uint64, error)
MarkConfirmedAndReorgedTransactions(context.Context, uint64, common.Address) ([]*types.Transaction, []uint64, error)
MarkUnconfirmedTransactionPurgeable(context.Context, uint64, common.Address) error
UpdateTransactionBroadcast(context.Context, uint64, uint64, common.Hash, common.Address) error
UpdateUnstartedTransactionWithNonce(context.Context, common.Address, uint64) (*types.Transaction, error)
Expand Down Expand Up @@ -78,6 +78,10 @@ var (
Name: "txm_num_nonce_gaps",
Help: "Total number of nonce gaps created that the transaction manager had to fill.",
}, []string{"chainID"})
promTimeUntilTxConfirmed = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "txm_time_until_tx_confirmed",
Help: "The amount of time elapsed from a transaction being broadcast to being included in a block.",
}, []string{"chainID"})
)

type Config struct {
Expand Down Expand Up @@ -275,7 +279,7 @@ func (t *Txm) broadcastTransaction(ctx context.Context, address common.Address)
return false, err
}

// Optimistically send up to 1/3 of the maxInFlightTransactions. After that threshold, broadcast more cautiously
// Optimistically send up to 1/maxInFlightSubset of the maxInFlightTransactions. After that threshold, broadcast more cautiously
// by checking the pending nonce so no more than maxInFlightTransactions/3 can get stuck simultaneously i.e. due
// to insufficient balance. We're making this trade-off to avoid storing stuck transactions and making unnecessary
// RPC calls. The upper limit is always maxInFlightTransactions regardless of the pending nonce.
Expand All @@ -296,17 +300,15 @@ func (t *Txm) broadcastTransaction(ctx context.Context, address common.Address)
}
}

tx, err := t.txStore.UpdateUnstartedTransactionWithNonce(ctx, address, t.getNonce(address))
nonce := t.getNonce(address)
tx, err := t.txStore.UpdateUnstartedTransactionWithNonce(ctx, address, nonce)
if err != nil {
return false, err
}
if tx == nil {
return false, nil
}
nonce := t.getNonce(address)
tx.Nonce = &nonce
t.setNonce(address, *tx.Nonce+1)
tx.State = types.TxUnconfirmed
t.setNonce(address, nonce+1)

if err := t.createAndSendAttempt(ctx, tx, address); err != nil {
return true, err
Expand Down Expand Up @@ -337,7 +339,7 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio
start := time.Now()
txErr := t.client.SendTransaction(ctx, tx, attempt)
tx.AttemptCount++
t.lggr.Infow("Broadcasted attempt", "tx", tx, "attempt", attempt, "duration", time.Since(start), "txErr: ", txErr)
t.lggr.Infow("Broadcasted attempt", "tx", tx.PrettyPrint(), "attempt", attempt.PrettyPrint(), "duration", time.Since(start), "txErr: ", txErr)
if txErr != nil && t.errorHandler != nil {
if err = t.errorHandler.HandleError(tx, txErr, t.attemptBuilder, t.client, t.txStore, t.setNonce, false); err != nil {
return
Expand All @@ -363,12 +365,13 @@ func (t *Txm) backfillTransactions(ctx context.Context, address common.Address)
return false, err
}

confirmedTransactionIDs, unconfirmedTransactionIDs, err := t.txStore.MarkTransactionsConfirmed(ctx, latestNonce, address)
confirmedTransactions, unconfirmedTransactionIDs, err := t.txStore.MarkConfirmedAndReorgedTransactions(ctx, latestNonce, address)
if err != nil {
return false, err
}
if len(confirmedTransactionIDs) > 0 || len(unconfirmedTransactionIDs) > 0 {
promNumConfirmedTxs.WithLabelValues(t.chainID.String()).Add(float64(len(confirmedTransactionIDs)))
if len(confirmedTransactions) > 0 || len(unconfirmedTransactionIDs) > 0 {
promNumConfirmedTxs.WithLabelValues(t.chainID.String()).Add(float64(len(confirmedTransactions)))
confirmedTransactionIDs := extractMetrics(confirmedTransactions, t.chainID)
t.lggr.Infof("Confirmed transaction IDs: %v . Re-orged transaction IDs: %v", confirmedTransactionIDs, unconfirmedTransactionIDs)
}

Expand Down Expand Up @@ -405,7 +408,8 @@ func (t *Txm) backfillTransactions(ctx context.Context, address common.Address)
if tx.AttemptCount >= maxAllowedAttempts {
return true, fmt.Errorf("reached max allowed attempts for txID: %d. TXM won't broadcast any more attempts."+
"If this error persists, it means the transaction won't be confirmed and the TXM needs to be restarted."+
"Look for any error messages from previous attempts that may indicate why this happened, i.e. wallet is out of funds. Tx: %v", tx.ID, tx)
"Look for any error messages from previous broadcasted attempts that may indicate why this happened, i.e. wallet is out of funds. Tx: %v", tx.ID,
tx.PrettyPrintWithAttempts())
}

if time.Since(tx.LastBroadcastAt) > (t.config.BlockTime*time.Duration(t.config.RetryBlockThreshold)) || tx.LastBroadcastAt.IsZero() {
Expand All @@ -424,3 +428,14 @@ func (t *Txm) createAndSendEmptyTx(ctx context.Context, latestNonce uint64, addr
}
return t.createAndSendAttempt(ctx, tx, address)
}

func extractMetrics(txs []*types.Transaction, chainID *big.Int) []uint64 {
confirmedTxIDs := make([]uint64, 0, len(txs))
for _, tx := range txs {
confirmedTxIDs = append(confirmedTxIDs, tx.ID)
if !tx.InitialBroadcastAt.IsZero() {
promTimeUntilTxConfirmed.WithLabelValues(chainID.String()).Observe(float64(time.Since(tx.InitialBroadcastAt)))
}
}
return confirmedTxIDs
}
5 changes: 3 additions & 2 deletions core/chains/evm/txm/txm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,11 @@ func TestBackfillTransactions(t *testing.T) {
require.ErrorContains(t, err, "latest nonce fail")
})

t.Run("fails if MarkTransactionsConfirmed fails", func(t *testing.T) {
t.Run("fails if MarkConfirmedAndReorgedTransactions fails", func(t *testing.T) {
txm := NewTxm(logger.Test(t), testutils.FixtureChainID, client, ab, storage, nil, config, keystore)
client.On("NonceAt", mock.Anything, address, mock.Anything).Return(uint64(0), nil).Once()
storage.On("MarkTransactionsConfirmed", mock.Anything, mock.Anything, address).Return([]uint64{}, []uint64{}, errors.New("marking transactions confirmed failed")).Once()
storage.On("MarkConfirmedAndReorgedTransactions", mock.Anything, mock.Anything, address).
Return([]*types.Transaction{}, []uint64{}, errors.New("marking transactions confirmed failed")).Once()
bo, err := txm.backfillTransactions(ctx, address)
require.Error(t, err)
assert.False(t, bo)
Expand Down
Loading

0 comments on commit c6e1f2e

Please sign in to comment.