From b8084cb357582fdf43f64b1e08c38e1b457ba4ad Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Fri, 1 Dec 2023 14:27:44 -0600 Subject: [PATCH] common/txmgr: initialize map before goroutines race ahead (#11452) * common/txmgr: initialize map before goroutines race ahead * pass ctx from Start() --- common/txmgr/broadcaster.go | 18 +++++++----------- common/txmgr/test_helpers.go | 4 ++-- common/txmgr/txmgr.go | 6 ++++-- core/chains/evm/txmgr/broadcaster_test.go | 4 ++-- 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index ab620c51be3..54ae653f662 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -194,14 +194,14 @@ func NewBroadcaster[ // Start starts Broadcaster service. // The provided context can be used to terminate Start sequence. -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start(_ context.Context) error { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Start(ctx context.Context) error { return eb.StartOnce("Broadcaster", func() (err error) { - return eb.startInternal() + return eb.startInternal(ctx) }) } // startInternal can be called multiple times, in conjunction with closeInternal. The TxMgr uses this functionality to reset broadcaster multiple times in its own lifetime. 
-func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) startInternal() error { +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) startInternal(ctx context.Context) error { eb.initSync.Lock() defer eb.initSync.Unlock() if eb.isStarted { @@ -222,16 +222,15 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star eb.wg = sync.WaitGroup{} eb.wg.Add(len(eb.enabledAddresses)) eb.triggers = make(map[ADDR]chan struct{}) + eb.sequenceLock.Lock() + eb.nextSequenceMap = eb.loadNextSequenceMap(ctx, eb.enabledAddresses) + eb.sequenceLock.Unlock() for _, addr := range eb.enabledAddresses { triggerCh := make(chan struct{}, 1) eb.triggers[addr] = triggerCh go eb.monitorTxs(addr, triggerCh) } - eb.sequenceLock.Lock() - defer eb.sequenceLock.Unlock() - eb.nextSequenceMap = eb.loadNextSequenceMap(eb.enabledAddresses) - eb.isStarted = true return nil } @@ -286,10 +285,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trig } // Load the next sequence map using the tx table or on-chain (if not found in tx table) -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) map[ADDR]SEQ { - ctx, cancel := eb.chStop.NewCtx() - defer cancel() - +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(ctx context.Context, addresses []ADDR) map[ADDR]SEQ { nextSequenceMap := make(map[ADDR]SEQ) for _, address := range addresses { seq, err := eb.getSequenceForAddr(ctx, address) diff --git a/common/txmgr/test_helpers.go b/common/txmgr/test_helpers.go index 0f128a23af4..6c0c5680ea7 100644 --- a/common/txmgr/test_helpers.go +++ b/common/txmgr/test_helpers.go @@ -22,8 +22,8 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXDeliverB tr.mb.Deliver(blockHeight) } -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestStartInternal() error { - return 
eb.startInternal() +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestStartInternal(ctx context.Context) error { + return eb.startInternal(ctx) } func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXTestCloseInternal() error { diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 3fbdb852f8b..228ab4ec8bf 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -339,18 +339,20 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() wg.Add(2) go func() { defer wg.Done() + ctx, cancel := b.chStop.NewCtx() + defer cancel() // Retry indefinitely on failure backoff := utils.NewRedialBackoff() for { select { case <-time.After(backoff.Duration()): - if err := b.broadcaster.startInternal(); err != nil { + if err := b.broadcaster.startInternal(ctx); err != nil { logger.Criticalw(b.logger, "Failed to start Broadcaster", "err", err) b.SvcErrBuffer.Append(err) continue } return - case <-b.chStop: + case <-ctx.Done(): stopOnce.Do(func() { stopped = true }) return } diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 26e78344b93..c6e05b0954b 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -124,9 +124,9 @@ func TestEthBroadcaster_Lifecycle(t *testing.T) { require.Error(t, eb.XXXTestCloseInternal()) // Can successfully startInternal a previously closed instance - require.NoError(t, eb.XXXTestStartInternal()) + require.NoError(t, eb.XXXTestStartInternal(ctx)) // Can't startInternal already started instance - require.Error(t, eb.XXXTestStartInternal()) + require.Error(t, eb.XXXTestStartInternal(ctx)) // Can successfully closeInternal again require.NoError(t, eb.XXXTestCloseInternal()) }