Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automation custom 2 7 1 nov27 #11390

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 50 additions & 27 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -243,10 +244,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star

eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap, err = eb.loadNextSequenceMap(eb.enabledAddresses)
if err != nil {
return errors.Wrap(err, "Broadcaster: failed to load next sequence map")
}
eb.nextSequenceMap = eb.loadNextSequenceMap(eb.enabledAddresses)

eb.isStarted = true
return nil
Expand Down Expand Up @@ -326,30 +324,38 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) txIn
}

// Load the next sequence map using the tx table or on-chain (if not found in tx table)
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) (map[ADDR]SEQ, error) {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) map[ADDR]SEQ {
ctx, cancel := eb.chStop.NewCtx()
defer cancel()

nextSequenceMap := make(map[ADDR]SEQ)
for _, address := range addresses {
// Get the highest sequence from the tx table
// Will need to be incremented since this sequence is already used
seq, err := eb.txStore.FindLatestSequence(ctx, address, eb.chainID)
if err != nil {
// Look for nonce on-chain if no tx found for address in TxStore or if error occurred
// Returns the nonce that should be used for the next transaction so no need to increment
seq, err = eb.client.PendingSequenceAt(ctx, address)
if err != nil {
return nil, errors.New("failed to retrieve next sequence from on-chain causing failure to load next sequence map on broadcaster startup")
}

seq, err := eb.getSequenceForAddr(ctx, address)
if err == nil {
nextSequenceMap[address] = seq
} else {
nextSequenceMap[address] = eb.generateNextSequence(seq)
}
}

return nextSequenceMap, nil
return nextSequenceMap
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) getSequenceForAddr(ctx context.Context, address ADDR) (seq SEQ, err error) {
// Get the highest sequence from the tx table
// Will need to be incremented since this sequence is already used
seq, err = eb.txStore.FindLatestSequence(ctx, address, eb.chainID)
if err == nil {
seq = eb.generateNextSequence(seq)
return seq, nil
}
// Look for nonce on-chain if no tx found for address in TxStore or if error occurred
// Returns the nonce that should be used for the next transaction so no need to increment
seq, err = eb.client.PendingSequenceAt(ctx, address)
if err == nil {
return seq, nil
}
eb.logger.Criticalw("failed to retrieve next sequence from on-chain for address: ", "address", address.String())
return seq, err

}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) newSequenceSyncBackoff() backoff.Backoff {
Expand Down Expand Up @@ -432,7 +438,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) moni
// syncSequence tries to sync the key sequence, retrying indefinitely until success or stop signal is sent
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SyncSequence(ctx context.Context, addr ADDR) {
sequenceSyncRetryBackoff := eb.newSequenceSyncBackoff()
localSequence, err := eb.GetNextSequence(addr)
localSequence, err := eb.GetNextSequence(ctx, addr)
// Address not found in map so skip sync
if err != nil {
eb.logger.Criticalw("Failed to retrieve local next sequence for address", "address", addr.String(), "err", err)
Expand Down Expand Up @@ -646,7 +652,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
observeTimeUntilBroadcast(eb.chainID, etx.CreatedAt, time.Now())
// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(etx.FromAddress)
sequence, err = eb.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return err, true
}
Expand Down Expand Up @@ -704,7 +710,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand

// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(etx.FromAddress)
sequence, err = eb.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return err, true
}
Expand Down Expand Up @@ -741,7 +747,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next
return nil, errors.Wrap(err, "findNextUnstartedTransactionFromAddress failed")
}

sequence, err := eb.GetNextSequence(etx.FromAddress)
sequence, err := eb.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -826,15 +832,32 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
}

// Used to get the next usable sequence for a transaction
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(address ADDR) (seq SEQ, err error) {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(ctx context.Context, address ADDR) (seq SEQ, err error) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
// Get next sequence from map
seq, exists := eb.nextSequenceMap[address]
if !exists {
return seq, errors.New(fmt.Sprint("address not found in next sequence map: ", address))
if exists {
return seq, nil
}

eb.logger.Infow("address not found in local next sequence map. Attempting to search and populate sequence.", "address", address.String())
// Check if address is in the enabled address list
if !slices.Contains(eb.enabledAddresses, address) {
return seq, fmt.Errorf("address disabled: %s", address)
}
return seq, nil

// Try to retrieve next sequence from tx table or on-chain to load the map
// A scenario could exist where loading the map during startup failed (e.g. All configured RPC's are unreachable at start)
// The expectation is that the node does not fail startup so sequences need to be loaded during runtime
foundSeq, err := eb.getSequenceForAddr(ctx, address)
if err != nil {
return seq, fmt.Errorf("failed to find next sequence for address: %s", address)
}

// Set sequence in map
eb.nextSequenceMap[address] = foundSeq
return foundSeq, nil
}

// Used to increment the sequence in the mapping to have the next usable one available for the next transaction
Expand Down
Loading
Loading