Skip to content

Commit

Permalink
Batcher for txpool gossip
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverbundalo committed Nov 1, 2024
1 parent a044bd1 commit b1d6bb3
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 20 deletions.
108 changes: 88 additions & 20 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -191,6 +192,9 @@ type TxPool struct {
localPeerID peer.ID
}

var gossipCh chan *types.Transaction
var gossipWG sync.WaitGroup

// NewTxPool returns a new pool for processing incoming transactions.
func NewTxPool(
logger hclog.Logger,
Expand Down Expand Up @@ -247,13 +251,82 @@ func (p *TxPool) updatePending(i int64) {
metrics.SetGauge([]string{txPoolMetrics, "pending_transactions"}, float32(newPending))
}

func (p *TxPool) startGossipBatchers(batchersNum, batchSize int) {
// 1 channel for all batchers, give it enough space for bulk requests
gossipCh = make(chan *types.Transaction, 4*batchersNum*batchSize)

for i := 0; i < batchersNum; i++ {
gossipWG.Add(1)

go p.gossipBatcher(batchSize)
}
}

func stopGossipBatchers() {
gossipWG.Wait()
}

func (p *TxPool) gossipBatcher(batchSize int) {
var timer *time.Timer

defer gossipWG.Done()
batch := make([]*types.Transaction, 0, batchSize)

// start timer only if we do batching, otherwise we publish immediately
if batchSize > 1 {
timer = time.NewTimer(time.Second)
defer timer.Stop()
}

for {
select {
case <-p.shutdownCh:
// flush when closing
if len(batch) > 0 {
p.publish(batch)
clear(batch)
}

return
case <-timer.C:
if len(batch) > 0 {
p.publish(batch)
clear(batch)
}
case tx := <-gossipCh:
batch = append(batch, tx)
if len(batch) >= batchSize {
// publish
p.publish(batch)
clear(batch)
}
}
}
}

func (p *TxPool) publish(batch []*types.Transaction) {
txs := types.Transactions(batch)
tx := &proto.Txn{
Raw: &any.Any{
Value: txs.MarshalRLPTo(nil),
},
}

if err := p.topic.Publish(tx); err != nil {
p.logger.Error("failed to topic tx", "err", err)
}
}

// Start runs the pool's main loop in the background.
// On each request received, the appropriate handler
// is invoked in a separate goroutine.
func (p *TxPool) Start() {
// set default value of txpool pending transactions gauge
p.updatePending(0)

// start gossip batchers
p.startGossipBatchers(1, 2500)

// run the handler for high gauge level pruning
go func() {
for {
Expand Down Expand Up @@ -287,6 +360,7 @@ func (p *TxPool) Start() {
func (p *TxPool) Close() {
p.eventManager.Close()
close(p.shutdownCh)
stopGossipBatchers() // wait for gossip flush
}

// SetSigner sets the signer the pool will use
Expand All @@ -312,15 +386,7 @@ func (p *TxPool) AddTx(tx *types.Transaction) error {
// broadcast the transaction only if a topic
// subscription is present
if p.topic != nil {
tx := &proto.Txn{
Raw: &any.Any{
Value: tx.MarshalRLP(),
},
}

if err := p.topic.Publish(tx); err != nil {
p.logger.Error("failed to topic tx", "err", err)
}
gossipCh <- tx
}

return nil
Expand Down Expand Up @@ -975,26 +1041,28 @@ func (p *TxPool) addGossipTx(obj interface{}, peerID peer.ID) {
return
}

tx := &types.Transaction{}
txs := &types.Transactions{}

// decode tx
if err := tx.UnmarshalRLP(raw.Raw.Value); err != nil {
// decode txs
if err := txs.UnmarshalRLP(raw.Raw.Value); err != nil {
p.logger.Error("failed to decode broadcast tx", "err", err)

return
}

// add tx
if err := p.addTx(gossip, tx); err != nil {
if errors.Is(err, ErrAlreadyKnown) {
if p.logger.IsDebug() {
p.logger.Debug("rejecting known tx (gossip)", "hash", tx.Hash().String())
// add txs
for _, tx := range *txs {
if err := p.addTx(gossip, tx); err != nil {
if errors.Is(err, ErrAlreadyKnown) {
if p.logger.IsDebug() {
p.logger.Debug("rejecting known tx (gossip)", "hash", tx.Hash().String())
}

return
}

return
p.logger.Error("failed to add broadcast tx", "err", err, "hash", tx.Hash().String())
}

p.logger.Error("failed to add broadcast tx", "err", err, "hash", tx.Hash().String())
}
}

Expand Down
14 changes: 14 additions & 0 deletions types/rlp_marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,17 @@ func (t *Transaction) MarshalRLPTo(dst []byte) []byte {

return MarshalRLPTo(t.MarshalRLPWith, dst)
}

func (t *Transactions) MarshalRLPTo(dst []byte) []byte {
return MarshalRLPTo(t.MarshalRLPWith, dst)
}

func (t *Transactions) MarshalRLPWith(a *fastrlp.Arena) *fastrlp.Value {
vv := a.NewArray()

for _, tt := range *t {
vv.Set(tt.MarshalRLPWith(a))
}

return vv
}
18 changes: 18 additions & 0 deletions types/rlp_unmarshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,24 @@ func (h *Header) unmarshalRLPFrom(_ *fastrlp.Parser, v *fastrlp.Value) error {
return err
}

func (t *Transactions) UnmarshalRLP(input []byte) error {
return UnmarshalRlp(t.unmarshalRLPFrom, input)
}

func (t *Transactions) unmarshalRLPFrom(p *fastrlp.Parser, v *fastrlp.Value) error {
return unmarshalRLPFrom(p, v, func(_ TxType, p *fastrlp.Parser, v *fastrlp.Value) error {
obj := &Transaction{}

if err := obj.UnmarshalRLPFrom(p, v); err != nil {
return err
}

*t = append(*t, obj)

return nil
})
}

func (r *Receipts) UnmarshalRLP(input []byte) error {
return UnmarshalRlp(r.unmarshalRLPFrom, input)
}
Expand Down
2 changes: 2 additions & 0 deletions types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type Transaction struct {
size atomic.Pointer[uint64]
}

type Transactions []*Transaction

// NewTx creates a new transaction.
func NewTx(inner TxData) *Transaction {
t := new(Transaction)
Expand Down

0 comments on commit b1d6bb3

Please sign in to comment.