Skip to content

Commit

Permalink
common/txmgr: initialize map before goroutines race ahead (#11452)
Browse files Browse the repository at this point in the history
* common/txmgr: initiliaze map before goroutines race ahead

* pass ctx from Start()
  • Loading branch information
jmank88 authored Dec 1, 2023
1 parent 8ffd084 commit b8084cb
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 17 deletions.
18 changes: 7 additions & 11 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,14 @@ func NewBroadcaster[

// Start starts Broadcaster service.
// The provided context can be used to terminate Start sequence.
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start(_ context.Context) error {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start(ctx context.Context) error {
return eb.StartOnce("Broadcaster", func() (err error) {
return eb.startInternal()
return eb.startInternal(ctx)
})
}

// startInternal can be called multiple times, in conjunction with closeInternal. The TxMgr uses this functionality to reset broadcaster multiple times in its own lifetime.
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) startInternal() error {
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) startInternal(ctx context.Context) error {
eb.initSync.Lock()
defer eb.initSync.Unlock()
if eb.isStarted {
Expand All @@ -222,16 +222,15 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star
eb.wg = sync.WaitGroup{}
eb.wg.Add(len(eb.enabledAddresses))
eb.triggers = make(map[ADDR]chan struct{})
eb.sequenceLock.Lock()
eb.nextSequenceMap = eb.loadNextSequenceMap(ctx, eb.enabledAddresses)
eb.sequenceLock.Unlock()
for _, addr := range eb.enabledAddresses {
triggerCh := make(chan struct{}, 1)
eb.triggers[addr] = triggerCh
go eb.monitorTxs(addr, triggerCh)
}

eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap = eb.loadNextSequenceMap(eb.enabledAddresses)

eb.isStarted = true
return nil
}
Expand Down Expand Up @@ -286,10 +285,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trig
}

// Load the next sequence map using the tx table or on-chain (if not found in tx table)
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) map[ADDR]SEQ {
ctx, cancel := eb.chStop.NewCtx()
defer cancel()

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(ctx context.Context, addresses []ADDR) map[ADDR]SEQ {
nextSequenceMap := make(map[ADDR]SEQ)
for _, address := range addresses {
seq, err := eb.getSequenceForAddr(ctx, address)
Expand Down
4 changes: 2 additions & 2 deletions common/txmgr/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXDeliverB
tr.mb.Deliver(blockHeight)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestStartInternal() error {
return eb.startInternal()
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestStartInternal(ctx context.Context) error {
return eb.startInternal(ctx)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestCloseInternal() error {
Expand Down
6 changes: 4 additions & 2 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,18 +339,20 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
wg.Add(2)
go func() {
defer wg.Done()
ctx, cancel := b.chStop.NewCtx()
defer cancel()
// Retry indefinitely on failure
backoff := utils.NewRedialBackoff()
for {
select {
case <-time.After(backoff.Duration()):
if err := b.broadcaster.startInternal(); err != nil {
if err := b.broadcaster.startInternal(ctx); err != nil {
logger.Criticalw(b.logger, "Failed to start Broadcaster", "err", err)
b.SvcErrBuffer.Append(err)
continue
}
return
case <-b.chStop:
case <-ctx.Done():
stopOnce.Do(func() { stopped = true })
return
}
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/txmgr/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ func TestEthBroadcaster_Lifecycle(t *testing.T) {
require.Error(t, eb.XXXTestCloseInternal())

// Can successfully startInternal a previously closed instance
require.NoError(t, eb.XXXTestStartInternal())
require.NoError(t, eb.XXXTestStartInternal(ctx))
// Can't startInternal already started instance
require.Error(t, eb.XXXTestStartInternal())
require.Error(t, eb.XXXTestStartInternal(ctx))
// Can successfully closeInternal again
require.NoError(t, eb.XXXTestCloseInternal())
}
Expand Down

0 comments on commit b8084cb

Please sign in to comment.