Skip to content

Commit

Permalink
Improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
dimriou committed Dec 13, 2024
1 parent cb28335 commit c47781a
Show file tree
Hide file tree
Showing 16 changed files with 446 additions and 165 deletions.
28 changes: 28 additions & 0 deletions core/chains/evm/config/toml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/url"
"slices"
"strconv"
"time"

"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/pelletier/go-toml/v2"
Expand Down Expand Up @@ -451,6 +452,20 @@ func (c *Chain) ValidateConfig() (err error) {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "GasEstimator.BumpThreshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if Transactions.AutoPurge.MinAttempts is set for %s", chainType)})
}
}
case chaintype.ChainDualBroadcast:
if c.Transactions.AutoPurge.DetectionApiUrl == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "Transactions.AutoPurge.DetectionApiUrl", Msg: fmt.Sprintf("must be set for %s", chainType)})
}
if c.Transactions.AutoPurge.Threshold == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "Transactions.AutoPurge.Threshold", Msg: fmt.Sprintf("needs to be set if auto-purge feature is enabled for %s", chainType)})
} else if *c.Transactions.AutoPurge.Threshold == 0 {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "Transactions.AutoPurge.Threshold", Value: 0, Msg: fmt.Sprintf("cannot be 0 if auto-purge feature is enabled for %s", chainType)})
}
if c.TxmV2.Enabled != nil && *c.TxmV2.Enabled {
if c.TxmV2.CustomURL == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "TxmV2.CustomURL", Msg: fmt.Sprintf("must be set for %s", chainType)})
}
}
default:
// Bump Threshold is required because the stuck tx heuristic relies on a minimum number of bump attempts to exist
if c.GasEstimator.BumpThreshold == nil {
Expand Down Expand Up @@ -494,6 +509,19 @@ func (t *TxmV2) setFrom(f *TxmV2) {
}
}

func (t *TxmV2) ValidateConfig() (err error) {
if t.Enabled != nil && *t.Enabled {
if t.BlockTime == nil {
err = multierr.Append(err, commonconfig.ErrMissing{Name: "TxmV2.BlockTime", Msg: "must be set if txmv2 feature is enabled"})
return
}
if t.BlockTime.Duration() < 2*time.Second {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "TxmV2.BlockTime", Msg: "must be equal to or greater than 2 seconds"})
}
}
return
}

