Skip to content

Commit

Permalink
Enable optimistic txm
Browse files Browse the repository at this point in the history
  • Loading branch information
dimriou committed Feb 12, 2024
1 parent 590e48b commit 31cc03f
Show file tree
Hide file tree
Showing 13 changed files with 634 additions and 86 deletions.
2 changes: 1 addition & 1 deletion common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type TxManager[
Trigger(addr ADDR)
CreateTransaction(ctx context.Context, txRequest txmgrtypes.TxRequest[ADDR, TX_HASH]) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
GetForwarderForEOA(eoa ADDR) (forwarder ADDR, err error)
RegisterResumeCallback(fn ResumeCallback)
//RegisterResumeCallback(fn ResumeCallback)
SendNativeToken(ctx context.Context, chainID CHAIN_ID, from, to ADDR, value big.Int, gasLimit uint32) (etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
Reset(addr ADDR, abandon bool) error
// Find transactions by a field in the TxMeta blob and transaction states
Expand Down
54 changes: 54 additions & 0 deletions core/chains/evm/txmgr/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,65 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/forwarders"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
optimistictxm "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr/optimistictxm"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
)

var blockTime time.Duration = 2 * time.Second

func NewOptimisticTxm(
db *sqlx.DB,
chainConfig ChainConfig,
fCfg FeeConfig,
txConfig config.Transactions,
dbConfig DatabaseConfig,
listenerConfig ListenerConfig,
client evmclient.Client,
lggr logger.Logger,
logPoller logpoller.LogPoller,
keyStore keystore.Eth,
estimator gas.EvmFeeEstimator,
) (optimistictxm.TxManager, error) {
var fwdMgr FwdMgr

if txConfig.ForwardersEnabled() {
fwdMgr = forwarders.NewFwdMgr(db, client, logPoller, lggr, chainConfig, dbConfig)
} else {
lggr.Info("EvmForwarderManager: Disabled")
}
txAttemptBuilder := NewEvmTxAttemptBuilder(*client.ConfiguredChainID(), fCfg, keyStore, estimator)
txStore := NewTxStore(db, lggr, dbConfig)
txmCfg := NewEvmTxmConfig(chainConfig) // wrap Evm specific config
txmClient := NewEvmTxmClient(client) // wrap Evm specific client
chainID := txmClient.ConfiguredChainID()
sequenceSyncer := optimistictxm.NewSequenceSyncer(lggr, txStore, client)

bcfg := optimistictxm.BroadcasterConfig{
FallbackPollInterval: listenerConfig.FallbackPollInterval(),
MaxInFlight: txConfig.MaxInFlight(),
NonceAutoSync: chainConfig.NonceAutoSync(),
}
evmBroadcaster := optimistictxm.NewBroadcaster(txAttemptBuilder, lggr, txStore, client, bcfg,keyStore, sequenceSyncer)

rcfg := optimistictxm.ResenderConfig{
BumpAfterThreshold: time.Duration(fCfg.BumpThreshold()) * blockTime, // Polygon
MaxBumpCycles: 5,
MaxInFlight: txConfig.MaxInFlight(),
ResendInterval: blockTime,
RPCDefaultBatchSize: chainConfig.RPCDefaultBatchSize(),
}
evmResender := optimistictxm.NewResender(txAttemptBuilder, lggr, txStore, client, keyStore, rcfg)
rc := optimistictxm.ReaperConfig{
ReaperThreshold: txConfig.ReaperThreshold(),
ReaperInterval: txConfig.ReaperInterval(),
}
evmReaper := optimistictxm.NewReaper(lggr, txStore, rc, chainID, client, keyStore)
return optimistictxm.NewTxm(chainID, txmCfg, txConfig, keyStore, lggr, fwdMgr, txAttemptBuilder, txStore, sequenceSyncer, evmBroadcaster, evmResender, evmReaper), nil

}

// NewTxm constructs the necessary dependencies for the EvmTxm (broadcaster, confirmer, etc) and returns a new EvmTxManager
func NewTxm(
db *sqlx.DB,
Expand Down
25 changes: 12 additions & 13 deletions core/chains/evm/txmgr/optimistictxm/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
)

Expand All @@ -32,16 +30,16 @@ const InFlightTransactionRecheckInterval = 1 * time.Second
var ErrTxRemoved = errors.New("tx removed")

type TxAttemptBuilder interface {
NewAttempt(context.Context, txmgr.Tx, logger.Logger) (txmgr.TxAttempt, error)
NewAttempt(context.Context, Tx, logger.Logger) (TxAttempt, error)
}

type BroadcasterTxStore interface {
CountUnconfirmedTransactions(context.Context, common.Address, *big.Int) (uint32, error)
CountUnstartedTransactions(context.Context, common.Address, *big.Int) (uint32, error)
FindNextUnstartedTransactionFromAddress(context.Context, *txmgr.Tx, common.Address, *big.Int) error
BroadcasterUpdateTxUnstartedToInProgress(context.Context, *txmgr.Tx) error
BroadcasterGetTxInProgress(context.Context, common.Address) (*txmgr.Tx, error)
UpdateTxInProgressToUnconfirmed(context.Context, *txmgr.Tx) error
FindNextUnstartedTransactionFromAddress(context.Context, *Tx, common.Address, *big.Int) error
BroadcasterUpdateTxUnstartedToInProgress(context.Context, *Tx) error
BroadcasterGetTxInProgress(context.Context, common.Address) (*Tx, error)
UpdateTxInProgressToUnconfirmed(context.Context, *Tx) error
}

type BroadcasterClient interface {
Expand All @@ -65,6 +63,7 @@ type SequenceSyncer interface {
GetNextSequence(context.Context, common.Address) (evmtypes.Nonce, error)
IncrementNextSequence(common.Address)
SyncSequence(context.Context, common.Address, services.StopChan)
SyncOnChain(ctx context.Context, addr common.Address, localSequence evmtypes.Nonce) error
}

type Broadcaster struct {
Expand Down Expand Up @@ -270,10 +269,10 @@ func (b *Broadcaster) ProcessUnstartedTxs(ctx context.Context, fromAddress commo
}
}

func (b *Broadcaster) nextUnstartedTransactionWithSequence(fromAddress common.Address) (*txmgr.Tx, error) {
func (b *Broadcaster) nextUnstartedTransactionWithSequence(fromAddress common.Address) (*Tx, error) {
ctx, cancel := b.chStop.NewCtx()
defer cancel()
tx := &txmgr.Tx{}
tx := &Tx{}
if err := b.txStore.FindNextUnstartedTransactionFromAddress(ctx, tx, fromAddress, b.chainID); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
Expand All @@ -287,7 +286,7 @@ func (b *Broadcaster) nextUnstartedTransactionWithSequence(fromAddress common.Ad
}
tx.Sequence = &sequence

if tx.State != commontxmgr.TxUnstarted {
if tx.State != TxUnstarted {
return nil, fmt.Errorf("invariant violation: expected transaction %v to be unstarted, it was %s", tx.ID, tx.State)
}

Expand All @@ -312,8 +311,8 @@ func (b *Broadcaster) handleAnyInProgressTx(ctx context.Context, fromAddress com
return b.handleInProgressTx(ctx, *tx)
}

func (b *Broadcaster) handleInProgressTx(ctx context.Context, tx txmgr.Tx) error {
if tx.State != commontxmgr.TxInProgress {
func (b *Broadcaster) handleInProgressTx(ctx context.Context, tx Tx) error {
if tx.State != TxInProgress {
return fmt.Errorf("invariant violation: expected transaction %v to be in_progress, it was %s", tx.ID, tx.State)
}

Expand All @@ -323,7 +322,7 @@ func (b *Broadcaster) handleInProgressTx(ctx context.Context, tx txmgr.Tx) error
}

lgr := tx.GetLogger(logger.With(b.lggr))
signedTx, err := txmgr.GetGethSignedTx(attempt.SignedRawTx)
signedTx, err := GetGethSignedTx(attempt.SignedRawTx)
if err != nil {
b.lggr.Criticalw("Fatal error signing transaction", "err", err, "tx", tx)
return fmt.Errorf("error while sending transaction %s (tx ID %d): %w", attempt.Hash.String(), tx.ID, err)
Expand Down
37 changes: 19 additions & 18 deletions core/chains/evm/txmgr/optimistictxm/broadcaster_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package txm
package txm_test

import (
"context"
Expand All @@ -18,6 +18,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
gasmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas/mocks"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
txm "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr/optimistictxm"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
Expand All @@ -33,7 +34,7 @@ func TestBroadcaster_Lifecycle(t *testing.T) {

cfg, db := heavyweight.FullTestDBV2(t, nil)
evmcfg := evmtest.NewChainScopedConfig(t, cfg)
bcfg := BroadcasterConfig{
bcfg := txm.BroadcasterConfig{
FallbackPollInterval: evmcfg.Database().Listener().FallbackPollInterval(),
MaxInFlight: evmcfg.EVM().Transactions().MaxInFlight(),
NonceAutoSync: false,
Expand All @@ -46,8 +47,8 @@ func TestBroadcaster_Lifecycle(t *testing.T) {
cltest.MustInsertRandomKeyReturningState(t, ks)
estimator := gasmocks.NewEvmFeeEstimator(t)
txBuilder := txmgr.NewEvmTxAttemptBuilder(*chainID, evmcfg.EVM().GasEstimator(), ks, estimator)
ss := NewSequenceSyncer(lggr, txStore, client)
b := NewBroadcaster(txBuilder, lggr, txStore, client, bcfg, ks, ss)
ss := txm.NewSequenceSyncer(lggr, txStore, client)
b := txm.NewBroadcaster(txBuilder, lggr, txStore, client, bcfg, ks, ss)

// Can't close an unstarted instance
err := b.Close()
Expand Down Expand Up @@ -78,7 +79,7 @@ func TestBroadcaster_ProcessUnstartedTxs_InProgress(t *testing.T) {

cfg, db := heavyweight.FullTestDBV2(t, nil)
evmcfg := evmtest.NewChainScopedConfig(t, cfg)
bcfg := BroadcasterConfig{
bcfg := txm.BroadcasterConfig{
FallbackPollInterval: evmcfg.Database().Listener().FallbackPollInterval(),
MaxInFlight: evmcfg.EVM().Transactions().MaxInFlight(),
NonceAutoSync: false,
Expand All @@ -92,8 +93,8 @@ func TestBroadcaster_ProcessUnstartedTxs_InProgress(t *testing.T) {

estimator := gasmocks.NewEvmFeeEstimator(t)
txBuilder := txmgr.NewEvmTxAttemptBuilder(*chainID, evmcfg.EVM().GasEstimator(), keyStore, estimator)
ss := NewSequenceSyncer(lggr, txStore, client)
b := NewBroadcaster(txBuilder, lggr, txStore, client, bcfg, keyStore, ss)
ss := txm.NewSequenceSyncer(lggr, txStore, client)
b := txm.NewBroadcaster(txBuilder, lggr, txStore, client, bcfg, keyStore, ss)

ctx := testutils.Context(t)
encodedPayload := []byte{1, 2, 3}
Expand Down Expand Up @@ -186,7 +187,7 @@ func TestBroadcaster_ProcessUnstartedTxs_Unstarted(t *testing.T) {

estimator := gasmocks.NewEvmFeeEstimator(t)
txBuilder := txmgr.NewEvmTxAttemptBuilder(*chainID, evmcfg.EVM().GasEstimator(), keyStore, estimator)
ss := NewSequenceSyncer(lggr, txStore, client)
ss := txm.NewSequenceSyncer(lggr, txStore, client)

ctx := testutils.Context(t)
encodedPayload := []byte{1, 2, 3}
Expand All @@ -195,18 +196,18 @@ func TestBroadcaster_ProcessUnstartedTxs_Unstarted(t *testing.T) {
timeNow := time.Now()
nonce := evmtypes.Nonce(0)
t.Run("skips check if MaxInFlight is 0", func(t *testing.T) {
bcfg := BroadcasterConfig{
bcfg := txm.BroadcasterConfig{
FallbackPollInterval: evmcfg.Database().Listener().FallbackPollInterval(),
MaxInFlight: 0,
NonceAutoSync: false,
}
b := NewBroadcaster(txBuilder, lggr, txStore, client, bcfg, keyStore, ss)
b := txm.NewBroadcaster(txBuilder, lggr, txStore, client, bcfg, keyStore, ss)
err := b.ProcessUnstartedTxs(ctx, utils.RandomAddress())
require.NoError(t, err)
})

t.Run("picks up a new unstarted tx if in flight txs are less than threshold", func(t *testing.T) {
bcfg := BroadcasterConfig{
bcfg := txm.BroadcasterConfig{
FallbackPollInterval: evmcfg.Database().Listener().FallbackPollInterval(),
MaxInFlight: 3,
NonceAutoSync: false,
Expand Down Expand Up @@ -244,7 +245,7 @@ func TestBroadcaster_ProcessUnstartedTxs_Unstarted(t *testing.T) {
estimator.On("GetFee", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(gas.EvmFee{Legacy: assets.GWei(32)}, uint32(500), nil).Once()
client.On("SendTransaction", mock.Anything, mock.Anything).Return(nil).Once()

b := NewBroadcaster(txBuilder, lggr, txStore, client, bcfg, keyStore, ss)
b := txm.NewBroadcaster(txBuilder, lggr, txStore, client, bcfg, keyStore, ss)
require.NoError(t, b.ProcessUnstartedTxs(ctx, addr1))
// Nonce should have been incremented after successful broadcast
seq, err = ss.GetNextSequence(ctx, addr1)
Expand All @@ -253,7 +254,7 @@ func TestBroadcaster_ProcessUnstartedTxs_Unstarted(t *testing.T) {
})

t.Run("picks up a new unstarted tx and returns error if tx fails", func(t *testing.T) {
bcfg := BroadcasterConfig{
bcfg := txm.BroadcasterConfig{
FallbackPollInterval: evmcfg.Database().Listener().FallbackPollInterval(),
MaxInFlight: evmcfg.EVM().Transactions().MaxInFlight(),
NonceAutoSync: false,
Expand All @@ -270,7 +271,7 @@ func TestBroadcaster_ProcessUnstartedTxs_Unstarted(t *testing.T) {

db := pgtest.NewSqlxDB(t)
txStore := cltest.NewTestTxStore(t, db, cfg.Database())
ss := NewSequenceSyncer(lggr, txStore, client)
ss := txm.NewSequenceSyncer(lggr, txStore, client)
require.NoError(t, txStore.InsertTx(&txUnstarted))

client.On("PendingNonceAt", mock.Anything, mock.Anything).Return(uint64(0), nil).Once()
Expand All @@ -283,12 +284,12 @@ func TestBroadcaster_ProcessUnstartedTxs_Unstarted(t *testing.T) {
client.On("SendTransaction", mock.Anything, mock.Anything).Return(errors.New("RPC error")).Once()
client.On("PendingNonceAt", mock.Anything, mock.Anything).Return(uint64(0), nil).Once()

b := NewBroadcaster(txBuilder, lggr, txStore, client, bcfg, keyStore, ss)
b := txm.NewBroadcaster(txBuilder, lggr, txStore, client, bcfg, keyStore, ss)
require.Error(t, b.ProcessUnstartedTxs(ctx, addr1))
})

t.Run("marks unstarted tx unconfirmed if tx fails but on-chain pending nonce increases", func(t *testing.T) {
bcfg := BroadcasterConfig{
bcfg := txm.BroadcasterConfig{
FallbackPollInterval: evmcfg.Database().Listener().FallbackPollInterval(),
MaxInFlight: evmcfg.EVM().Transactions().MaxInFlight(),
NonceAutoSync: false,
Expand All @@ -305,7 +306,7 @@ func TestBroadcaster_ProcessUnstartedTxs_Unstarted(t *testing.T) {

db := pgtest.NewSqlxDB(t)
txStore := cltest.NewTestTxStore(t, db, cfg.Database())
ss := NewSequenceSyncer(lggr, txStore, client)
ss := txm.NewSequenceSyncer(lggr, txStore, client)
require.NoError(t, txStore.InsertTx(&txUnstarted))

client.On("PendingNonceAt", mock.Anything, mock.Anything).Return(uint64(0), nil).Once()
Expand All @@ -318,7 +319,7 @@ func TestBroadcaster_ProcessUnstartedTxs_Unstarted(t *testing.T) {
client.On("SendTransaction", mock.Anything, mock.Anything).Return(errors.New("RPC error")).Once()
client.On("PendingNonceAt", mock.Anything, mock.Anything).Return(uint64(1), nil).Once()

b := NewBroadcaster(txBuilder, lggr, txStore, client, bcfg, keyStore, ss)
b := txm.NewBroadcaster(txBuilder, lggr, txStore, client, bcfg, keyStore, ss)
require.NoError(t, b.ProcessUnstartedTxs(ctx, addr1))
})
}
29 changes: 29 additions & 0 deletions core/chains/evm/txmgr/optimistictxm/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package txm

import (
"bytes"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"

txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
)

const (
TxUnstarted = txmgrtypes.TxState("unstarted")
TxInProgress = txmgrtypes.TxState("in_progress")
TxFatalError = txmgrtypes.TxState("fatal_error")
TxUnconfirmed = txmgrtypes.TxState("unconfirmed")
TxConfirmed = txmgrtypes.TxState("confirmed")
TxConfirmedMissingReceipt = txmgrtypes.TxState("confirmed_missing_receipt")
)

// GetGethSignedTx decodes the SignedRawTx into a types.Transaction struct
func GetGethSignedTx(signedRawTx []byte) (*types.Transaction, error) {
s := rlp.NewStream(bytes.NewReader(signedRawTx), 0)
signedTx := new(types.Transaction)
if err := signedTx.DecodeRLP(s); err != nil {
return nil, err
}
return signedTx, nil
}
Loading

0 comments on commit 31cc03f

Please sign in to comment.