Skip to content

Commit

Permalink
Update loading next sequence map to avoid startup failure (#11307)
Browse files Browse the repository at this point in the history
* Updated loading next sequence map to avoid startup failure

* Moved logic to populate next sequence map during runtime

* Added changelog

* Addressed feedback

* Addressed feedback
  • Loading branch information
amit-momin authored Nov 16, 2023
1 parent 6fd7be8 commit 01fbf8e
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 82 deletions.
77 changes: 50 additions & 27 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -228,10 +229,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star

eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap, err = eb.loadNextSequenceMap(eb.enabledAddresses)
if err != nil {
return errors.Wrap(err, "Broadcaster: failed to load next sequence map")
}
eb.nextSequenceMap = eb.loadNextSequenceMap(eb.enabledAddresses)

eb.isStarted = true
return nil
Expand Down Expand Up @@ -287,30 +285,38 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trig
}

// Load the next sequence map using the tx table or on-chain (if not found in tx table)
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) (map[ADDR]SEQ, error) {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) map[ADDR]SEQ {
ctx, cancel := eb.chStop.NewCtx()
defer cancel()

nextSequenceMap := make(map[ADDR]SEQ)
for _, address := range addresses {
// Get the highest sequence from the tx table
// Will need to be incremented since this sequence is already used
seq, err := eb.txStore.FindLatestSequence(ctx, address, eb.chainID)
if err != nil {
// Look for nonce on-chain if no tx found for address in TxStore or if error occurred
// Returns the nonce that should be used for the next transaction so no need to increment
seq, err = eb.client.PendingSequenceAt(ctx, address)
if err != nil {
return nil, errors.New("failed to retrieve next sequence from on-chain causing failure to load next sequence map on broadcaster startup")
}

seq, err := eb.getSequenceForAddr(ctx, address)
if err == nil {
nextSequenceMap[address] = seq
} else {
nextSequenceMap[address] = eb.generateNextSequence(seq)
}
}

return nextSequenceMap, nil
return nextSequenceMap
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) getSequenceForAddr(ctx context.Context, address ADDR) (seq SEQ, err error) {
// Get the highest sequence from the tx table
// Will need to be incremented since this sequence is already used
seq, err = eb.txStore.FindLatestSequence(ctx, address, eb.chainID)
if err == nil {
seq = eb.generateNextSequence(seq)
return seq, nil
}
// Look for nonce on-chain if no tx found for address in TxStore or if error occurred
// Returns the nonce that should be used for the next transaction so no need to increment
seq, err = eb.client.PendingSequenceAt(ctx, address)
if err == nil {
return seq, nil
}
eb.logger.Criticalw("failed to retrieve next sequence from on-chain for address: ", "address", address.String())
return seq, err

}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) newSequenceSyncBackoff() backoff.Backoff {
Expand Down Expand Up @@ -393,7 +399,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) moni
// syncSequence tries to sync the key sequence, retrying indefinitely until success or stop signal is sent
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SyncSequence(ctx context.Context, addr ADDR) {
sequenceSyncRetryBackoff := eb.newSequenceSyncBackoff()
localSequence, err := eb.GetNextSequence(addr)
localSequence, err := eb.GetNextSequence(ctx, addr)
// Address not found in map so skip sync
if err != nil {
eb.logger.Criticalw("Failed to retrieve local next sequence for address", "address", addr.String(), "err", err)
Expand Down Expand Up @@ -607,7 +613,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
observeTimeUntilBroadcast(eb.chainID, etx.CreatedAt, time.Now())
// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(etx.FromAddress)
sequence, err = eb.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return err, true
}
Expand Down Expand Up @@ -665,7 +671,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand

// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(etx.FromAddress)
sequence, err = eb.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return err, true
}
Expand Down Expand Up @@ -702,7 +708,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next
return nil, errors.Wrap(err, "findNextUnstartedTransactionFromAddress failed")
}

sequence, err := eb.GetNextSequence(etx.FromAddress)
sequence, err := eb.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -792,15 +798,32 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
}

// Used to get the next usable sequence for a transaction
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(address ADDR) (seq SEQ, err error) {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(ctx context.Context, address ADDR) (seq SEQ, err error) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
// Get next sequence from map
seq, exists := eb.nextSequenceMap[address]
if !exists {
return seq, errors.New(fmt.Sprint("address not found in next sequence map: ", address))
if exists {
return seq, nil
}

eb.logger.Infow("address not found in local next sequence map. Attempting to search and populate sequence.", "address", address.String())
// Check if address is in the enabled address list
if !slices.Contains(eb.enabledAddresses, address) {
return seq, fmt.Errorf("address disabled: %s", address)
}
return seq, nil

// Try to retrieve next sequence from tx table or on-chain to load the map
// A scenario could exist where loading the map during startup failed (e.g. All configured RPC's are unreachable at start)
// The expectation is that the node does not fail startup so sequences need to be loaded during runtime
foundSeq, err := eb.getSequenceForAddr(ctx, address)
if err != nil {
return seq, fmt.Errorf("failed to find next sequence for address: %s", address)
}

// Set sequence in map
eb.nextSequenceMap[address] = foundSeq
return foundSeq, nil
}

// Used to increment the sequence in the mapping to have the next usable one available for the next transaction
Expand Down
Loading

0 comments on commit 01fbf8e

Please sign in to comment.