type Transactions struct {
ForwardersEnabled *bool
MaxInFlight *uint32
Expand Down
15 changes: 8 additions & 7 deletions core/chains/evm/txm/attempt_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,23 @@ type AttemptBuilderKeystore interface {
}

type attemptBuilder struct {
chainID *big.Int
priceMax *assets.Wei
gas.EvmFeeEstimator
keystore AttemptBuilderKeystore
chainID *big.Int
priceMaxKey func(common.Address) *assets.Wei
keystore AttemptBuilderKeystore
}

func NewAttemptBuilder(chainID *big.Int, priceMax *assets.Wei, estimator gas.EvmFeeEstimator, keystore AttemptBuilderKeystore) *attemptBuilder {
func NewAttemptBuilder(chainID *big.Int, priceMaxKey func(common.Address) *assets.Wei, estimator gas.EvmFeeEstimator, keystore AttemptBuilderKeystore) *attemptBuilder {
return &attemptBuilder{
chainID: chainID,
priceMax: priceMax,
priceMaxKey: priceMaxKey,
EvmFeeEstimator: estimator,
keystore: keystore,
}
}

func (a *attemptBuilder) NewAttempt(ctx context.Context, lggr logger.Logger, tx *types.Transaction, dynamic bool) (*types.Attempt, error) {
fee, estimatedGasLimit, err := a.EvmFeeEstimator.GetFee(ctx, tx.Data, tx.SpecifiedGasLimit, a.priceMax, &tx.FromAddress, &tx.ToAddress)
fee, estimatedGasLimit, err := a.EvmFeeEstimator.GetFee(ctx, tx.Data, tx.SpecifiedGasLimit, a.priceMaxKey(tx.FromAddress), &tx.FromAddress, &tx.ToAddress)
if err != nil {
return nil, err
}
Expand All @@ -48,7 +48,7 @@ func (a *attemptBuilder) NewAttempt(ctx context.Context, lggr logger.Logger, tx
}

func (a *attemptBuilder) NewBumpAttempt(ctx context.Context, lggr logger.Logger, tx *types.Transaction, previousAttempt types.Attempt) (*types.Attempt, error) {
bumpedFee, bumpedFeeLimit, err := a.EvmFeeEstimator.BumpFee(ctx, previousAttempt.Fee, tx.SpecifiedGasLimit, a.priceMax, nil)
bumpedFee, bumpedFeeLimit, err := a.EvmFeeEstimator.BumpFee(ctx, previousAttempt.Fee, tx.SpecifiedGasLimit, a.priceMaxKey(tx.FromAddress), nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,6 +114,7 @@ func (a *attemptBuilder) newLegacyAttempt(ctx context.Context, tx *types.Transac
Fee: gas.EvmFee{GasPrice: gasPrice},
Hash: signedTx.Hash(),
GasLimit: estimatedGasLimit,
Type: evmtypes.LegacyTxType,
SignedTransaction: signedTx,
}

Expand Down
93 changes: 93 additions & 0 deletions core/chains/evm/txm/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package txm

import (
"context"
"fmt"
"math/big"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/metrics"
)

var (
promNumBroadcastedTxs = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "txm_num_broadcasted_transactions",
Help: "Total number of successful broadcasted transactions.",
}, []string{"chainID"})
promNumConfirmedTxs = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "txm_num_confirmed_transactions",
Help: "Total number of confirmed transactions. Note that this can happen multiple times per transaction in the case of re-orgs or when filling the nonce for untracked transactions.",
}, []string{"chainID"})
promNumNonceGaps = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "txm_num_nonce_gaps",
Help: "Total number of nonce gaps created that the transaction manager had to fill.",
}, []string{"chainID"})
promTimeUntilTxConfirmed = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "txm_time_until_tx_confirmed",
Help: "The amount of time elapsed from a transaction being broadcast to being included in a block.",
}, []string{"chainID"})
)

type txmMetrics struct {
metrics.Labeler
chainID *big.Int
numBroadcastedTxs metric.Int64Counter
numConfirmedTxs metric.Int64Counter
numNonceGaps metric.Int64Counter
timeUntilTxConfirmed metric.Float64Histogram
}

func NewTxmMetrics(chainID *big.Int) (*txmMetrics, error) {
numBroadcastedTxs, err := beholder.GetMeter().Int64Counter("txm_num_broadcasted_transactions")
if err != nil {
return nil, fmt.Errorf("failed to register broadcasted txs number: %w", err)
}

numConfirmedTxs, err := beholder.GetMeter().Int64Counter("txm_num_confirmed_transactions")
if err != nil {
return nil, fmt.Errorf("failed to register confirmed txs number: %w", err)
}

numNonceGaps, err := beholder.GetMeter().Int64Counter("txm_num_nonce_gaps")
if err != nil {
return nil, fmt.Errorf("failed to register nonce gaps number: %w", err)
}

timeUntilTxConfirmed, err := beholder.GetMeter().Float64Histogram("txm_time_until_tx_confirmed")
if err != nil {
return nil, fmt.Errorf("failed to register time until tx confirmed: %w", err)
}

return &txmMetrics{
chainID: chainID,
Labeler: metrics.NewLabeler().With("chainID", chainID.String()),
numBroadcastedTxs: numBroadcastedTxs,
numConfirmedTxs: numConfirmedTxs,
numNonceGaps: numNonceGaps,
timeUntilTxConfirmed: timeUntilTxConfirmed,
}, nil
}

func (m *txmMetrics) IncrementNumBroadcastedTxs(ctx context.Context) {
promNumBroadcastedTxs.WithLabelValues(m.chainID.String()).Add(float64(1))
m.numBroadcastedTxs.Add(ctx, 1)
}

func (m *txmMetrics) IncrementNumConfirmedTxs(ctx context.Context, confirmedTransactions int) {
promNumConfirmedTxs.WithLabelValues(m.chainID.String()).Add(float64(confirmedTransactions))
m.numConfirmedTxs.Add(ctx, int64(confirmedTransactions))
}

