Txpool gossip batch #428
Txpool gossip batch
oliverbundalo authored Nov 13, 2024
2 parents 5a7e449 + 853c6a9 commit 7ddf17c
Showing 18 changed files with 252 additions and 96 deletions.
1 change: 1 addition & 0 deletions .github/workflows/deploy-network.yml
@@ -196,6 +196,7 @@ jobs:
sed 's/is_bridge_active: .*/is_bridge_active: ${{ inputs.is_bridge_active }}/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml
sed 's/is_london_fork_active: .*/is_london_fork_active: ${{ inputs.is_london_fork_active }}/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml
sed 's/gossip_msg_size: .*/gossip_msg_size: ${{ inputs.gossip_msg_size }}/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml
sed 's/tx_gossip_batch_size: .*/tx_gossip_batch_size: 10000/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml
sed 's/json_rpc_batch_request_limit: .*/json_rpc_batch_request_limit: 0/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml
sed 's/log_level: .*/log_level: ${{ vars.LOG_LEVEL }}/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml
sed 's/epoch_reward: .*/epoch_reward: 1000000000/g' group_vars/all.yml > group_vars/all.yml.tmp && mv group_vars/all.yml.tmp group_vars/all.yml
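Note that, unlike gossip_msg_size, which remains a workflow input, the new tx_gossip_batch_size key is pinned to 10000 by the workflow itself rather than exposed as an input.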
4 changes: 2 additions & 2 deletions .github/workflows/nightly.yml
@@ -30,7 +30,7 @@ jobs:
max_enqueued: "20000000"
is_london_fork_active: true
is_bridge_active: false
gossip_msg_size: "33554432"
gossip_msg_size: "8388608"
notification: false
secrets:
AWS_ROLE_ARN: ${{ secrets.AWS_ROLE_ARN }}
@@ -168,7 +168,7 @@ jobs:
max_enqueued: "20000000"
is_london_fork_active: true
is_bridge_active: false
gossip_msg_size: "33554432"
gossip_msg_size: "8388608"
logs: true
build_blade_output: ${{ needs.ci.outputs.build_blade }}
lint_output: ${{ needs.ci.outputs.lint }}
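For scale: 33554432 bytes is 32 MiB and 8388608 bytes is 8 MiB, so the nightly runs cut the maximum gossip message size to a quarter even as transactions start being batched into shared messages.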
2 changes: 1 addition & 1 deletion blockchain/storagev2/leveldb/leveldb_perf_test.go
@@ -39,7 +39,7 @@ func Benchmark(b *testing.B) {
}()

blockCount := 1000
storagev2.BenchmarkStorage(b, blockCount, s, 26, 15) // CI times
storagev2.BenchmarkStorage(b, blockCount, s, 27, 15) // CI times

size, err := dbSize(path)
require.NoError(b, err)
2 changes: 2 additions & 0 deletions command/server/config/config.go
@@ -69,6 +69,7 @@ type TxPool struct {
PriceLimit uint64 `json:"price_limit" yaml:"price_limit"`
MaxSlots uint64 `json:"max_slots" yaml:"max_slots"`
MaxAccountEnqueued uint64 `json:"max_account_enqueued" yaml:"max_account_enqueued"`
TxGossipBatchSize uint64 `json:"tx_gossip_batch_size" yaml:"tx_gossip_batch_size"`
}

// Headers defines the HTTP response headers required to enable CORS.
@@ -143,6 +144,7 @@ func DefaultConfig() *Config {
PriceLimit: 0,
MaxSlots: 4096,
MaxAccountEnqueued: 128,
TxGossipBatchSize: 1,
},
LogLevel: "INFO",
RestoreFile: "",
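Note the default: TxGossipBatchSize of 1 effectively preserves the old behavior of publishing each transaction in its own gossip message, so batching is opt-in.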
2 changes: 2 additions & 0 deletions command/server/params.go
@@ -41,6 +41,7 @@ const (
tlsCertFileLocationFlag = "tls-cert-file"
tlsKeyFileLocationFlag = "tls-key-file"
gossipMessageSizeFlag = "gossip-msg-size"
txGossipBatchSizeFlag = "tx-gossip-batch-size"

relayerFlag = "relayer"

@@ -183,6 +184,7 @@ func (p *serverParams) generateConfig() *server.Config {
PriceLimit: p.rawConfig.TxPool.PriceLimit,
MaxSlots: p.rawConfig.TxPool.MaxSlots,
MaxAccountEnqueued: p.rawConfig.TxPool.MaxAccountEnqueued,
TxGossipBatchSize: p.rawConfig.TxPool.TxGossipBatchSize,
SecretsManager: p.secretsConfig,
RestoreFile: p.getRestoreFilePath(),
LogLevel: hclog.LevelFromString(p.rawConfig.LogLevel),
7 changes: 7 additions & 0 deletions command/server/server.go
@@ -193,6 +193,13 @@ func setFlags(cmd *cobra.Command) {
"maximum number of enqueued transactions per account",
)

cmd.Flags().Uint64Var(
&params.rawConfig.TxPool.TxGossipBatchSize,
txGossipBatchSizeFlag,
defaultConfig.TxPool.TxGossipBatchSize,
"maximum number of transactions in gossip message",
)

cmd.Flags().StringArrayVar(
&params.rawConfig.CorsAllowedOrigins,
corsOriginFlag,
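With the flag registered, batching can be enabled at startup via --tx-gossip-batch-size on the server command (the deployment workflow above pins the matching config key to 10000); in config files the key is spelled tx_gossip_batch_size, per the JSON/YAML struct tags.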
31 changes: 14 additions & 17 deletions consensus/polybft/fsm.go
@@ -204,35 +204,32 @@ func (f *fsm) BuildProposal(currentRound uint64) ([]byte, error) {
return nil, err
}

if f.logger.GetLevel() <= hclog.Debug {
logLvl := f.logger.GetLevel()
if logLvl <= hclog.Debug {
checkpointHash, err := extra.Checkpoint.Hash(f.backend.GetChainID(), f.Height(), stateBlock.Block.Hash())
if err != nil {
return nil, fmt.Errorf("failed to calculate proposal hash: %w", err)
}

var buf bytes.Buffer

for i, tx := range stateBlock.Block.Transactions {
if f.logger.IsDebug() {
buf.WriteString(tx.Hash().String())
} else if f.logger.IsTrace() {
buf.WriteString(tx.String())
}

if i != len(stateBlock.Block.Transactions)-1 {
buf.WriteString("\n")
}
}

f.logger.Debug("[FSM.BuildProposal]",
"block num", stateBlock.Block.Number(),
"round", currentRound,
"state root", stateBlock.Block.Header.StateRoot,
"proposal hash", checkpointHash.String(),
"txs count", len(stateBlock.Block.Transactions),
"txs", buf.String(),
"finsihedIn", time.Since(start),
"finishedIn", time.Since(start),
)

if logLvl < hclog.Debug {
var buf bytes.Buffer

for _, tx := range stateBlock.Block.Transactions {
buf.WriteString(tx.String())
buf.WriteString("\n")
}

f.logger.Log(logLvl, "[FSM.BuildProposal]", "txs", buf.String())
}
}

f.target = stateBlock
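In go-hclog, Trace (1) sorts below Debug (2), so the new logLvl < hclog.Debug branch fires only at trace verbosity: the summary line (with the corrected "finishedIn" key) is logged whenever debugging is on, while the full per-transaction dump is reserved for tracing.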
24 changes: 14 additions & 10 deletions loadtest/runner/base_load_test_runner.go
@@ -516,26 +516,30 @@ func (r *BaseLoadTestRunner) calculateResults(blockInfos map[uint64]*BlockInfo,

for block := range uniqueBlocks {
currentBlockTxsNum := 0
parentBlockNum := block - 1
nextBlockNum := block + 1

if _, exists := blockTimeMap[parentBlockNum]; !exists {
if parentBlockInfo, exists := blockInfos[parentBlockNum]; !exists {
parentBlock, err := r.client.GetBlockByNumber(jsonrpc.BlockNumber(parentBlockNum), false)
if _, exists := blockTimeMap[nextBlockNum]; !exists {
if nextBlockInfo, exists := blockInfos[nextBlockNum]; !exists {
nextBlock, err := r.client.GetBlockByNumber(jsonrpc.BlockNumber(nextBlockNum), false)
if err != nil {
return err
}

blockTimeMap[parentBlockNum] = parentBlock.Header.Timestamp
if nextBlock == nil {
return fmt.Errorf("next block %d not mined yet, increase #txs in test", nextBlockNum)
}

blockTimeMap[nextBlockNum] = nextBlock.Header.Timestamp
} else {
blockTimeMap[parentBlockNum] = parentBlockInfo.CreatedAt
blockTimeMap[nextBlockNum] = nextBlockInfo.CreatedAt
}
}

parentBlockTimestamp := blockTimeMap[parentBlockNum]
nextBlockTimestamp := blockTimeMap[nextBlockNum]

if _, ok := blockTimeMap[block]; !ok {
if currentBlockInfo, ok := blockInfos[block]; !ok {
currentBlock, err := r.client.GetBlockByNumber(jsonrpc.BlockNumber(parentBlockNum), true)
currentBlock, err := r.client.GetBlockByNumber(jsonrpc.BlockNumber(block), true)
if err != nil {
return err
}
@@ -553,7 +557,7 @@
}

currentBlockTimestamp := blockTimeMap[block]
blockTime := math.Abs(float64(currentBlockTimestamp - parentBlockTimestamp))
blockTime := math.Abs(float64(nextBlockTimestamp - currentBlockTimestamp))

currentBlockTxsPerSecond := float64(currentBlockTxsNum) / blockTime

@@ -579,7 +583,7 @@
}

for _, info := range blockInfos {
info.BlockTime = math.Abs(float64(info.CreatedAt - blockTimeMap[info.Number-1]))
info.BlockTime = math.Abs(float64(blockTimeMap[info.Number+1] - info.CreatedAt))
info.TPS = float64(info.NumTxs) / info.BlockTime
}

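A worked example of the corrected block-time arithmetic, with hypothetical numbers: a block's time is now measured forward to its successor rather than back to its parent, and its TPS divides its own transaction count by that interval. The hunk also fetches the current block by its own number (block) instead of parentBlockNum, and errors out when the successor block has not been mined yet.

package main

import "fmt"

func main() {
	// Hypothetical timestamps: block 10 created at t=100 s, block 11 at t=102 s.
	currentCreatedAt := uint64(100)
	nextCreatedAt := uint64(102)
	numTxs := 450 // transactions in block 10

	// New computation: measure forward to the next block, not back to the parent.
	blockTime := float64(nextCreatedAt - currentCreatedAt) // 2 s
	tps := float64(numTxs) / blockTime                     // 225 tx/s

	fmt.Println(blockTime, tps) // 2 225
}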
2 changes: 1 addition & 1 deletion network/gossip_test.go
@@ -75,7 +75,7 @@ func TestSimpleGossip(t *testing.T) {
err := WaitForSubscribers(ctx, publisher, topicName, len(servers)-1)
require.NoError(t, err, "Unable to wait for subscribers")

time.Sleep(300 * time.Millisecond)
time.Sleep(500 * time.Millisecond)

err = publisherTopic.Publish(
&testproto.GenericMessage{
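The diff does not state the motivation, but the longer pause (500 ms instead of 300 ms) plausibly gives the gossipsub mesh more time to form after the subscriber check, reducing test flakiness.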
1 change: 1 addition & 0 deletions server/config.go
@@ -25,6 +25,7 @@ type Config struct {
PriceLimit uint64
MaxAccountEnqueued uint64
MaxSlots uint64
TxGossipBatchSize uint64

Telemetry *Telemetry
Network *network.Config
12 changes: 7 additions & 5 deletions server/server.go
@@ -384,6 +384,7 @@ func NewServer(config *Config) (*Server, error) {
MaxSlots: m.config.MaxSlots,
PriceLimit: m.config.PriceLimit,
MaxAccountEnqueued: m.config.MaxAccountEnqueued,
TxGossipBatchSize: m.config.TxGossipBatchSize,
ChainID: big.NewInt(m.config.Chain.Params.ChainID),
PeerID: m.network.AddrInfo().ID,
},
@@ -393,6 +394,7 @@
}

m.txpool.SetSigner(signer)
m.executor.GetPendingTxHook = m.txpool.GetPendingTx
}

{
@@ -1145,11 +1147,6 @@ func (s *Server) Close() {
s.logger.Error("failed to close blockchain", "err", err.Error())
}

// Close the networking layer
if err := s.network.Close(); err != nil {
s.logger.Error("failed to close networking", "err", err.Error())
}

// Close the consensus layer
if err := s.consensus.Close(); err != nil {
s.logger.Error("failed to close consensus", "err", err.Error())
@@ -1169,6 +1166,11 @@
// Close the txpool's main loop
s.txpool.Close()

// Close the networking layer
if err := s.network.Close(); err != nil {
s.logger.Error("failed to close networking", "err", err.Error())
}

// Close DataDog profiler
s.closeDataDogProfiler()

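The reordering here moves network.Close() from before the consensus shutdown to after txpool.Close(), so components that still publish or subscribe during teardown, the txpool's gossip topic included, find the networking layer alive until they are done.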
34 changes: 18 additions & 16 deletions state/executor.go
@@ -44,8 +44,9 @@ type Executor struct {
state State
GetHash GetHashByNumberHelper

PostHook func(txn *Transition)
GenesisPostHook func(*Transition) error
PostHook func(txn *Transition)
GenesisPostHook func(*Transition) error
GetPendingTxHook func(types.Hash) (*types.Transaction, bool)

IsL1OriginatedToken bool
}
@@ -171,38 +172,39 @@ func (e *Executor) ProcessBlock(
logLvl = e.logger.GetLevel()
)

for i, t := range block.Transactions {
for _, t := range block.Transactions {
if t.Gas() > block.Header.GasLimit {
return nil, runtime.ErrOutOfGas
}

if t.From() == emptyFrom && t.Type() != types.StateTxType {
if poolTx, ok := e.GetPendingTxHook(t.Hash()); ok {
t.SetFrom(poolTx.From())
}
}

if err = txn.Write(t); err != nil {
e.logger.Error("failed to write transaction to the block", "tx", t, "err", err)

return nil, err
}

if logLvl <= hclog.Debug {
if e.logger.IsTrace() {
_, _ = buf.WriteString(t.String())
}

if e.logger.IsDebug() {
_, _ = buf.WriteString(t.Hash().String())
}

if i != len(block.Transactions)-1 {
_, _ = buf.WriteString("\n")
}
if logLvl < hclog.Debug {
buf.WriteString(t.String())
buf.WriteString("\n")
}
}

if logLvl <= hclog.Debug {
var (
logMsg = "[Executor.ProcessBlock] finished."
logArgs = []interface{}{"txs count", len(block.Transactions), "txs", buf.String()}
logArgs = []interface{}{"txs count", len(block.Transactions)}
)

if buf.Len() > 0 {
logArgs = append(logArgs, "txs", buf.String())
}

e.logger.Log(logLvl, logMsg, logArgs...)
}

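Taken together with the wiring in server.go above (m.executor.GetPendingTxHook = m.txpool.GetPendingTx), the hook lets block processing reuse the sender address the pool already recovered, presumably to avoid re-deriving it from the signature. A minimal, self-contained sketch of the pattern, with hypothetical stand-ins for types.Hash, types.Address, and types.Transaction:

package main

import "fmt"

// Hypothetical stand-in types; the real ones live in the types package.
type Hash string
type Address string

type Transaction struct {
	hash Hash
	from Address
}

func (t *Transaction) Hash() Hash        { return t.hash }
func (t *Transaction) From() Address     { return t.from }
func (t *Transaction) SetFrom(a Address) { t.from = a }

// A toy pool that has already recovered each pending tx's sender.
type Pool struct{ pending map[Hash]*Transaction }

func (p *Pool) GetPendingTx(h Hash) (*Transaction, bool) {
	tx, ok := p.pending[h]
	return tx, ok
}

// Executor carries the same hook shape the diff adds.
type Executor struct {
	GetPendingTxHook func(Hash) (*Transaction, bool)
}

func main() {
	pool := &Pool{pending: map[Hash]*Transaction{
		"0xabc": {hash: "0xabc", from: "0xSender"},
	}}
	e := &Executor{GetPendingTxHook: pool.GetPendingTx} // mirrors the server.go wiring

	blockTx := &Transaction{hash: "0xabc"} // sender not populated yet (empty From)
	if blockTx.From() == "" {
		if poolTx, ok := e.GetPendingTxHook(blockTx.Hash()); ok {
			// Reuse the pool's recovered sender instead of recovering it again.
			blockTx.SetFrom(poolTx.From())
		}
	}
	fmt.Println(blockTx.From()) // 0xSender
}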
(Diff truncated: the remaining changed files are not shown.)
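The txpool changes that implement the batching itself fall in that truncated portion. Purely as an illustration of the technique the title names, and not the repository's actual code, a batcher driven by a TxGossipBatchSize-style limit might look like this (all names hypothetical; a real implementation would also flush on a timer and respect the gossip_msg_size cap):

package main

import "fmt"

// GossipBatcher buffers outgoing transactions and publishes them as one
// gossip message once the batch reaches the configured size.
type GossipBatcher struct {
	batchSize int
	buf       [][]byte
	publish   func(batch [][]byte) // e.g. a libp2p pubsub topic publish
}

func (g *GossipBatcher) Add(rawTx []byte) {
	g.buf = append(g.buf, rawTx)
	if len(g.buf) >= g.batchSize {
		g.Flush()
	}
}

func (g *GossipBatcher) Flush() {
	if len(g.buf) == 0 {
		return
	}
	g.publish(g.buf)
	g.buf = nil
}

func main() {
	b := &GossipBatcher{
		batchSize: 3,
		publish: func(batch [][]byte) {
			fmt.Printf("publishing %d txs in one message\n", len(batch))
		},
	}
	for i := 0; i < 7; i++ {
		b.Add([]byte{byte(i)})
	}
	b.Flush() // drain the remainder: prints batches of 3, 3, then 1
}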
