diff --git a/chainio/txmgr/geometric.go b/chainio/txmgr/geometric.go new file mode 100644 index 00000000..f9057eee --- /dev/null +++ b/chainio/txmgr/geometric.go @@ -0,0 +1,428 @@ +package txmgr + +import ( + "context" + "errors" + "fmt" + "math/big" + "net/url" + "sync" + "time" + + "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" + "github.com/Layr-Labs/eigensdk-go/chainio/clients/wallet" + "github.com/Layr-Labs/eigensdk-go/chainio/gasoracle" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/core/types" +) + +// percentage multiplier for gas price. It needs to be >= 10 to properly replace existing transaction +// e.g. 10 means 10% increase +var ( + gasPricePercentageMultiplier = big.NewInt(10) + hundred = big.NewInt(100) + maxSendTransactionRetry = 3 + queryTickerDuration = 3 * time.Second + ErrTransactionNotBroadcasted = errors.New("transaction not broadcasted") +) + +type transaction struct { + *types.Transaction + TxID wallet.TxID + requestedAt time.Time +} + +type TxnRequest struct { + Tx *types.Transaction + Tag string + Value *big.Int + Metadata interface{} + + requestedAt time.Time + // txAttempts are the transactions that have been attempted to be mined for this request. + // If a transaction hasn't been confirmed within the timeout and a replacement transaction is sent, + // the original transaction hash will be kept in this slice + txAttempts []*transaction +} + +// ReceiptOrErr is a wrapper for a transaction receipt or an error. +// Receipt should be nil if there is an error, and non-nil if there is no error. +// Metadata is the metadata passed in with the transaction request. +type ReceiptOrErr struct { + Receipt *types.Receipt + Metadata interface{} + Err error +} + +type GeometricTxManager struct { + mu sync.Mutex + + ethClient eth.Client + gasOracle gasoracle.GasOracle + wallet wallet.Wallet + numConfirmations int + requestChan chan *TxnRequest + logger logging.Logger + + receiptChan chan *ReceiptOrErr + queueSize int + txnBroadcastTimeout time.Duration + txnRefreshInterval time.Duration + metrics *Metrics +} + +var _ AsyncTxManager = (*GeometricTxManager)(nil) + +func NewTxnManager(ethClient eth.Client, gasOracle gasoracle.GasOracle, wallet wallet.Wallet, numConfirmations, queueSize int, txnBroadcastTimeout time.Duration, txnRefreshInterval time.Duration, logger logging.Logger, metrics *Metrics) *GeometricTxManager { + return &GeometricTxManager{ + ethClient: ethClient, + gasOracle: gasOracle, + wallet: wallet, + numConfirmations: numConfirmations, + requestChan: make(chan *TxnRequest, queueSize), + logger: logger.With("component", "TxnManager"), + receiptChan: make(chan *ReceiptOrErr, queueSize), + queueSize: queueSize, + txnBroadcastTimeout: txnBroadcastTimeout, + txnRefreshInterval: txnRefreshInterval, + metrics: metrics, + } +} + +func NewTxnRequest(tx *types.Transaction, tag string, value *big.Int, metadata interface{}) *TxnRequest { + return &TxnRequest{ + Tx: tx, + Tag: tag, + Value: value, + Metadata: metadata, + + requestedAt: time.Now(), + txAttempts: make([]*transaction, 0), + } +} + +func (t *GeometricTxManager) Start(ctx context.Context) { + go func() { + for { + select { + case <-ctx.Done(): + return + case req := <-t.requestChan: + receipt, err := t.monitorTransaction(ctx, req) + if err != nil { + t.receiptChan <- &ReceiptOrErr{ + Receipt: nil, + Metadata: req.Metadata, + Err: err, + } + } else { + t.receiptChan <- &ReceiptOrErr{ + Receipt: receipt, + Metadata: req.Metadata, + Err: nil, + } + if receipt.GasUsed > 0 { + t.metrics.UpdateGasUsed(receipt.GasUsed) + } + } + t.metrics.ObserveLatency("total", float64(time.Since(req.requestedAt).Milliseconds())) + } + } + }() + t.logger.Info("started TxnManager") +} + +// ProcessTransaction sends the transaction and queues the transaction for monitoring. +// It returns an error if the transaction fails to be confirmed for reasons other than timeouts. +// TxnManager monitors the transaction and resends it with a higher gas price if it is not mined without a timeout until the transaction is confirmed or failed. +func (t *GeometricTxManager) ProcessTransaction(ctx context.Context, req *TxnRequest) error { + t.mu.Lock() + defer t.mu.Unlock() + t.logger.Debug("new transaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap()) + + var txn *types.Transaction + var txID wallet.TxID + var err error + retryFromFailure := 0 + for retryFromFailure < maxSendTransactionRetry { + gasTipCap, gasFeeCap, err := t.gasOracle.GetLatestGasCaps(ctx) + if err != nil { + return fmt.Errorf("failed to get latest gas caps: %w", err) + } + + from, err := t.wallet.SenderAddress(ctx) + if err != nil { + return fmt.Errorf("failed to get sender address: %w", err) + } + txn, err = t.gasOracle.UpdateGas(ctx, req.Tx, req.Value, gasTipCap, gasFeeCap, from) + if err != nil { + return fmt.Errorf("failed to update gas price: %w", err) + } + txID, err = t.wallet.SendTransaction(ctx, txn) + var urlErr *url.Error + didTimeout := false + if errors.As(err, &urlErr) { + didTimeout = urlErr.Timeout() + } + if didTimeout || errors.Is(err, context.DeadlineExceeded) { + t.logger.Warn("failed to send txn due to timeout", "tag", req.Tag, "hash", txn.Hash().Hex(), "numRetries", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err) + retryFromFailure++ + continue + } else if err != nil { + return fmt.Errorf("failed to send txn (%s) %s: %w", req.Tag, txn.Hash().Hex(), err) + } else { + t.logger.Debug("successfully sent txn", "tag", req.Tag, "txID", txID, "txHash", txn.Hash().Hex()) + break + } + } + + if txn == nil || txID == "" { + return fmt.Errorf("failed to send txn (%s) %s: %w", req.Tag, req.Tx.Hash().Hex(), err) + } + + req.Tx = txn + req.txAttempts = append(req.txAttempts, &transaction{ + TxID: txID, + Transaction: txn, + requestedAt: time.Now(), + }) + + t.requestChan <- req + t.metrics.UpdateTxQueue(len(t.requestChan)) + return nil +} + +func (t *GeometricTxManager) ReceiptChan() chan *ReceiptOrErr { + return t.receiptChan +} + +// ensureAnyTransactionBroadcasted waits until all given transactions are broadcasted to the network. +func (t *GeometricTxManager) ensureAnyTransactionBroadcasted(ctx context.Context, txs []*transaction) error { + queryTicker := time.NewTicker(queryTickerDuration) + defer queryTicker.Stop() + + for { + for _, tx := range txs { + _, err := t.wallet.GetTransactionReceipt(ctx, tx.TxID) + if err == nil || errors.Is(err, wallet.ErrReceiptNotYetAvailable) { + t.metrics.ObserveLatency("broadcasted", float64(time.Since(tx.requestedAt).Milliseconds())) + return nil + } + } + + // Wait for the next round. + select { + case <-ctx.Done(): + return ctx.Err() + case <-queryTicker.C: + } + } +} + +func (t *GeometricTxManager) ensureAnyTransactionEvaled(ctx context.Context, txs []*transaction) (*types.Receipt, error) { + queryTicker := time.NewTicker(queryTickerDuration) + defer queryTicker.Stop() + var receipt *types.Receipt + var err error + // transactions that need to be queried. Some transactions will be removed from this map depending on their status. + txnsToQuery := make(map[wallet.TxID]*types.Transaction, len(txs)) + for _, tx := range txs { + txnsToQuery[tx.TxID] = tx.Transaction + } + + for { + for txID, tx := range txnsToQuery { + receipt, err = t.wallet.GetTransactionReceipt(ctx, txID) + if err == nil { + chainTip, err := t.ethClient.BlockNumber(ctx) + if err == nil { + if receipt.BlockNumber.Uint64()+uint64(t.numConfirmations) > chainTip { + t.logger.Debug("transaction has been mined but don't have enough confirmations at current chain tip", "txnBlockNumber", receipt.BlockNumber.Uint64(), "numConfirmations", t.numConfirmations, "chainTip", chainTip) + break + } else { + return receipt, nil + } + } else { + t.logger.Debug("failed to get chain tip while waiting for transaction to mine", "err", err) + } + } + + if errors.Is(err, ethereum.NotFound) || errors.Is(err, wallet.ErrReceiptNotYetAvailable) { + t.logger.Debug("Transaction not yet mined", "txID", txID, "txHash", tx.Hash().Hex(), "err", err) + } else if errors.Is(err, wallet.ErrTransactionFailed) { + t.logger.Debug("Transaction failed", "txID", txID, "txHash", tx.Hash().Hex(), "err", err) + delete(txnsToQuery, txID) + } else if errors.Is(err, wallet.ErrNotYetBroadcasted) { + t.logger.Error("Transaction has not been broadcasted to network but attempted to retrieve receipt", "err", err) + } else { + t.logger.Debug("Transaction receipt retrieval failed", "err", err) + } + } + + if len(txnsToQuery) == 0 { + return nil, fmt.Errorf("all transactions failed") + } + + // Wait for the next round. + select { + case <-ctx.Done(): + return receipt, ctx.Err() + case <-queryTicker.C: + } + } +} + +// monitorTransaction waits until the transaction is confirmed (or failed) and resends it with a higher gas price if it is not mined without a timeout. +// It returns the receipt once the transaction has been confirmed. +// It returns an error if the transaction fails to be sent for reasons other than timeouts. +func (t *GeometricTxManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*types.Receipt, error) { + numSpeedUps := 0 + retryFromFailure := 0 + + var receipt *types.Receipt + var err error + + rpcCallAttempt := func() error { + t.logger.Debug("monitoring transaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce()) + + ctxWithTimeout, cancelBroadcastTimeout := context.WithTimeout(ctx, t.txnBroadcastTimeout) + defer cancelBroadcastTimeout() + + // Ensure transactions are broadcasted to the network before querying the receipt. + // This is to avoid querying the receipt of a transaction that hasn't been broadcasted yet. + // For example, when Fireblocks wallet is used, there may be delays in broadcasting the transaction due to latency from cosigning and MPC operations. + err = t.ensureAnyTransactionBroadcasted(ctxWithTimeout, req.txAttempts) + if err != nil && errors.Is(err, context.DeadlineExceeded) { + t.logger.Warn("transaction not broadcasted within timeout", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce()) + fireblocksWallet, ok := t.wallet.(interface { + CancelTransactionBroadcast(ctx context.Context, txID wallet.TxID) (bool, error) + }) + if ok { + // Consider these transactions failed as they haven't been broadcasted within timeout. + // Cancel these transactions to avoid blocking the next transactions. + for _, tx := range req.txAttempts { + cancelled, err := fireblocksWallet.CancelTransactionBroadcast(ctx, tx.TxID) + if err != nil { + t.logger.Warn("failed to cancel Fireblocks transaction broadcast", "txID", tx.TxID, "err", err) + } else if cancelled { + t.logger.Info("cancelled Fireblocks transaction broadcast because it didn't get broadcasted within timeout", "txID", tx.TxID, "timeout", t.txnBroadcastTimeout.String()) + } + } + } + return ErrTransactionNotBroadcasted + } else if err != nil { + t.logger.Error("unexpected error while waiting for Fireblocks transaction to broadcast", "txHash", req.Tx.Hash().Hex(), "err", err) + return err + } + + ctxWithTimeout, cancelEvaluationTimeout := context.WithTimeout(ctx, t.txnRefreshInterval) + defer cancelEvaluationTimeout() + receipt, err = t.ensureAnyTransactionEvaled( + ctxWithTimeout, + req.txAttempts, + ) + return err + } + + for { + err = rpcCallAttempt() + if err == nil { + t.metrics.UpdateSpeedUps(numSpeedUps) + t.metrics.IncrementTxnCount("success") + return receipt, nil + } + + if errors.Is(err, context.DeadlineExceeded) { + if receipt != nil { + t.logger.Warn("transaction has been mined, but hasn't accumulated the required number of confirmations", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce()) + continue + } + t.logger.Warn("transaction not mined within timeout, resending with higher gas price", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce()) + newTx, err := t.speedUpTxn(ctx, req.Tx, req.Tag) + if err != nil { + t.logger.Error("failed to speed up transaction", "err", err) + t.metrics.IncrementTxnCount("failure") + return nil, err + } + txID, err := t.wallet.SendTransaction(ctx, newTx) + if err != nil { + if retryFromFailure >= maxSendTransactionRetry { + t.logger.Warn("failed to send txn - retries exhausted", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err) + t.metrics.IncrementTxnCount("failure") + return nil, err + } else { + t.logger.Warn("failed to send txn - retrying", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "attempt", retryFromFailure, "maxRetry", maxSendTransactionRetry, "err", err) + } + retryFromFailure++ + continue + } + + t.logger.Debug("successfully sent txn", "tag", req.Tag, "txID", txID, "txHash", newTx.Hash().Hex()) + req.Tx = newTx + req.txAttempts = append(req.txAttempts, &transaction{ + TxID: txID, + Transaction: newTx, + }) + numSpeedUps++ + } else { + t.logger.Error("transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err) + t.metrics.IncrementTxnCount("failure") + return nil, err + } + } +} + +// speedUpTxn increases the gas price of the existing transaction by specified percentage. +// It makes sure the new gas price is not lower than the current gas price. +func (t *GeometricTxManager) speedUpTxn(ctx context.Context, tx *types.Transaction, tag string) (*types.Transaction, error) { + prevGasTipCap := tx.GasTipCap() + prevGasFeeCap := tx.GasFeeCap() + // get the gas tip cap and gas fee cap based on current network condition + currentGasTipCap, currentGasFeeCap, err := t.gasOracle.GetLatestGasCaps(ctx) + if err != nil { + return nil, err + } + increasedGasTipCap := increaseGasPrice(prevGasTipCap) + increasedGasFeeCap := increaseGasPrice(prevGasFeeCap) + // make sure increased gas prices are not lower than current gas prices + var newGasTipCap, newGasFeeCap *big.Int + if currentGasTipCap.Cmp(increasedGasTipCap) > 0 { + newGasTipCap = currentGasTipCap + } else { + newGasTipCap = increasedGasTipCap + } + if currentGasFeeCap.Cmp(increasedGasFeeCap) > 0 { + newGasFeeCap = currentGasFeeCap + } else { + newGasFeeCap = increasedGasFeeCap + } + + t.logger.Info("increasing gas price", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap) + from, err := t.wallet.SenderAddress(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get sender address: %w", err) + } + return t.gasOracle.UpdateGas(ctx, tx, tx.Value(), newGasTipCap, newGasFeeCap, from) +} + +// increaseGasPrice increases the gas price by specified percentage. +// i.e. gasPrice + ((gasPrice * gasPricePercentageMultiplier + 99) / 100) +func increaseGasPrice(gasPrice *big.Int) *big.Int { + if gasPrice == nil { + return nil + } + bump := new(big.Int).Mul(gasPrice, gasPricePercentageMultiplier) + bump = roundUpDivideBig(bump, hundred) + return new(big.Int).Add(gasPrice, bump) +} + +func roundUpDivideBig(a, b *big.Int) *big.Int { + if a == nil || b == nil || b.Cmp(big.NewInt(0)) == 0 { + return nil + } + one := new(big.Int).SetUint64(1) + num := new(big.Int).Sub(new(big.Int).Add(a, b), one) // a + b - 1 + res := new(big.Int).Div(num, b) // (a + b - 1) / b + return res +} diff --git a/chainio/txmgr/metrics.go b/chainio/txmgr/metrics.go new file mode 100644 index 00000000..2871444c --- /dev/null +++ b/chainio/txmgr/metrics.go @@ -0,0 +1,86 @@ +package txmgr + +import ( + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const namespace = "txmgr" + +type Metrics struct { + Latency *prometheus.SummaryVec + GasUsed prometheus.Gauge + SpeedUps prometheus.Gauge + TxQueue prometheus.Gauge + NumTx *prometheus.CounterVec +} + +func NewMetrics(reg prometheus.Registerer, subsystem string, logger logging.Logger) *Metrics { + + return &Metrics{ + Latency: promauto.With(reg).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "latency_ms", + Help: "transaction confirmation latency summary in milliseconds", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, + }, + []string{"stage"}, + ), + GasUsed: promauto.With(reg).NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "gas_used", + Help: "gas used for onchain batch confirmation", + }, + ), + SpeedUps: promauto.With(reg).NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "speedups_count", + Help: "number of times the gas price was increased", + }, + ), + TxQueue: promauto.With(reg).NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "tx_queue_len", + Help: "number of transactions in transaction queue", + }, + ), + NumTx: promauto.With(reg).NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "txs_total", + Help: "number of transactions processed", + }, + []string{"state"}, + ), + } +} + +func (t *Metrics) ObserveLatency(stage string, latencyMs float64) { + t.Latency.WithLabelValues(stage).Observe(latencyMs) +} + +func (t *Metrics) UpdateGasUsed(gasUsed uint64) { + t.GasUsed.Set(float64(gasUsed)) +} + +func (t *Metrics) UpdateSpeedUps(speedUps int) { + t.SpeedUps.Set(float64(speedUps)) +} + +func (t *Metrics) UpdateTxQueue(txQueue int) { + t.TxQueue.Set(float64(txQueue)) +} + +func (t *Metrics) IncrementTxnCount(state string) { + t.NumTx.WithLabelValues(state).Inc() +}