func (m *txmMetrics) IncrementNumNonceGaps(ctx context.Context) {
promNumNonceGaps.WithLabelValues(m.chainID.String()).Add(float64(1))
m.numNonceGaps.Add(ctx, 1)
}

func (m *txmMetrics) RecordTimeUntilTxConfirmed(ctx context.Context, duration float64) {
promTimeUntilTxConfirmed.WithLabelValues(m.chainID.String()).Observe(duration)
m.timeUntilTxConfirmed.Record(ctx, duration)
}
23 changes: 15 additions & 8 deletions core/chains/evm/txm/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ import (
type OrchestratorTxStore interface {
Add(addresses ...common.Address) error
FetchUnconfirmedTransactionAtNonceWithCount(context.Context, uint64, common.Address) (*txmtypes.Transaction, int, error)
FindTxWithIdempotencyKey(context.Context, *string) (*txmtypes.Transaction, error)
FindTxWithIdempotencyKey(context.Context, string) (*txmtypes.Transaction, error)
}

type OrchestratorKeystore interface {
CheckEnabled(ctx context.Context, address common.Address, chainID *big.Int) error
EnabledAddressesForChain(ctx context.Context, chainID *big.Int) (addresses []common.Address, err error)
}

Expand Down Expand Up @@ -120,15 +121,15 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) Close() (merr error) {
merr = errors.Join(merr, fmt.Errorf("Orchestrator failed to stop ForwarderManager: %w", err))
}
}
if err := o.txm.Close(); err != nil {
merr = errors.Join(merr, fmt.Errorf("Orchestrator failed to stop Txm: %w", err))
}
if err := o.attemptBuilder.Close(); err != nil {
// TODO: hacky fix for DualBroadcast
if !strings.Contains(err.Error(), "already been stopped") {
merr = errors.Join(merr, fmt.Errorf("Orchestrator failed to stop AttemptBuilder: %w", err))
}
}
if err := o.txm.Close(); err != nil {
merr = errors.Join(merr, fmt.Errorf("Orchestrator failed to stop Txm: %w", err))
}
return merr
})
}
Expand Down Expand Up @@ -172,14 +173,20 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) OnNewLongestChain(ctx context.Context,

func (o *Orchestrator[BLOCK_HASH, HEAD]) CreateTransaction(ctx context.Context, request txmgrtypes.TxRequest[common.Address, common.Hash]) (tx txmgrtypes.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], err error) {
var wrappedTx *txmtypes.Transaction
wrappedTx, err = o.txStore.FindTxWithIdempotencyKey(ctx, request.IdempotencyKey)
if err != nil {
return
if request.IdempotencyKey != nil {
wrappedTx, err = o.txStore.FindTxWithIdempotencyKey(ctx, *request.IdempotencyKey)
if err != nil {
return
}
}

if wrappedTx != nil {
o.lggr.Infof("Found Tx with IdempotencyKey: %v. Returning existing Tx without creating a new one.", *wrappedTx.IdempotencyKey)
} else {
if kErr := o.keystore.CheckEnabled(ctx, request.FromAddress, o.chainID); kErr != nil {
return tx, fmt.Errorf("cannot send transaction from %s on chain ID %s: %w", request.FromAddress, o.chainID.String(), kErr)
}

var pipelineTaskRunID uuid.NullUUID
if request.PipelineTaskRunID != nil {
pipelineTaskRunID.UUID = *request.PipelineTaskRunID
Expand Down Expand Up @@ -324,7 +331,7 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) GetForwarderForEOAOCR2Feeds(ctx context

func (o *Orchestrator[BLOCK_HASH, HEAD]) GetTransactionStatus(ctx context.Context, transactionID string) (status commontypes.TransactionStatus, err error) {
// Loads attempts and receipts in the transaction
tx, err := o.txStore.FindTxWithIdempotencyKey(ctx, &transactionID)
tx, err := o.txStore.FindTxWithIdempotencyKey(ctx, transactionID)
if err != nil || tx == nil {
return status, fmt.Errorf("failed to find transaction with IdempotencyKey %s: %w", transactionID, err)
}
Expand Down
Loading

0 comments on commit c47781a

Please sign in to comment.