From b1d6bb3fe66a50048c30f6d953a4e6d331ece40b Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Fri, 1 Nov 2024 18:24:41 +0100 Subject: [PATCH] Batcher for txpool gossip --- txpool/txpool.go | 108 +++++++++++++++++++++++++++++++++-------- types/rlp_marshal.go | 14 ++++++ types/rlp_unmarshal.go | 18 +++++++ types/transaction.go | 2 + 4 files changed, 122 insertions(+), 20 deletions(-) diff --git a/txpool/txpool.go b/txpool/txpool.go index adf9ccb79f..8fe23370d6 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "math/big" + "sync" "sync/atomic" "time" @@ -191,6 +192,9 @@ type TxPool struct { localPeerID peer.ID } +var gossipCh chan *types.Transaction +var gossipWG sync.WaitGroup + // NewTxPool returns a new pool for processing incoming transactions. func NewTxPool( logger hclog.Logger, @@ -247,6 +251,72 @@ func (p *TxPool) updatePending(i int64) { metrics.SetGauge([]string{txPoolMetrics, "pending_transactions"}, float32(newPending)) } +func (p *TxPool) startGossipBatchers(batchersNum, batchSize int) { + // 1 channel for all batchers, give it enough space for bulk requests + gossipCh = make(chan *types.Transaction, 4*batchersNum*batchSize) + + for i := 0; i < batchersNum; i++ { + gossipWG.Add(1) + + go p.gossipBatcher(batchSize) + } +} + +func stopGossipBatchers() { + gossipWG.Wait() +} + +func (p *TxPool) gossipBatcher(batchSize int) { + var timer *time.Timer + + defer gossipWG.Done() + batch := make([]*types.Transaction, 0, batchSize) + + // start timer only if we do batching, otherwise we publish immediately + if batchSize > 1 { + timer = time.NewTimer(time.Second) + defer timer.Stop() + } + + for { + select { + case <-p.shutdownCh: + // flush when closing + if len(batch) > 0 { + p.publish(batch) + clear(batch) + } + + return + case <-timer.C: + if len(batch) > 0 { + p.publish(batch) + clear(batch) + } + case tx := <-gossipCh: + batch = append(batch, tx) + if len(batch) >= batchSize { + // publish + p.publish(batch) + clear(batch) + } + } + } +} + +func (p *TxPool) publish(batch []*types.Transaction) { + txs := types.Transactions(batch) + tx := &proto.Txn{ + Raw: &any.Any{ + Value: txs.MarshalRLPTo(nil), + }, + } + + if err := p.topic.Publish(tx); err != nil { + p.logger.Error("failed to topic tx", "err", err) + } +} + // Start runs the pool's main loop in the background. // On each request received, the appropriate handler // is invoked in a separate goroutine. @@ -254,6 +324,9 @@ func (p *TxPool) Start() { // set default value of txpool pending transactions gauge p.updatePending(0) + // start gossip batchers + p.startGossipBatchers(1, 2500) + // run the handler for high gauge level pruning go func() { for { @@ -287,6 +360,7 @@ func (p *TxPool) Start() { func (p *TxPool) Close() { p.eventManager.Close() close(p.shutdownCh) + stopGossipBatchers() // wait for gossip flush } // SetSigner sets the signer the pool will use @@ -312,15 +386,7 @@ func (p *TxPool) AddTx(tx *types.Transaction) error { // broadcast the transaction only if a topic // subscription is present if p.topic != nil { - tx := &proto.Txn{ - Raw: &any.Any{ - Value: tx.MarshalRLP(), - }, - } - - if err := p.topic.Publish(tx); err != nil { - p.logger.Error("failed to topic tx", "err", err) - } + gossipCh <- tx } return nil @@ -975,26 +1041,28 @@ func (p *TxPool) addGossipTx(obj interface{}, peerID peer.ID) { return } - tx := &types.Transaction{} + txs := &types.Transactions{} - // decode tx - if err := tx.UnmarshalRLP(raw.Raw.Value); err != nil { + // decode txs + if err := txs.UnmarshalRLP(raw.Raw.Value); err != nil { p.logger.Error("failed to decode broadcast tx", "err", err) return } - // add tx - if err := p.addTx(gossip, tx); err != nil { - if errors.Is(err, ErrAlreadyKnown) { - if p.logger.IsDebug() { - p.logger.Debug("rejecting known tx (gossip)", "hash", tx.Hash().String()) + // add txs + for _, tx := range *txs { + if err := p.addTx(gossip, tx); err != nil { + if errors.Is(err, ErrAlreadyKnown) { + if p.logger.IsDebug() { + p.logger.Debug("rejecting known tx (gossip)", "hash", tx.Hash().String()) + } + + return } - return + p.logger.Error("failed to add broadcast tx", "err", err, "hash", tx.Hash().String()) } - - p.logger.Error("failed to add broadcast tx", "err", err, "hash", tx.Hash().String()) } } diff --git a/types/rlp_marshal.go b/types/rlp_marshal.go index 99d11c8593..b9e38f3a76 100644 --- a/types/rlp_marshal.go +++ b/types/rlp_marshal.go @@ -189,3 +189,17 @@ func (t *Transaction) MarshalRLPTo(dst []byte) []byte { return MarshalRLPTo(t.MarshalRLPWith, dst) } + +func (t *Transactions) MarshalRLPTo(dst []byte) []byte { + return MarshalRLPTo(t.MarshalRLPWith, dst) +} + +func (t *Transactions) MarshalRLPWith(a *fastrlp.Arena) *fastrlp.Value { + vv := a.NewArray() + + for _, tt := range *t { + vv.Set(tt.MarshalRLPWith(a)) + } + + return vv +} diff --git a/types/rlp_unmarshal.go b/types/rlp_unmarshal.go index 2c3e724a14..cc7f72c77e 100644 --- a/types/rlp_unmarshal.go +++ b/types/rlp_unmarshal.go @@ -229,6 +229,24 @@ func (h *Header) unmarshalRLPFrom(_ *fastrlp.Parser, v *fastrlp.Value) error { return err } +func (t *Transactions) UnmarshalRLP(input []byte) error { + return UnmarshalRlp(t.unmarshalRLPFrom, input) +} + +func (t *Transactions) unmarshalRLPFrom(p *fastrlp.Parser, v *fastrlp.Value) error { + return unmarshalRLPFrom(p, v, func(_ TxType, p *fastrlp.Parser, v *fastrlp.Value) error { + obj := &Transaction{} + + if err := obj.UnmarshalRLPFrom(p, v); err != nil { + return err + } + + *t = append(*t, obj) + + return nil + }) +} + func (r *Receipts) UnmarshalRLP(input []byte) error { return UnmarshalRlp(r.unmarshalRLPFrom, input) } diff --git a/types/transaction.go b/types/transaction.go index aef7e96c3f..a2d735b5c9 100644 --- a/types/transaction.go +++ b/types/transaction.go @@ -66,6 +66,8 @@ type Transaction struct { size atomic.Pointer[uint64] } +type Transactions []*Transaction + // NewTx creates a new transaction. func NewTx(inner TxData) *Transaction { t := new(Transaction)