Skip to content

Commit e376873

Browse files
committed
refactor broadcast
1 parent 8256c5d commit e376873

File tree

1 file changed

+18
-36
lines changed

1 file changed

+18
-36
lines changed

common/client/multi_node.go

+18-36
Original file line numberDiff line numberDiff line change
@@ -561,36 +561,16 @@ type sendTxResult struct {
561561
ResultCode SendTxReturnCode
562562
}
563563

564-
// broadcastTxAsync - creates a goroutine that sends transaction to the node. Returns false, if MultiNode is Stopped
565564
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) broadcastTxAsync(ctx context.Context,
566-
n SendOnlyNode[CHAIN_ID, RPC_CLIENT], tx TX, txResults chan sendTxResult, wg *sync.WaitGroup) {
567-
defer wg.Done()
568-
565+
n SendOnlyNode[CHAIN_ID, RPC_CLIENT], tx TX) sendTxResult {
569566
txErr := n.RPC().SendTransaction(ctx, tx)
570567
c.lggr.Debugw("Node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
571568
resultCode := c.classifySendTxError(tx, txErr)
572569
if resultCode != Successful && resultCode != TransactionAlreadyKnown {
573570
c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
574571
}
575572

576-
// we expected txResults to have sufficient buffer, otherwise we are not interested in the response
577-
// and can drop it
578-
select {
579-
case txResults <- sendTxResult{Err: txErr, ResultCode: resultCode}:
580-
default:
581-
}
582-
}
583-
584-
func fanOut[T any](source chan T, destinations ...chan T) {
585-
for t := range source {
586-
for _, dest := range destinations {
587-
dest <- t
588-
}
589-
}
590-
591-
for _, dest := range destinations {
592-
close(dest)
593-
}
573+
return sendTxResult{Err: txErr, ResultCode: resultCode}
594574
}
595575

596576
// collectTxResults - refer to SendTransaction comment for implementation details,
@@ -710,29 +690,31 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
710690
c.wg.Add(len(c.sendonlys))
711691
// fire-n-forget, as sendOnlyNodes can not be trusted with result reporting
712692
for _, n := range c.sendonlys {
713-
go c.broadcastTxAsync(ctx, n, tx, nil, &c.wg)
693+
go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
694+
defer c.wg.Done()
695+
c.broadcastTxAsync(ctx, n, tx)
696+
}(n)
714697
}
715698

716-
// signal when all the primary nodes done broadcasting tx
717-
inTxResults := make(chan sendTxResult, len(c.nodes))
718699
var primaryBroadcastWg sync.WaitGroup
719700
primaryBroadcastWg.Add(len(c.nodes))
720-
c.wg.Add(1)
721-
go func() {
722-
// wait for primary nodes to finish the broadcast before closing the channel
723-
primaryBroadcastWg.Wait()
724-
close(inTxResults)
725-
c.wg.Done()
726-
}()
727-
701+
txResultsToReport := make(chan sendTxResult, len(c.nodes))
728702
for _, n := range c.nodes {
729-
go c.broadcastTxAsync(ctx, n, tx, inTxResults, &primaryBroadcastWg)
703+
go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
704+
defer primaryBroadcastWg.Done()
705+
result := c.broadcastTxAsync(ctx, n, tx)
706+
// both channels are sufficiently buffered, so we won't be locked
707+
txResultsToReport <- result
708+
txResults <- result
709+
}(n)
730710
}
731711

732-
txResultsToReport := make(chan sendTxResult, len(c.nodes))
733712
c.wg.Add(1)
734713
go func() {
735-
fanOut(inTxResults, txResultsToReport, txResults)
714+
// wait for primary nodes to finish the broadcast before closing the channel
715+
primaryBroadcastWg.Wait()
716+
close(txResultsToReport)
717+
close(txResults)
736718
c.wg.Done()
737719
}()
738720

0 commit comments

Comments
 (0)