Skip to content

Commit

Permalink
Generic Keystore for tx Manager (#8650)
Browse files Browse the repository at this point in the history
Co-authored-by: Prashant Yadav <[email protected]>
Merging for @simsonraj
  • Loading branch information
simsonraj authored Mar 29, 2023
1 parent 31d7757 commit dffc9f0
Show file tree
Hide file tree
Showing 17 changed files with 372 additions and 294 deletions.
12 changes: 12 additions & 0 deletions common/txmgr/types/keystore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package types

import "github.com/smartcontractkit/chainlink/v2/core/services/pg"

// KeyStore encompasses the subset of keystore used by txmgr
type KeyStore[ADDR any, ID any, S any] interface {
CheckEnabled(address ADDR, chainID ID) error
NextSequence(address ADDR, chainID ID, qopts ...pg.QOpt) (S, error)
EnabledAddressesForChain(chainId ID) ([]ADDR, error)
IncrementNextSequence(address ADDR, chainID ID, currentSequence S, qopts ...pg.QOpt) error
SubscribeToKeyChanges() (ch chan struct{}, unsub func())
}
23 changes: 11 additions & 12 deletions core/chains/evm/monitor/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand Down Expand Up @@ -170,16 +169,16 @@ func (w *worker) Work() {
}

func (w *worker) WorkCtx(ctx context.Context) {
keys, err := w.bm.ethKeyStore.EnabledKeysForChain(w.bm.chainID)
enabledAddresses, err := w.bm.ethKeyStore.EnabledAddressesForChain(w.bm.chainID)
if err != nil {
w.bm.logger.Error("BalanceMonitor: error getting keys", err)
}

var wg sync.WaitGroup

wg.Add(len(keys))
for _, key := range keys {
go func(k ethkey.KeyV2) {
wg.Add(len(enabledAddresses))
for _, key := range enabledAddresses {
go func(k gethCommon.Address) {
defer wg.Done()
w.checkAccountBalance(ctx, k)
}(key)
Expand All @@ -190,24 +189,24 @@ func (w *worker) WorkCtx(ctx context.Context) {
// Approximately ETH block time
const ethFetchTimeout = 15 * time.Second

func (w *worker) checkAccountBalance(ctx context.Context, k ethkey.KeyV2) {
func (w *worker) checkAccountBalance(ctx context.Context, address gethCommon.Address) {
ctx, cancel := context.WithTimeout(ctx, ethFetchTimeout)
defer cancel()

bal, err := w.bm.ethClient.BalanceAt(ctx, k.Address, nil)
bal, err := w.bm.ethClient.BalanceAt(ctx, address, nil)
if err != nil {
w.bm.logger.Errorw(fmt.Sprintf("BalanceMonitor: error getting balance for key %s", k.Address.Hex()),
w.bm.logger.Errorw(fmt.Sprintf("BalanceMonitor: error getting balance for key %s", address.Hex()),
"error", err,
"address", k.Address,
"address", address,
)
} else if bal == nil {
w.bm.logger.Errorw(fmt.Sprintf("BalanceMonitor: error getting balance for key %s: invariant violation, bal may not be nil", k.Address.Hex()),
w.bm.logger.Errorw(fmt.Sprintf("BalanceMonitor: error getting balance for key %s: invariant violation, bal may not be nil", address.Hex()),
"error", err,
"address", k.Address,
"address", address,
)
} else {
ethBal := assets.Eth(*bal)
w.bm.updateBalance(ethBal, k.Address)
w.bm.updateBalance(ethBal, address)
}
}

Expand Down
44 changes: 20 additions & 24 deletions core/chains/evm/txmgr/eth_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/label"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
Expand Down Expand Up @@ -102,8 +101,8 @@ type EthBroadcaster struct {
ethTxInsertListener pg.Subscription
eventBroadcaster pg.EventBroadcaster

ks KeyStore
keyStates []ethkey.State
ks txmgrtypes.KeyStore[gethCommon.Address, *big.Int, int64]
addresses []gethCommon.Address

checkerFactory TransmitCheckerFactory

Expand All @@ -119,9 +118,10 @@ type EthBroadcaster struct {
}

// NewEthBroadcaster returns a new concrete EthBroadcaster
func NewEthBroadcaster(orm ORM, ethClient evmclient.Client, config Config, keystore KeyStore,
func NewEthBroadcaster(orm ORM, ethClient evmclient.Client, config Config,
keystore txmgrtypes.KeyStore[gethCommon.Address, *big.Int, int64],
eventBroadcaster pg.EventBroadcaster,
keyStates []ethkey.State, resumeCallback ResumeCallback,
addresses []gethCommon.Address, resumeCallback ResumeCallback,
txAttemptBuilder txmgrtypes.TxAttemptBuilder[*evmtypes.Head, gas.EvmFee, gethCommon.Address, gethCommon.Hash, EthTx, EthTxAttempt],
logger logger.Logger, checkerFactory TransmitCheckerFactory, autoSyncNonce bool) *EthBroadcaster {

Expand All @@ -137,7 +137,7 @@ func NewEthBroadcaster(orm ORM, ethClient evmclient.Client, config Config, keyst
config: config,
eventBroadcaster: eventBroadcaster,
ks: keystore,
keyStates: keyStates,
addresses: addresses,
checkerFactory: checkerFactory,
triggers: triggers,
chStop: make(chan struct{}),
Expand All @@ -155,10 +155,10 @@ func (eb *EthBroadcaster) Start(ctx context.Context) error {
return errors.Wrap(err, "EthBroadcaster could not start")
}

eb.wg.Add(len(eb.keyStates))
for _, k := range eb.keyStates {
eb.wg.Add(len(eb.addresses))
for _, k := range eb.addresses {
triggerCh := make(chan struct{}, 1)
eb.triggers[k.Address.Address()] = triggerCh
eb.triggers[k] = triggerCh
go eb.monitorEthTxs(k, triggerCh)
}

Expand Down Expand Up @@ -244,20 +244,20 @@ func (eb *EthBroadcaster) newResendBackoff() backoff.Backoff {
}
}

func (eb *EthBroadcaster) monitorEthTxs(k ethkey.State, triggerCh chan struct{}) {
func (eb *EthBroadcaster) monitorEthTxs(k gethCommon.Address, triggerCh chan struct{}) {
defer eb.wg.Done()

ctx, cancel := utils.ContextFromChan(eb.chStop)
defer cancel()

if eb.autoSyncNonce {
eb.logger.Debugw("Auto-syncing nonce", "address", k.Address)
eb.logger.Debugw("Auto-syncing nonce", "address", k)
eb.SyncNonce(ctx, k)
if ctx.Err() != nil {
return
}
} else {
eb.logger.Debugw("Skipping nonce auto-sync", "address", k.Address)
eb.logger.Debugw("Skipping nonce auto-sync", "address", k)
}

// errorRetryCh allows retry on exponential backoff in case of timeout or
Expand Down Expand Up @@ -306,17 +306,13 @@ func (eb *EthBroadcaster) monitorEthTxs(k ethkey.State, triggerCh chan struct{})
}

// syncNonce tries to sync the key nonce, retrying indefinitely until success
func (eb *EthBroadcaster) SyncNonce(ctx context.Context, k ethkey.State) {
if k.Disabled {
eb.logger.Infow("Skipping nonce sync for disabled key", "address", k.Address)
return
}
func (eb *EthBroadcaster) SyncNonce(ctx context.Context, k gethCommon.Address) {
syncer := NewNonceSyncer(eb.orm, eb.logger, eb.ethClient, eb.ks)
nonceSyncRetryBackoff := eb.newNonceSyncBackoff()
if err := syncer.Sync(ctx, k); err != nil {
// Enter retry loop with backoff
var attempt int
eb.logger.Errorw("Failed to sync with on-chain nonce", "address", k.Address, "attempt", attempt, "err", err)
eb.logger.Errorw("Failed to sync with on-chain nonce", "address", k, "attempt", attempt, "err", err)
for {
select {
case <-eb.chStop:
Expand All @@ -326,10 +322,10 @@ func (eb *EthBroadcaster) SyncNonce(ctx context.Context, k ethkey.State) {

if err := syncer.Sync(ctx, k); err != nil {
if attempt > 5 {
eb.logger.Criticalw("Failed to sync with on-chain nonce", "address", k.Address, "attempt", attempt, "err", err)
eb.logger.Criticalw("Failed to sync with on-chain nonce", "address", k, "attempt", attempt, "err", err)
eb.SvcErrBuffer.Append(err)
} else {
eb.logger.Warnw("Failed to sync with on-chain nonce", "address", k.Address, "attempt", attempt, "err", err)
eb.logger.Warnw("Failed to sync with on-chain nonce", "address", k, "attempt", attempt, "err", err)
}
continue
}
Expand All @@ -341,8 +337,8 @@ func (eb *EthBroadcaster) SyncNonce(ctx context.Context, k ethkey.State) {

// ProcessUnstartedEthTxs picks up and handles all eth_txes in the queue
// revive:disable:error-return
func (eb *EthBroadcaster) ProcessUnstartedEthTxs(ctx context.Context, keyState ethkey.State) (err error, retryable bool) {
return eb.processUnstartedEthTxs(ctx, keyState.Address.Address())
func (eb *EthBroadcaster) ProcessUnstartedEthTxs(ctx context.Context, address gethCommon.Address) (err error, retryable bool) {
return eb.processUnstartedEthTxs(ctx, address)
}

// NOTE: This MUST NOT be run concurrently for the same address or it could
Expand Down Expand Up @@ -749,11 +745,11 @@ func (eb *EthBroadcaster) saveFatallyErroredTransaction(lgr logger.Logger, etx *
}

func (eb *EthBroadcaster) getNextNonce(address gethCommon.Address) (nonce int64, err error) {
return eb.ks.GetNextNonce(address, &eb.chainID)
return eb.ks.NextSequence(address, &eb.chainID)
}

func (eb *EthBroadcaster) incrementNextNonce(address gethCommon.Address, currentNonce int64, qopts ...pg.QOpt) error {
return eb.ks.IncrementNextNonce(address, &eb.chainID, currentNonce, qopts...)
return eb.ks.IncrementNextSequence(address, &eb.chainID, currentNonce, qopts...)
}

func observeTimeUntilBroadcast(chainID big.Int, createdAt, broadcastAt time.Time) {
Expand Down
Loading

0 comments on commit dffc9f0

Please sign in to comment.