From b1d6bb3fe66a50048c30f6d953a4e6d331ece40b Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Fri, 1 Nov 2024 18:24:41 +0100 Subject: [PATCH 01/21] Batcher for txpool gossip --- txpool/txpool.go | 108 +++++++++++++++++++++++++++++++++-------- types/rlp_marshal.go | 14 ++++++ types/rlp_unmarshal.go | 18 +++++++ types/transaction.go | 2 + 4 files changed, 122 insertions(+), 20 deletions(-) diff --git a/txpool/txpool.go b/txpool/txpool.go index adf9ccb79f..8fe23370d6 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "math/big" + "sync" "sync/atomic" "time" @@ -191,6 +192,9 @@ type TxPool struct { localPeerID peer.ID } +var gossipCh chan *types.Transaction +var gossipWG sync.WaitGroup + // NewTxPool returns a new pool for processing incoming transactions. func NewTxPool( logger hclog.Logger, @@ -247,6 +251,72 @@ func (p *TxPool) updatePending(i int64) { metrics.SetGauge([]string{txPoolMetrics, "pending_transactions"}, float32(newPending)) } +func (p *TxPool) startGossipBatchers(batchersNum, batchSize int) { + // 1 channel for all batchers, give it enough space for bulk requests + gossipCh = make(chan *types.Transaction, 4*batchersNum*batchSize) + + for i := 0; i < batchersNum; i++ { + gossipWG.Add(1) + + go p.gossipBatcher(batchSize) + } +} + +func stopGossipBatchers() { + gossipWG.Wait() +} + +func (p *TxPool) gossipBatcher(batchSize int) { + var timer *time.Timer + + defer gossipWG.Done() + batch := make([]*types.Transaction, 0, batchSize) + + // start timer only if we do batching, otherwise we publish immediately + if batchSize > 1 { + timer = time.NewTimer(time.Second) + defer timer.Stop() + } + + for { + select { + case <-p.shutdownCh: + // flush when closing + if len(batch) > 0 { + p.publish(batch) + clear(batch) + } + + return + case <-timer.C: + if len(batch) > 0 { + p.publish(batch) + clear(batch) + } + case tx := <-gossipCh: + batch = append(batch, tx) + if len(batch) >= batchSize { + // publish + p.publish(batch) + clear(batch) + } + } + } +} + +func (p *TxPool) publish(batch []*types.Transaction) { + txs := types.Transactions(batch) + tx := &proto.Txn{ + Raw: &any.Any{ + Value: txs.MarshalRLPTo(nil), + }, + } + + if err := p.topic.Publish(tx); err != nil { + p.logger.Error("failed to topic tx", "err", err) + } +} + // Start runs the pool's main loop in the background. // On each request received, the appropriate handler // is invoked in a separate goroutine. @@ -254,6 +324,9 @@ func (p *TxPool) Start() { // set default value of txpool pending transactions gauge p.updatePending(0) + // start gossip batchers + p.startGossipBatchers(1, 2500) + // run the handler for high gauge level pruning go func() { for { @@ -287,6 +360,7 @@ func (p *TxPool) Start() { func (p *TxPool) Close() { p.eventManager.Close() close(p.shutdownCh) + stopGossipBatchers() // wait for gossip flush } // SetSigner sets the signer the pool will use @@ -312,15 +386,7 @@ func (p *TxPool) AddTx(tx *types.Transaction) error { // broadcast the transaction only if a topic // subscription is present if p.topic != nil { - tx := &proto.Txn{ - Raw: &any.Any{ - Value: tx.MarshalRLP(), - }, - } - - if err := p.topic.Publish(tx); err != nil { - p.logger.Error("failed to topic tx", "err", err) - } + gossipCh <- tx } return nil @@ -975,26 +1041,28 @@ func (p *TxPool) addGossipTx(obj interface{}, peerID peer.ID) { return } - tx := &types.Transaction{} + txs := &types.Transactions{} - // decode tx - if err := tx.UnmarshalRLP(raw.Raw.Value); err != nil { + // decode txs + if err := txs.UnmarshalRLP(raw.Raw.Value); err != nil { p.logger.Error("failed to decode broadcast tx", "err", err) return } - // add tx - if err := p.addTx(gossip, tx); err != nil { - if errors.Is(err, ErrAlreadyKnown) { - if p.logger.IsDebug() { - p.logger.Debug("rejecting known tx (gossip)", "hash", tx.Hash().String()) + // add txs + for _, tx := range *txs { + if err := p.addTx(gossip, tx); err != nil { + if errors.Is(err, ErrAlreadyKnown) { + if p.logger.IsDebug() { + p.logger.Debug("rejecting known tx (gossip)", "hash", tx.Hash().String()) + } + + return } - return + p.logger.Error("failed to add broadcast tx", "err", err, "hash", tx.Hash().String()) } - - p.logger.Error("failed to add broadcast tx", "err", err, "hash", tx.Hash().String()) } } diff --git a/types/rlp_marshal.go b/types/rlp_marshal.go index 99d11c8593..b9e38f3a76 100644 --- a/types/rlp_marshal.go +++ b/types/rlp_marshal.go @@ -189,3 +189,17 @@ func (t *Transaction) MarshalRLPTo(dst []byte) []byte { return MarshalRLPTo(t.MarshalRLPWith, dst) } + +func (t *Transactions) MarshalRLPTo(dst []byte) []byte { + return MarshalRLPTo(t.MarshalRLPWith, dst) +} + +func (t *Transactions) MarshalRLPWith(a *fastrlp.Arena) *fastrlp.Value { + vv := a.NewArray() + + for _, tt := range *t { + vv.Set(tt.MarshalRLPWith(a)) + } + + return vv +} diff --git a/types/rlp_unmarshal.go b/types/rlp_unmarshal.go index 2c3e724a14..cc7f72c77e 100644 --- a/types/rlp_unmarshal.go +++ b/types/rlp_unmarshal.go @@ -229,6 +229,24 @@ func (h *Header) unmarshalRLPFrom(_ *fastrlp.Parser, v *fastrlp.Value) error { return err } +func (t *Transactions) UnmarshalRLP(input []byte) error { + return UnmarshalRlp(t.unmarshalRLPFrom, input) +} + +func (t *Transactions) unmarshalRLPFrom(p *fastrlp.Parser, v *fastrlp.Value) error { + return unmarshalRLPFrom(p, v, func(_ TxType, p *fastrlp.Parser, v *fastrlp.Value) error { + obj := &Transaction{} + + if err := obj.UnmarshalRLPFrom(p, v); err != nil { + return err + } + + *t = append(*t, obj) + + return nil + }) +} + func (r *Receipts) UnmarshalRLP(input []byte) error { return UnmarshalRlp(r.unmarshalRLPFrom, input) } diff --git a/types/transaction.go b/types/transaction.go index aef7e96c3f..a2d735b5c9 100644 --- a/types/transaction.go +++ b/types/transaction.go @@ -66,6 +66,8 @@ type Transaction struct { size atomic.Pointer[uint64] } +type Transactions []*Transaction + // NewTx creates a new transaction. func NewTx(inner TxData) *Transaction { t := new(Transaction) From e37460e1949028c80aabbd25cd8228f671a26ef5 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Fri, 1 Nov 2024 18:55:32 +0100 Subject: [PATCH 02/21] reduce gossip batch timer --- txpool/txpool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpool/txpool.go b/txpool/txpool.go index 8fe23370d6..9c2531b615 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -274,7 +274,7 @@ func (p *TxPool) gossipBatcher(batchSize int) { // start timer only if we do batching, otherwise we publish immediately if batchSize > 1 { - timer = time.NewTimer(time.Second) + timer = time.NewTimer(time.Millisecond * 500) defer timer.Stop() } From f177da28367d2b1d97703341fca03d068cab9a36 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Fri, 1 Nov 2024 20:10:52 +0100 Subject: [PATCH 03/21] minor fix --- types/rlp_marshal.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/types/rlp_marshal.go b/types/rlp_marshal.go index b9e38f3a76..801f9ce9e2 100644 --- a/types/rlp_marshal.go +++ b/types/rlp_marshal.go @@ -190,7 +190,7 @@ func (t *Transaction) MarshalRLPTo(dst []byte) []byte { return MarshalRLPTo(t.MarshalRLPWith, dst) } -func (t *Transactions) MarshalRLPTo(dst []byte) []byte { +func (t Transactions) MarshalRLPTo(dst []byte) []byte { return MarshalRLPTo(t.MarshalRLPWith, dst) } From ff48089e67f5b4135db24595cdd98b8ea8d29153 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Fri, 1 Nov 2024 21:14:47 +0100 Subject: [PATCH 04/21] reset batch --- txpool/txpool.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/txpool/txpool.go b/txpool/txpool.go index 9c2531b615..7883a77d0a 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -283,29 +283,27 @@ func (p *TxPool) gossipBatcher(batchSize int) { case <-p.shutdownCh: // flush when closing if len(batch) > 0 { - p.publish(batch) - clear(batch) + p.publish(&batch) } return case <-timer.C: if len(batch) > 0 { - p.publish(batch) - clear(batch) + p.publish(&batch) + batch = batch[:0] } case tx := <-gossipCh: batch = append(batch, tx) if len(batch) >= batchSize { - // publish - p.publish(batch) - clear(batch) + p.publish(&batch) + batch = batch[:0] } } } } -func (p *TxPool) publish(batch []*types.Transaction) { - txs := types.Transactions(batch) +func (p *TxPool) publish(batch *[]*types.Transaction) { + txs := types.Transactions(*batch) tx := &proto.Txn{ Raw: &any.Any{ Value: txs.MarshalRLPTo(nil), @@ -315,6 +313,8 @@ func (p *TxPool) publish(batch []*types.Transaction) { if err := p.topic.Publish(tx); err != nil { p.logger.Error("failed to topic tx", "err", err) } + + clear(*batch) } // Start runs the pool's main loop in the background. From bf7d0f79839903db0cc9acffeba72243b24740dd Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Sat, 2 Nov 2024 10:24:11 +0100 Subject: [PATCH 05/21] ticker for txpool gossip batching --- txpool/txpool.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/txpool/txpool.go b/txpool/txpool.go index 7883a77d0a..4f61cff7a2 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -267,16 +267,11 @@ func stopGossipBatchers() { } func (p *TxPool) gossipBatcher(batchSize int) { - var timer *time.Timer - defer gossipWG.Done() batch := make([]*types.Transaction, 0, batchSize) - // start timer only if we do batching, otherwise we publish immediately - if batchSize > 1 { - timer = time.NewTimer(time.Millisecond * 500) - defer timer.Stop() - } + ticker := time.NewTicker(time.Millisecond * 500) + defer ticker.Stop() for { select { @@ -287,7 +282,7 @@ func (p *TxPool) gossipBatcher(batchSize int) { } return - case <-timer.C: + case <-ticker.C: if len(batch) > 0 { p.publish(&batch) batch = batch[:0] From 4db34765f484f7909c65b7ac9aa80f9198948507 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Sat, 2 Nov 2024 13:13:21 +0100 Subject: [PATCH 06/21] fixed marshaling --- types/rlp_marshal.go | 4 ++++ types/rlp_unmarshal.go | 36 ++++++++++++++++++------------------ 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/types/rlp_marshal.go b/types/rlp_marshal.go index 801f9ce9e2..cb19d88420 100644 --- a/types/rlp_marshal.go +++ b/types/rlp_marshal.go @@ -198,6 +198,10 @@ func (t *Transactions) MarshalRLPWith(a *fastrlp.Arena) *fastrlp.Value { vv := a.NewArray() for _, tt := range *t { + if tt.Type() != LegacyTxType { + vv.Set(a.NewCopyBytes([]byte{byte(tt.Type())})) + } + vv.Set(tt.MarshalRLPWith(a)) } diff --git a/types/rlp_unmarshal.go b/types/rlp_unmarshal.go index cc7f72c77e..cc33b2a1e1 100644 --- a/types/rlp_unmarshal.go +++ b/types/rlp_unmarshal.go @@ -229,24 +229,6 @@ func (h *Header) unmarshalRLPFrom(_ *fastrlp.Parser, v *fastrlp.Value) error { return err } -func (t *Transactions) UnmarshalRLP(input []byte) error { - return UnmarshalRlp(t.unmarshalRLPFrom, input) -} - -func (t *Transactions) unmarshalRLPFrom(p *fastrlp.Parser, v *fastrlp.Value) error { - return unmarshalRLPFrom(p, v, func(_ TxType, p *fastrlp.Parser, v *fastrlp.Value) error { - obj := &Transaction{} - - if err := obj.UnmarshalRLPFrom(p, v); err != nil { - return err - } - - *t = append(*t, obj) - - return nil - }) -} - func (r *Receipts) UnmarshalRLP(input []byte) error { return UnmarshalRlp(r.unmarshalRLPFrom, input) } @@ -376,6 +358,24 @@ func (l *Log) unmarshalRLPFrom(_ *fastrlp.Parser, v *fastrlp.Value) error { return nil } +func (t *Transactions) UnmarshalRLP(input []byte) error { + return UnmarshalRlp(t.unmarshalRLPFrom, input) +} + +func (t *Transactions) unmarshalRLPFrom(p *fastrlp.Parser, v *fastrlp.Value) error { + return unmarshalRLPFrom(p, v, func(txType TxType, p *fastrlp.Parser, v *fastrlp.Value) error { + obj := NewTxWithType(txType) + + if err := obj.UnmarshalRLPFrom(p, v); err != nil { + return err + } + + *t = append(*t, obj) + + return nil + }) +} + // UnmarshalRLP unmarshals transaction from byte slice // Caution: Hash calculation should be done from the outside! func (t *Transaction) UnmarshalRLP(input []byte) error { From e1f891d483ecfdeab4ae8d59617d42d1429a5255 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Sun, 3 Nov 2024 17:40:50 +0100 Subject: [PATCH 07/21] handle last tx when closing (if any) --- txpool/txpool.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/txpool/txpool.go b/txpool/txpool.go index 4f61cff7a2..ad3cc99602 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -263,6 +263,7 @@ func (p *TxPool) startGossipBatchers(batchersNum, batchSize int) { } func stopGossipBatchers() { + close(gossipCh) gossipWG.Wait() } @@ -270,13 +271,23 @@ func (p *TxPool) gossipBatcher(batchSize int) { defer gossipWG.Done() batch := make([]*types.Transaction, 0, batchSize) - ticker := time.NewTicker(time.Millisecond * 500) + tickerPeriod := time.Hour * 24 // reduce empty looping when no batching + if batchSize > 1 { + tickerPeriod = time.Millisecond * 500 + } + + ticker := time.NewTicker(tickerPeriod) defer ticker.Stop() for { select { case <-p.shutdownCh: // flush when closing + tx, ok := <-gossipCh + if ok { + batch = append(batch, tx) + } + if len(batch) > 0 { p.publish(&batch) } @@ -320,7 +331,7 @@ func (p *TxPool) Start() { p.updatePending(0) // start gossip batchers - p.startGossipBatchers(1, 2500) + p.startGossipBatchers(1, 10000) // run the handler for high gauge level pruning go func() { From 3ebf596a349784eb4aff09f62730464aeca27513 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Mon, 4 Nov 2024 10:09:49 +0100 Subject: [PATCH 08/21] improve gossip flushing --- server/server.go | 10 +++++----- txpool/txpool.go | 32 +++++++++++++++----------------- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/server/server.go b/server/server.go index 39ae1e2152..ae86b1a966 100644 --- a/server/server.go +++ b/server/server.go @@ -1145,11 +1145,6 @@ func (s *Server) Close() { s.logger.Error("failed to close blockchain", "err", err.Error()) } - // Close the networking layer - if err := s.network.Close(); err != nil { - s.logger.Error("failed to close networking", "err", err.Error()) - } - // Close the consensus layer if err := s.consensus.Close(); err != nil { s.logger.Error("failed to close consensus", "err", err.Error()) @@ -1169,6 +1164,11 @@ func (s *Server) Close() { // Close the txpool's main loop s.txpool.Close() + // Close the networking layer + if err := s.network.Close(); err != nil { + s.logger.Error("failed to close networking", "err", err.Error()) + } + // Close DataDog profiler s.closeDataDogProfiler() diff --git a/txpool/txpool.go b/txpool/txpool.go index ad3cc99602..cb111c8e9f 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -269,6 +269,7 @@ func stopGossipBatchers() { func (p *TxPool) gossipBatcher(batchSize int) { defer gossipWG.Done() + batch := make([]*types.Transaction, 0, batchSize) tickerPeriod := time.Hour * 24 // reduce empty looping when no batching @@ -281,28 +282,25 @@ func (p *TxPool) gossipBatcher(batchSize int) { for { select { - case <-p.shutdownCh: - // flush when closing - tx, ok := <-gossipCh - if ok { - batch = append(batch, tx) - } - - if len(batch) > 0 { - p.publish(&batch) - } - - return case <-ticker.C: if len(batch) > 0 { p.publish(&batch) batch = batch[:0] } - case tx := <-gossipCh: - batch = append(batch, tx) - if len(batch) >= batchSize { - p.publish(&batch) - batch = batch[:0] + case tx, chOpen := <-gossipCh: + if chOpen { + batch = append(batch, tx) + if len(batch) >= batchSize { + p.publish(&batch) + batch = batch[:0] + } + } else { + // flush when closing + if len(batch) > 0 { + p.publish(&batch) + } + + return } } } From 526a57b4b7f3bd0fc10126bed34f0f43a91738dd Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Mon, 4 Nov 2024 16:16:06 +0100 Subject: [PATCH 09/21] LT runner fix --- loadtest/runner/base_load_test_runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loadtest/runner/base_load_test_runner.go b/loadtest/runner/base_load_test_runner.go index aaaeaf5f5c..b5b8c3629e 100644 --- a/loadtest/runner/base_load_test_runner.go +++ b/loadtest/runner/base_load_test_runner.go @@ -535,7 +535,7 @@ func (r *BaseLoadTestRunner) calculateResults(blockInfos map[uint64]*BlockInfo, if _, ok := blockTimeMap[block]; !ok { if currentBlockInfo, ok := blockInfos[block]; !ok { - currentBlock, err := r.client.GetBlockByNumber(jsonrpc.BlockNumber(parentBlockNum), true) + currentBlock, err := r.client.GetBlockByNumber(jsonrpc.BlockNumber(block), true) if err != nil { return err } From 35a523772627f8c418351d6acf91025566d392f9 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Mon, 4 Nov 2024 16:46:05 +0100 Subject: [PATCH 10/21] fix for LT block time calculation --- loadtest/runner/base_load_test_runner.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/loadtest/runner/base_load_test_runner.go b/loadtest/runner/base_load_test_runner.go index b5b8c3629e..344cb9f0f0 100644 --- a/loadtest/runner/base_load_test_runner.go +++ b/loadtest/runner/base_load_test_runner.go @@ -516,22 +516,22 @@ func (r *BaseLoadTestRunner) calculateResults(blockInfos map[uint64]*BlockInfo, for block := range uniqueBlocks { currentBlockTxsNum := 0 - parentBlockNum := block - 1 + nextBlockNum := block + 1 - if _, exists := blockTimeMap[parentBlockNum]; !exists { - if parentBlockInfo, exists := blockInfos[parentBlockNum]; !exists { - parentBlock, err := r.client.GetBlockByNumber(jsonrpc.BlockNumber(parentBlockNum), false) + if _, exists := blockTimeMap[nextBlockNum]; !exists { + if nextBlockInfo, exists := blockInfos[nextBlockNum]; !exists { + nextBlock, err := r.client.GetBlockByNumber(jsonrpc.BlockNumber(nextBlockNum), false) if err != nil { return err } - blockTimeMap[parentBlockNum] = parentBlock.Header.Timestamp + blockTimeMap[nextBlockNum] = nextBlock.Header.Timestamp } else { - blockTimeMap[parentBlockNum] = parentBlockInfo.CreatedAt + blockTimeMap[nextBlockNum] = nextBlockInfo.CreatedAt } } - parentBlockTimestamp := blockTimeMap[parentBlockNum] + nextBlockTimestamp := blockTimeMap[nextBlockNum] if _, ok := blockTimeMap[block]; !ok { if currentBlockInfo, ok := blockInfos[block]; !ok { @@ -553,7 +553,7 @@ func (r *BaseLoadTestRunner) calculateResults(blockInfos map[uint64]*BlockInfo, } currentBlockTimestamp := blockTimeMap[block] - blockTime := math.Abs(float64(currentBlockTimestamp - parentBlockTimestamp)) + blockTime := math.Abs(float64(nextBlockTimestamp - currentBlockTimestamp)) currentBlockTxsPerSecond := float64(currentBlockTxsNum) / blockTime @@ -579,7 +579,7 @@ func (r *BaseLoadTestRunner) calculateResults(blockInfos map[uint64]*BlockInfo, } for _, info := range blockInfos { - info.BlockTime = math.Abs(float64(info.CreatedAt - blockTimeMap[info.Number-1])) + info.BlockTime = math.Abs(float64(blockTimeMap[info.Number+1] - info.CreatedAt)) info.TPS = float64(info.NumTxs) / info.BlockTime } From d3dad47bc8e4b6b51d6523ff0677771a52380fe8 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Mon, 4 Nov 2024 19:03:38 +0100 Subject: [PATCH 11/21] return err if next block is not mined yet --- loadtest/runner/base_load_test_runner.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/loadtest/runner/base_load_test_runner.go b/loadtest/runner/base_load_test_runner.go index 344cb9f0f0..af0e8f002d 100644 --- a/loadtest/runner/base_load_test_runner.go +++ b/loadtest/runner/base_load_test_runner.go @@ -525,6 +525,10 @@ func (r *BaseLoadTestRunner) calculateResults(blockInfos map[uint64]*BlockInfo, return err } + if nextBlock == nil { + return fmt.Errorf("next block %d not mined yet, increase #txs in test", nextBlockNum) + } + blockTimeMap[nextBlockNum] = nextBlock.Header.Timestamp } else { blockTimeMap[nextBlockNum] = nextBlockInfo.CreatedAt From 513ddb55e40fb06a5e16cf88b9116228ff7fbaa7 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Tue, 5 Nov 2024 08:39:09 +0100 Subject: [PATCH 12/21] txpool gossip batch size added to config --- command/server/config/config.go | 2 ++ command/server/params.go | 2 ++ command/server/server.go | 7 +++++ server/config.go | 1 + server/server.go | 1 + txpool/txpool.go | 47 +++++++++++++++++++-------------- 6 files changed, 40 insertions(+), 20 deletions(-) diff --git a/command/server/config/config.go b/command/server/config/config.go index 63ef7e9dd9..e77d4979cc 100644 --- a/command/server/config/config.go +++ b/command/server/config/config.go @@ -69,6 +69,7 @@ type TxPool struct { PriceLimit uint64 `json:"price_limit" yaml:"price_limit"` MaxSlots uint64 `json:"max_slots" yaml:"max_slots"` MaxAccountEnqueued uint64 `json:"max_account_enqueued" yaml:"max_account_enqueued"` + GossipBatchSize uint64 `json:"gossip_batch_size" yaml:"gossip_batch_size"` } // Headers defines the HTTP response headers required to enable CORS. @@ -143,6 +144,7 @@ func DefaultConfig() *Config { PriceLimit: 0, MaxSlots: 4096, MaxAccountEnqueued: 128, + GossipBatchSize: 1, }, LogLevel: "INFO", RestoreFile: "", diff --git a/command/server/params.go b/command/server/params.go index f8fc6b26ab..ce3da427e2 100644 --- a/command/server/params.go +++ b/command/server/params.go @@ -41,6 +41,7 @@ const ( tlsCertFileLocationFlag = "tls-cert-file" tlsKeyFileLocationFlag = "tls-key-file" gossipMessageSizeFlag = "gossip-msg-size" + gossipBatchSizeFlag = "gossip-batch-size" relayerFlag = "relayer" @@ -183,6 +184,7 @@ func (p *serverParams) generateConfig() *server.Config { PriceLimit: p.rawConfig.TxPool.PriceLimit, MaxSlots: p.rawConfig.TxPool.MaxSlots, MaxAccountEnqueued: p.rawConfig.TxPool.MaxAccountEnqueued, + GossipBatchSize: p.rawConfig.TxPool.GossipBatchSize, SecretsManager: p.secretsConfig, RestoreFile: p.getRestoreFilePath(), LogLevel: hclog.LevelFromString(p.rawConfig.LogLevel), diff --git a/command/server/server.go b/command/server/server.go index 8a15b7c9df..8b9178f223 100644 --- a/command/server/server.go +++ b/command/server/server.go @@ -193,6 +193,13 @@ func setFlags(cmd *cobra.Command) { "maximum number of enqueued transactions per account", ) + cmd.Flags().Uint64Var( + ¶ms.rawConfig.TxPool.GossipBatchSize, + gossipBatchSizeFlag, + defaultConfig.TxPool.GossipBatchSize, + "maximum number of transactions in gossip message", + ) + cmd.Flags().StringArrayVar( ¶ms.rawConfig.CorsAllowedOrigins, corsOriginFlag, diff --git a/server/config.go b/server/config.go index 5639726af9..769021c9c9 100644 --- a/server/config.go +++ b/server/config.go @@ -25,6 +25,7 @@ type Config struct { PriceLimit uint64 MaxAccountEnqueued uint64 MaxSlots uint64 + GossipBatchSize uint64 Telemetry *Telemetry Network *network.Config diff --git a/server/server.go b/server/server.go index ae86b1a966..27481505e1 100644 --- a/server/server.go +++ b/server/server.go @@ -384,6 +384,7 @@ func NewServer(config *Config) (*Server, error) { MaxSlots: m.config.MaxSlots, PriceLimit: m.config.PriceLimit, MaxAccountEnqueued: m.config.MaxAccountEnqueued, + GossipBatchSize: m.config.GossipBatchSize, ChainID: big.NewInt(m.config.Chain.Params.ChainID), PeerID: m.network.AddrInfo().ID, }, diff --git a/txpool/txpool.go b/txpool/txpool.go index cb111c8e9f..899588a110 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -101,6 +101,7 @@ type Config struct { PriceLimit uint64 MaxSlots uint64 MaxAccountEnqueued uint64 + GossipBatchSize uint64 ChainID *big.Int PeerID peer.ID } @@ -190,10 +191,15 @@ type TxPool struct { // localPeerID is the peer ID of the local node that is running the txpool localPeerID peer.ID + + // maximum number of transactions in gossip message + gossipBatchSize int } -var gossipCh chan *types.Transaction -var gossipWG sync.WaitGroup +var ( + gossipCh chan *types.Transaction + gossipWG sync.WaitGroup +) // NewTxPool returns a new pool for processing incoming transactions. func NewTxPool( @@ -205,16 +211,17 @@ func NewTxPool( config *Config, ) (*TxPool, error) { pool := &TxPool{ - logger: logger.Named("txpool"), - forks: forks, - store: store, - executables: newPricesQueue(0, nil), - accounts: accountsMap{maxEnqueuedLimit: config.MaxAccountEnqueued}, - index: lookupMap{all: make(map[types.Hash]*types.Transaction)}, - gauge: slotGauge{height: 0, max: config.MaxSlots}, - priceLimit: config.PriceLimit, - chainID: config.ChainID, - localPeerID: config.PeerID, + logger: logger.Named("txpool"), + forks: forks, + store: store, + executables: newPricesQueue(0, nil), + accounts: accountsMap{maxEnqueuedLimit: config.MaxAccountEnqueued}, + index: lookupMap{all: make(map[types.Hash]*types.Transaction)}, + gauge: slotGauge{height: 0, max: config.MaxSlots}, + priceLimit: config.PriceLimit, + chainID: config.ChainID, + localPeerID: config.PeerID, + gossipBatchSize: int(config.GossipBatchSize), // main loop channels promoteReqCh: make(chan promoteRequest), @@ -251,14 +258,14 @@ func (p *TxPool) updatePending(i int64) { metrics.SetGauge([]string{txPoolMetrics, "pending_transactions"}, float32(newPending)) } -func (p *TxPool) startGossipBatchers(batchersNum, batchSize int) { +func (p *TxPool) startGossipBatchers(batchersNum int) { // 1 channel for all batchers, give it enough space for bulk requests - gossipCh = make(chan *types.Transaction, 4*batchersNum*batchSize) + gossipCh = make(chan *types.Transaction, 4*batchersNum*p.gossipBatchSize) for i := 0; i < batchersNum; i++ { gossipWG.Add(1) - go p.gossipBatcher(batchSize) + go p.gossipBatcher() } } @@ -267,13 +274,13 @@ func stopGossipBatchers() { gossipWG.Wait() } -func (p *TxPool) gossipBatcher(batchSize int) { +func (p *TxPool) gossipBatcher() { defer gossipWG.Done() - batch := make([]*types.Transaction, 0, batchSize) + batch := make([]*types.Transaction, 0, p.gossipBatchSize) tickerPeriod := time.Hour * 24 // reduce empty looping when no batching - if batchSize > 1 { + if p.gossipBatchSize > 1 { tickerPeriod = time.Millisecond * 500 } @@ -290,7 +297,7 @@ func (p *TxPool) gossipBatcher(batchSize int) { case tx, chOpen := <-gossipCh: if chOpen { batch = append(batch, tx) - if len(batch) >= batchSize { + if len(batch) >= p.gossipBatchSize { p.publish(&batch) batch = batch[:0] } @@ -329,7 +336,7 @@ func (p *TxPool) Start() { p.updatePending(0) // start gossip batchers - p.startGossipBatchers(1, 10000) + p.startGossipBatchers(1) // run the handler for high gauge level pruning go func() { From a4db278cce4983112afeaab7f445e146d47ad303 Mon Sep 17 00:00:00 2001 From: Branislav Kojic Date: Tue, 5 Nov 2024 10:38:55 +0100 Subject: [PATCH 13/21] Change `gossip_batch_size` variable --- .github/workflows/deploy-network.yml | 3 ++- .github/workflows/destroy-network.yml | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/deploy-network.yml b/.github/workflows/deploy-network.yml index 122a486b02..0e319af064 100644 --- a/.github/workflows/deploy-network.yml +++ b/.github/workflows/deploy-network.yml @@ -148,7 +148,7 @@ jobs: uses: actions/checkout@v4.1.1 with: repository: Ethernal-Tech/blade-deployment - ref: main + ref: gossip-batch-size - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v4.0.1 with: @@ -196,6 +196,7 @@ jobs: sed 's/is_bridge_active: .*/is_bridge_active: ${{ inputs.is_bridge_active }}/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml sed 's/is_london_fork_active: .*/is_london_fork_active: ${{ inputs.is_london_fork_active }}/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml sed 's/gossip_msg_size: .*/gossip_msg_size: ${{ inputs.gossip_msg_size }}/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml + sed 's/gossip_batch_size: .*/gossip_batch_size: 10000/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml sed 's/json_rpc_batch_request_limit: .*/json_rpc_batch_request_limit: 0/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml sed 's/log_level: .*/log_level: ${{ vars.LOG_LEVEL }}/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml sed 's/epoch_reward: .*/epoch_reward: 1000000000/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml diff --git a/.github/workflows/destroy-network.yml b/.github/workflows/destroy-network.yml index 5ee456b903..81349d619f 100644 --- a/.github/workflows/destroy-network.yml +++ b/.github/workflows/destroy-network.yml @@ -63,7 +63,7 @@ jobs: uses: actions/checkout@v4.1.1 with: repository: Ethernal-Tech/blade-deployment - ref: main + ref: gossip-batch-size - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v4.0.1 with: @@ -127,7 +127,7 @@ jobs: uses: actions/checkout@v4.1.1 with: repository: Ethernal-Tech/blade-deployment - ref: main + ref: gossip-batch-size - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v4.0.1 with: From 658f584d308309966845eea85cc5231d89ad0e30 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Tue, 5 Nov 2024 12:07:50 +0100 Subject: [PATCH 14/21] UT fix --- txpool/txpool.go | 33 +++++++++++++++++---------------- txpool/txpool_test.go | 27 +++++++++++++-------------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/txpool/txpool.go b/txpool/txpool.go index 899588a110..e2cfd4e5ae 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -194,12 +194,15 @@ type TxPool struct { // maximum number of transactions in gossip message gossipBatchSize int -} -var ( + // channel for gossip batching gossipCh chan *types.Transaction + + // WG for batch flushing gossipWG sync.WaitGroup -) +} + +const batchersNum = 1 // NewTxPool returns a new pool for processing incoming transactions. func NewTxPool( @@ -227,6 +230,7 @@ func NewTxPool( promoteReqCh: make(chan promoteRequest), pruneCh: make(chan struct{}), shutdownCh: make(chan struct{}), + gossipCh: make(chan *types.Transaction, 4*batchersNum*config.GossipBatchSize), } // Attach the event manager @@ -258,24 +262,21 @@ func (p *TxPool) updatePending(i int64) { metrics.SetGauge([]string{txPoolMetrics, "pending_transactions"}, float32(newPending)) } -func (p *TxPool) startGossipBatchers(batchersNum int) { - // 1 channel for all batchers, give it enough space for bulk requests - gossipCh = make(chan *types.Transaction, 4*batchersNum*p.gossipBatchSize) - +func (p *TxPool) startGossipBatchers() { for i := 0; i < batchersNum; i++ { - gossipWG.Add(1) + p.gossipWG.Add(1) go p.gossipBatcher() } } -func stopGossipBatchers() { - close(gossipCh) - gossipWG.Wait() +func (p *TxPool) stopGossipBatchers() { + close(p.gossipCh) + p.gossipWG.Wait() } func (p *TxPool) gossipBatcher() { - defer gossipWG.Done() + defer p.gossipWG.Done() batch := make([]*types.Transaction, 0, p.gossipBatchSize) @@ -294,7 +295,7 @@ func (p *TxPool) gossipBatcher() { p.publish(&batch) batch = batch[:0] } - case tx, chOpen := <-gossipCh: + case tx, chOpen := <-p.gossipCh: if chOpen { batch = append(batch, tx) if len(batch) >= p.gossipBatchSize { @@ -336,7 +337,7 @@ func (p *TxPool) Start() { p.updatePending(0) // start gossip batchers - p.startGossipBatchers(1) + p.startGossipBatchers() // run the handler for high gauge level pruning go func() { @@ -371,7 +372,7 @@ func (p *TxPool) Start() { func (p *TxPool) Close() { p.eventManager.Close() close(p.shutdownCh) - stopGossipBatchers() // wait for gossip flush + p.stopGossipBatchers() // wait for gossip flush } // SetSigner sets the signer the pool will use @@ -397,7 +398,7 @@ func (p *TxPool) AddTx(tx *types.Transaction) error { // broadcast the transaction only if a topic // subscription is present if p.topic != nil { - gossipCh <- tx + p.gossipCh <- tx } return nil diff --git a/txpool/txpool_test.go b/txpool/txpool_test.go index adc47379a5..86d36f0a0b 100644 --- a/txpool/txpool_test.go +++ b/txpool/txpool_test.go @@ -718,6 +718,17 @@ func TestAddTxHighPressure(t *testing.T) { func TestAddGossipTx(t *testing.T) { t.Parallel() + getProtoTx := func(signedTx *types.Transaction) *proto.Txn { + batch := []*types.Transaction{signedTx} + txs := types.Transactions(batch) + + return &proto.Txn{ + Raw: &any.Any{ + Value: txs.MarshalRLPTo(nil), + }, + } + } + key, sender := tests.GenerateKeyAndAddr(t) signer := crypto.NewEIP155Signer(100) tx := newTx(types.ZeroAddress, 1, 1, types.LegacyTxType) @@ -739,13 +750,7 @@ func TestAddGossipTx(t *testing.T) { } // send tx - protoTx := &proto.Txn{ - Raw: &any.Any{ - Value: signedTx.MarshalRLP(), - }, - } - - pool.addGossipTx(protoTx, "") + pool.addGossipTx(getProtoTx(signedTx), "") assert.Equal(t, uint64(1), pool.accounts.get(sender).enqueued.length()) }) @@ -769,13 +774,7 @@ func TestAddGossipTx(t *testing.T) { } // send tx - protoTx := &proto.Txn{ - Raw: &any.Any{ - Value: signedTx.MarshalRLP(), - }, - } - - pool.addGossipTx(protoTx, "") + pool.addGossipTx(getProtoTx(signedTx), "") assert.Equal(t, uint64(0), pool.accounts.get(sender).enqueued.length()) }) From dc7611aca1231602ad50e6acbef3aeb93c0bf2b7 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Tue, 5 Nov 2024 12:57:04 +0100 Subject: [PATCH 15/21] update some test times --- blockchain/storagev2/leveldb/leveldb_perf_test.go | 2 +- network/gossip_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/blockchain/storagev2/leveldb/leveldb_perf_test.go b/blockchain/storagev2/leveldb/leveldb_perf_test.go index a611edd11b..43bcfa3201 100644 --- a/blockchain/storagev2/leveldb/leveldb_perf_test.go +++ b/blockchain/storagev2/leveldb/leveldb_perf_test.go @@ -39,7 +39,7 @@ func Benchmark(b *testing.B) { }() blockCount := 1000 - storagev2.BenchmarkStorage(b, blockCount, s, 26, 15) // CI times + storagev2.BenchmarkStorage(b, blockCount, s, 27, 15) // CI times size, err := dbSize(path) require.NoError(b, err) diff --git a/network/gossip_test.go b/network/gossip_test.go index 9099d5dfc9..ff05a3f7e2 100644 --- a/network/gossip_test.go +++ b/network/gossip_test.go @@ -75,7 +75,7 @@ func TestSimpleGossip(t *testing.T) { err := WaitForSubscribers(ctx, publisher, topicName, len(servers)-1) require.NoError(t, err, "Unable to wait for subscribers") - time.Sleep(300 * time.Millisecond) + time.Sleep(500 * time.Millisecond) err = publisherTopic.Publish( &testproto.GenericMessage{ From e70d767f43ca0323ff67bb3c066e9115350bca19 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Tue, 5 Nov 2024 14:45:52 +0100 Subject: [PATCH 16/21] reduce gossip msg size to 16 MB --- .github/workflows/nightly.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index e9e40fc468..3efa3caeb5 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -30,7 +30,7 @@ jobs: max_enqueued: "20000000" is_london_fork_active: true is_bridge_active: false - gossip_msg_size: "33554432" + gossip_msg_size: "16777216" notification: false secrets: AWS_ROLE_ARN: ${{ secrets.AWS_ROLE_ARN }} @@ -168,7 +168,7 @@ jobs: max_enqueued: "20000000" is_london_fork_active: true is_bridge_active: false - gossip_msg_size: "33554432" + gossip_msg_size: "16777216" logs: true build_blade_output: ${{ needs.ci.outputs.build_blade }} lint_output: ${{ needs.ci.outputs.lint }} From 1eaeac04418203c6274528abd9875e0c784cc465 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Fri, 8 Nov 2024 13:02:58 +0100 Subject: [PATCH 17/21] avoid sender recovery when tx is available in the pool --- server/server.go | 1 + state/executor.go | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/server/server.go b/server/server.go index 27481505e1..e2b38e154b 100644 --- a/server/server.go +++ b/server/server.go @@ -394,6 +394,7 @@ func NewServer(config *Config) (*Server, error) { } m.txpool.SetSigner(signer) + m.executor.GetPoolTxHook = m.txpool.GetPendingTx } { diff --git a/state/executor.go b/state/executor.go index 6ce041ecc7..b79668ca7a 100644 --- a/state/executor.go +++ b/state/executor.go @@ -46,6 +46,7 @@ type Executor struct { PostHook func(txn *Transition) GenesisPostHook func(*Transition) error + GetPoolTxHook func(types.Hash) (*types.Transaction, bool) IsL1OriginatedToken bool } @@ -176,6 +177,12 @@ func (e *Executor) ProcessBlock( return nil, runtime.ErrOutOfGas } + if t.From() == emptyFrom && t.Type() != types.StateTxType { + if poolTx, ok := e.GetPoolTxHook(t.Hash()); ok { + t.SetFrom(poolTx.From()) + } + } + if err = txn.Write(t); err != nil { e.logger.Error("failed to write transaction to the block", "tx", t, "err", err) From b653efc3b28be06108b6a3cb7748a9fb9414195e Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Tue, 12 Nov 2024 08:52:39 +0100 Subject: [PATCH 18/21] remove txs hash from debug logging --- consensus/polybft/fsm.go | 31 ++++++++++++++----------------- state/executor.go | 23 +++++++++-------------- 2 files changed, 23 insertions(+), 31 deletions(-) diff --git a/consensus/polybft/fsm.go b/consensus/polybft/fsm.go index aa97efb585..d2785a2cb1 100644 --- a/consensus/polybft/fsm.go +++ b/consensus/polybft/fsm.go @@ -204,35 +204,32 @@ func (f *fsm) BuildProposal(currentRound uint64) ([]byte, error) { return nil, err } - if f.logger.GetLevel() <= hclog.Debug { + logLvl := f.logger.GetLevel() + if logLvl <= hclog.Debug { checkpointHash, err := extra.Checkpoint.Hash(f.backend.GetChainID(), f.Height(), stateBlock.Block.Hash()) if err != nil { return nil, fmt.Errorf("failed to calculate proposal hash: %w", err) } - var buf bytes.Buffer - - for i, tx := range stateBlock.Block.Transactions { - if f.logger.IsDebug() { - buf.WriteString(tx.Hash().String()) - } else if f.logger.IsTrace() { - buf.WriteString(tx.String()) - } - - if i != len(stateBlock.Block.Transactions)-1 { - buf.WriteString("\n") - } - } - f.logger.Debug("[FSM.BuildProposal]", "block num", stateBlock.Block.Number(), "round", currentRound, "state root", stateBlock.Block.Header.StateRoot, "proposal hash", checkpointHash.String(), "txs count", len(stateBlock.Block.Transactions), - "txs", buf.String(), - "finsihedIn", time.Since(start), + "finishedIn", time.Since(start), ) + + if logLvl < hclog.Debug { + var buf bytes.Buffer + + for _, tx := range stateBlock.Block.Transactions { + buf.WriteString(tx.String()) + buf.WriteString("\n") + } + + f.logger.Log(logLvl, "[FSM.BuildProposal]", "txs", buf.String()) + } } f.target = stateBlock diff --git a/state/executor.go b/state/executor.go index b79668ca7a..fb2d85732b 100644 --- a/state/executor.go +++ b/state/executor.go @@ -172,7 +172,7 @@ func (e *Executor) ProcessBlock( logLvl = e.logger.GetLevel() ) - for i, t := range block.Transactions { + for _, t := range block.Transactions { if t.Gas() > block.Header.GasLimit { return nil, runtime.ErrOutOfGas } @@ -189,27 +189,22 @@ func (e *Executor) ProcessBlock( return nil, err } - if logLvl <= hclog.Debug { - if e.logger.IsTrace() { - _, _ = buf.WriteString(t.String()) - } - - if e.logger.IsDebug() { - _, _ = buf.WriteString(t.Hash().String()) - } - - if i != len(block.Transactions)-1 { - _, _ = buf.WriteString("\n") - } + if logLvl < hclog.Debug { + buf.WriteString(t.String()) + buf.WriteString("\n") } } if logLvl <= hclog.Debug { var ( logMsg = "[Executor.ProcessBlock] finished." - logArgs = []interface{}{"txs count", len(block.Transactions), "txs", buf.String()} + logArgs = []interface{}{"txs count", len(block.Transactions)} ) + if buf.Len() > 0 { + logArgs = append(logArgs, "txs", buf.String()) + } + e.logger.Log(logLvl, logMsg, logArgs...) } From 85e5c0498897737e674b200d89c761d04a6becb6 Mon Sep 17 00:00:00 2001 From: Branislav Kojic Date: Wed, 13 Nov 2024 09:56:32 +0100 Subject: [PATCH 19/21] Change flag name --- .github/workflows/deploy-network.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deploy-network.yml b/.github/workflows/deploy-network.yml index 0e319af064..131b46dcae 100644 --- a/.github/workflows/deploy-network.yml +++ b/.github/workflows/deploy-network.yml @@ -196,7 +196,7 @@ jobs: sed 's/is_bridge_active: .*/is_bridge_active: ${{ inputs.is_bridge_active }}/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml sed 's/is_london_fork_active: .*/is_london_fork_active: ${{ inputs.is_london_fork_active }}/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml sed 's/gossip_msg_size: .*/gossip_msg_size: ${{ inputs.gossip_msg_size }}/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml - sed 's/gossip_batch_size: .*/gossip_batch_size: 10000/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml + sed 's/tx_gossip_batch_size: .*/tx_gossip_batch_size: 10000/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml sed 's/json_rpc_batch_request_limit: .*/json_rpc_batch_request_limit: 0/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml sed 's/log_level: .*/log_level: ${{ vars.LOG_LEVEL }}/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml sed 's/epoch_reward: .*/epoch_reward: 1000000000/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml From 7592cc8397ec9b4409bcb0ada9aad0f0bea2f868 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Wed, 13 Nov 2024 10:08:55 +0100 Subject: [PATCH 20/21] PR fix --- .github/workflows/nightly.yml | 4 ++-- command/server/config/config.go | 4 ++-- command/server/params.go | 4 ++-- command/server/server.go | 6 +++--- server/config.go | 2 +- server/server.go | 4 ++-- state/executor.go | 8 ++++---- txpool/txpool.go | 34 ++++++++++++++++----------------- types/rlp_encoding_test.go | 21 ++++++++++++++++++++ 9 files changed, 54 insertions(+), 33 deletions(-) diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index 3efa3caeb5..6e69c45891 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -30,7 +30,7 @@ jobs: max_enqueued: "20000000" is_london_fork_active: true is_bridge_active: false - gossip_msg_size: "16777216" + gossip_msg_size: "8388608" notification: false secrets: AWS_ROLE_ARN: ${{ secrets.AWS_ROLE_ARN }} @@ -168,7 +168,7 @@ jobs: max_enqueued: "20000000" is_london_fork_active: true is_bridge_active: false - gossip_msg_size: "16777216" + gossip_msg_size: "8388608" logs: true build_blade_output: ${{ needs.ci.outputs.build_blade }} lint_output: ${{ needs.ci.outputs.lint }} diff --git a/command/server/config/config.go b/command/server/config/config.go index e77d4979cc..77e1ec29d3 100644 --- a/command/server/config/config.go +++ b/command/server/config/config.go @@ -69,7 +69,7 @@ type TxPool struct { PriceLimit uint64 `json:"price_limit" yaml:"price_limit"` MaxSlots uint64 `json:"max_slots" yaml:"max_slots"` MaxAccountEnqueued uint64 `json:"max_account_enqueued" yaml:"max_account_enqueued"` - GossipBatchSize uint64 `json:"gossip_batch_size" yaml:"gossip_batch_size"` + TxGossipBatchSize uint64 `json:"tx_gossip_batch_size" yaml:"tx_gossip_batch_size"` } // Headers defines the HTTP response headers required to enable CORS. @@ -144,7 +144,7 @@ func DefaultConfig() *Config { PriceLimit: 0, MaxSlots: 4096, MaxAccountEnqueued: 128, - GossipBatchSize: 1, + TxGossipBatchSize: 1, }, LogLevel: "INFO", RestoreFile: "", diff --git a/command/server/params.go b/command/server/params.go index ce3da427e2..ced6dbb5c4 100644 --- a/command/server/params.go +++ b/command/server/params.go @@ -41,7 +41,7 @@ const ( tlsCertFileLocationFlag = "tls-cert-file" tlsKeyFileLocationFlag = "tls-key-file" gossipMessageSizeFlag = "gossip-msg-size" - gossipBatchSizeFlag = "gossip-batch-size" + txGossipBatchSizeFlag = "tx-gossip-batch-size" relayerFlag = "relayer" @@ -184,7 +184,7 @@ func (p *serverParams) generateConfig() *server.Config { PriceLimit: p.rawConfig.TxPool.PriceLimit, MaxSlots: p.rawConfig.TxPool.MaxSlots, MaxAccountEnqueued: p.rawConfig.TxPool.MaxAccountEnqueued, - GossipBatchSize: p.rawConfig.TxPool.GossipBatchSize, + TxGossipBatchSize: p.rawConfig.TxPool.TxGossipBatchSize, SecretsManager: p.secretsConfig, RestoreFile: p.getRestoreFilePath(), LogLevel: hclog.LevelFromString(p.rawConfig.LogLevel), diff --git a/command/server/server.go b/command/server/server.go index 8b9178f223..c3d5cd01bd 100644 --- a/command/server/server.go +++ b/command/server/server.go @@ -194,9 +194,9 @@ func setFlags(cmd *cobra.Command) { ) cmd.Flags().Uint64Var( - ¶ms.rawConfig.TxPool.GossipBatchSize, - gossipBatchSizeFlag, - defaultConfig.TxPool.GossipBatchSize, + ¶ms.rawConfig.TxPool.TxGossipBatchSize, + txGossipBatchSizeFlag, + defaultConfig.TxPool.TxGossipBatchSize, "maximum number of transactions in gossip message", ) diff --git a/server/config.go b/server/config.go index 769021c9c9..3d299fd979 100644 --- a/server/config.go +++ b/server/config.go @@ -25,7 +25,7 @@ type Config struct { PriceLimit uint64 MaxAccountEnqueued uint64 MaxSlots uint64 - GossipBatchSize uint64 + TxGossipBatchSize uint64 Telemetry *Telemetry Network *network.Config diff --git a/server/server.go b/server/server.go index e2b38e154b..8ac53cd4cf 100644 --- a/server/server.go +++ b/server/server.go @@ -384,7 +384,7 @@ func NewServer(config *Config) (*Server, error) { MaxSlots: m.config.MaxSlots, PriceLimit: m.config.PriceLimit, MaxAccountEnqueued: m.config.MaxAccountEnqueued, - GossipBatchSize: m.config.GossipBatchSize, + TxGossipBatchSize: m.config.TxGossipBatchSize, ChainID: big.NewInt(m.config.Chain.Params.ChainID), PeerID: m.network.AddrInfo().ID, }, @@ -394,7 +394,7 @@ func NewServer(config *Config) (*Server, error) { } m.txpool.SetSigner(signer) - m.executor.GetPoolTxHook = m.txpool.GetPendingTx + m.executor.GetPendingTxHook = m.txpool.GetPendingTx } { diff --git a/state/executor.go b/state/executor.go index fb2d85732b..2b643be4fb 100644 --- a/state/executor.go +++ b/state/executor.go @@ -44,9 +44,9 @@ type Executor struct { state State GetHash GetHashByNumberHelper - PostHook func(txn *Transition) - GenesisPostHook func(*Transition) error - GetPoolTxHook func(types.Hash) (*types.Transaction, bool) + PostHook func(txn *Transition) + GenesisPostHook func(*Transition) error + GetPendingTxHook func(types.Hash) (*types.Transaction, bool) IsL1OriginatedToken bool } @@ -178,7 +178,7 @@ func (e *Executor) ProcessBlock( } if t.From() == emptyFrom && t.Type() != types.StateTxType { - if poolTx, ok := e.GetPoolTxHook(t.Hash()); ok { + if poolTx, ok := e.GetPendingTxHook(t.Hash()); ok { t.SetFrom(poolTx.From()) } } diff --git a/txpool/txpool.go b/txpool/txpool.go index e2cfd4e5ae..ea69d8ddf0 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -101,7 +101,7 @@ type Config struct { PriceLimit uint64 MaxSlots uint64 MaxAccountEnqueued uint64 - GossipBatchSize uint64 + TxGossipBatchSize uint64 ChainID *big.Int PeerID peer.ID } @@ -193,7 +193,7 @@ type TxPool struct { localPeerID peer.ID // maximum number of transactions in gossip message - gossipBatchSize int + txGossipBatchSize int // channel for gossip batching gossipCh chan *types.Transaction @@ -214,23 +214,23 @@ func NewTxPool( config *Config, ) (*TxPool, error) { pool := &TxPool{ - logger: logger.Named("txpool"), - forks: forks, - store: store, - executables: newPricesQueue(0, nil), - accounts: accountsMap{maxEnqueuedLimit: config.MaxAccountEnqueued}, - index: lookupMap{all: make(map[types.Hash]*types.Transaction)}, - gauge: slotGauge{height: 0, max: config.MaxSlots}, - priceLimit: config.PriceLimit, - chainID: config.ChainID, - localPeerID: config.PeerID, - gossipBatchSize: int(config.GossipBatchSize), + logger: logger.Named("txpool"), + forks: forks, + store: store, + executables: newPricesQueue(0, nil), + accounts: accountsMap{maxEnqueuedLimit: config.MaxAccountEnqueued}, + index: lookupMap{all: make(map[types.Hash]*types.Transaction)}, + gauge: slotGauge{height: 0, max: config.MaxSlots}, + priceLimit: config.PriceLimit, + chainID: config.ChainID, + localPeerID: config.PeerID, + txGossipBatchSize: int(config.TxGossipBatchSize), // main loop channels promoteReqCh: make(chan promoteRequest), pruneCh: make(chan struct{}), shutdownCh: make(chan struct{}), - gossipCh: make(chan *types.Transaction, 4*batchersNum*config.GossipBatchSize), + gossipCh: make(chan *types.Transaction, 4*batchersNum*config.TxGossipBatchSize), } // Attach the event manager @@ -278,10 +278,10 @@ func (p *TxPool) stopGossipBatchers() { func (p *TxPool) gossipBatcher() { defer p.gossipWG.Done() - batch := make([]*types.Transaction, 0, p.gossipBatchSize) + batch := make([]*types.Transaction, 0, p.txGossipBatchSize) tickerPeriod := time.Hour * 24 // reduce empty looping when no batching - if p.gossipBatchSize > 1 { + if p.txGossipBatchSize > 1 { tickerPeriod = time.Millisecond * 500 } @@ -298,7 +298,7 @@ func (p *TxPool) gossipBatcher() { case tx, chOpen := <-p.gossipCh: if chOpen { batch = append(batch, tx) - if len(batch) >= p.gossipBatchSize { + if len(batch) >= p.txGossipBatchSize { p.publish(&batch) batch = batch[:0] } diff --git a/types/rlp_encoding_test.go b/types/rlp_encoding_test.go index f7e5f5ed52..b4bc61f39a 100644 --- a/types/rlp_encoding_test.go +++ b/types/rlp_encoding_test.go @@ -201,6 +201,7 @@ func TestRLPMarshall_And_Unmarshall_TypedTransaction(t *testing.T) { }), } + // Transaction for _, originalTx := range originalTxs { t.Run(originalTx.Type().String(), func(t *testing.T) { originalTx.ComputeHash() @@ -217,6 +218,26 @@ func TestRLPMarshall_And_Unmarshall_TypedTransaction(t *testing.T) { assert.Equal(t, originalTx.Hash(), unmarshalledTx.Hash()) }) } + + // Transactions + for _, originalTx := range originalTxs { + t.Run(originalTx.Type().String()+"s", func(t *testing.T) { + originalTx.ComputeHash() + + txs := Transactions([]*Transaction{originalTx}) + txsRLP := txs.MarshalRLPTo(nil) + + unmarshalledTxs := &Transactions{} + assert.NoError(t, unmarshalledTxs.UnmarshalRLP(txsRLP)) + + for _, unmarshalledTx := range *unmarshalledTxs { + unmarshalledTx.ComputeHash() + + assert.Equal(t, originalTx.Type(), unmarshalledTx.Type()) + assert.Equal(t, originalTx.Hash(), unmarshalledTx.Hash()) + } + }) + } } func TestRLPMarshall_Unmarshall_Missing_Data(t *testing.T) { From ace5f9445d1b4116668fdbe89a560fa2afad8453 Mon Sep 17 00:00:00 2001 From: Oliver Bundalo Date: Wed, 13 Nov 2024 10:16:31 +0100 Subject: [PATCH 21/21] deployment -> main branch --- .github/workflows/deploy-network.yml | 2 +- .github/workflows/destroy-network.yml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/deploy-network.yml b/.github/workflows/deploy-network.yml index 131b46dcae..f528c9e813 100644 --- a/.github/workflows/deploy-network.yml +++ b/.github/workflows/deploy-network.yml @@ -148,7 +148,7 @@ jobs: uses: actions/checkout@v4.1.1 with: repository: Ethernal-Tech/blade-deployment - ref: gossip-batch-size + ref: main - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v4.0.1 with: diff --git a/.github/workflows/destroy-network.yml b/.github/workflows/destroy-network.yml index 81349d619f..5ee456b903 100644 --- a/.github/workflows/destroy-network.yml +++ b/.github/workflows/destroy-network.yml @@ -63,7 +63,7 @@ jobs: uses: actions/checkout@v4.1.1 with: repository: Ethernal-Tech/blade-deployment - ref: gossip-batch-size + ref: main - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v4.0.1 with: @@ -127,7 +127,7 @@ jobs: uses: actions/checkout@v4.1.1 with: repository: Ethernal-Tech/blade-deployment - ref: gossip-batch-size + ref: main - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v4.0.1 with: