From abd4bcb3132639f0e309db77a2e67f2251dcd986 Mon Sep 17 00:00:00 2001 From: libotony Date: Wed, 22 May 2024 17:28:01 +0800 Subject: [PATCH] tweak metrics in txpool and node (#744) * tweak metrics in txpool and node * move metrics in bft to separate file * identify block proposed/received for metrics processed tx/gas * promote bft rejected error && pr comments --- bft/engine.go | 10 ++---- bft/metrics.go | 14 ++++++++ cmd/thor/node/metrics.go | 69 ++++-------------------------------- cmd/thor/node/node.go | 50 ++++++++++++++------------ cmd/thor/node/packer_loop.go | 21 +++++++---- txpool/tx_pool.go | 1 + 6 files changed, 64 insertions(+), 101 deletions(-) create mode 100644 bft/metrics.go diff --git a/bft/engine.go b/bft/engine.go index fe13b09cf..2e3f6a73f 100644 --- a/bft/engine.go +++ b/bft/engine.go @@ -14,7 +14,6 @@ import ( "github.com/vechain/thor/v2/cache" "github.com/vechain/thor/v2/chain" "github.com/vechain/thor/v2/kv" - "github.com/vechain/thor/v2/metrics" "github.com/vechain/thor/v2/muxdb" "github.com/vechain/thor/v2/state" "github.com/vechain/thor/v2/thor" @@ -24,11 +23,7 @@ import ( const dataStoreName = "bft.engine" -var ( - finalizedKey = []byte("finalized") - - metricBlocksCommitted = metrics.LazyLoadCounterVec("block_bft_committed_count", []string{"status"}) -) +var finalizedKey = []byte("finalized") type Finalizer interface { Finalized() thor.Bytes32 @@ -137,7 +132,7 @@ func (engine *BFTEngine) CommitBlock(header *block.Header, isPacking bool) error return err } engine.finalized.Store(id) - metricBlocksCommitted().AddWithLabel(1, map[string]string{"status": "finalized"}) + metricBlocksCommitted().Add(1) } } @@ -153,7 +148,6 @@ func (engine *BFTEngine) CommitBlock(header *block.Header, isPacking bool) error return err } engine.casts.Mark(checkpoint, state.Quality) - metricBlocksCommitted().AddWithLabel(1, map[string]string{"status": "proposed"}) } return nil diff --git a/bft/metrics.go b/bft/metrics.go new file mode 100644 index 
000000000..600b305ab --- /dev/null +++ b/bft/metrics.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The VeChainThor developers + +// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying +// file LICENSE or <https://www.gnu.org/licenses/lgpl-3.0.html> + +package bft + +import ( + "github.com/vechain/thor/v2/metrics" +) + +var ( + metricBlocksCommitted = metrics.LazyLoadCounter("bft_committed_count") +) diff --git a/cmd/thor/node/metrics.go b/cmd/thor/node/metrics.go index ecac17120..c56717d78 100644 --- a/cmd/thor/node/metrics.go +++ b/cmd/thor/node/metrics.go @@ -6,71 +6,14 @@ package node import ( - "time" - "github.com/vechain/thor/v2/metrics" ) var ( - metricBlockProposedCount = metrics.LazyLoadCounterVec("block_proposed_count", []string{"status"}) - metricBlockProposedTxs = metrics.LazyLoadCounterVec("block_proposed_tx_count", []string{"status"}) - metricBlockProposedUsedGas = metrics.LazyLoadCounterVec("block_proposed_used_gas_count", []string{"status"}) - metricBlockProposedDuration = metrics.LazyLoadHistogramVec( - "block_proposed_duration_ms", []string{"status"}, metrics.Bucket10s, - ) - - metricBlockReceivedCount = metrics.LazyLoadCounterVec("block_received_count", []string{"status"}) - metricBlockReceivedProcessedTxs = metrics.LazyLoadCounterVec("block_received_processed_tx_count", []string{"status"}) - metricBlockReceivedUsedGas = metrics.LazyLoadCounterVec("block_received_used_gas_count", []string{"status"}) - metricBlockReceivedDuration = metrics.LazyLoadHistogramVec( - "block_received_duration_ms", []string{"status"}, metrics.Bucket10s, - ) - - metricChainForkCount = metrics.LazyLoadCounter("chain_fork_count") - metricChainForkSize = metrics.LazyLoadGauge("chain_fork_size") + metricBlockProcessedCount = metrics.LazyLoadCounterVec("block_processed_count", []string{"type", "success"}) + metricBlockProcessedTxs = metrics.LazyLoadCounterVec("block_processed_tx_count", []string{"type"}) + metricBlockProcessedGas = 
metrics.LazyLoadCounterVec("block_processed_gas_count", []string{"type"}) + metricBlockProcessedDuration = metrics.LazyLoadHistogram("block_processed_duration_ms", metrics.Bucket10s) + metricChainForkCount = metrics.LazyLoadCounter("chain_fork_count") + metricChainForkSize = metrics.LazyLoadGauge("chain_fork_size") ) - -// evalBlockProposeMetrics captures received block process metrics -func evalBlockReceivedMetrics(f func() error) error { - return evalBlockMetrics( - "received", - metricBlockReceivedCount(), - metricBlockReceivedDuration(), - f, - ) -} - -// evalBlockProposeMetrics captures proposing block process metrics -func evalBlockProposeMetrics(f func() error) error { - return evalBlockMetrics( - "proposed", - metricBlockProposedCount(), - metricBlockProposedDuration(), - f, - ) -} - -func evalBlockMetrics( - metricType string, - metricCounter metrics.CountVecMeter, - metricDuration metrics.HistogramVecMeter, - f func() error, -) error { - startTime := time.Now() - - if err := f(); err != nil { - status := map[string]string{ - "status": "failed", - } - metricCounter.AddWithLabel(1, status) - metricDuration.ObserveWithLabels(time.Since(startTime).Milliseconds(), status) - return err - } - - status := map[string]string{ - "status": metricType, - } - metricCounter.AddWithLabel(1, status) - metricDuration.ObserveWithLabels(time.Since(startTime).Milliseconds(), status) - return nil -} diff --git a/cmd/thor/node/node.go b/cmd/thor/node/node.go index 990600a70..001e4190d 100644 --- a/cmd/thor/node/node.go +++ b/cmd/thor/node/node.go @@ -273,32 +273,30 @@ func (n *Node) txStashLoop(ctx context.Context) { } // guardBlockProcessing adds lock on block processing and maintains block conflicts. 
-func (n *Node) guardBlockProcessing(blockNum uint32, process func(conflicts uint32) error) func() error { - return func() error { - n.processLock.Lock() - defer n.processLock.Unlock() - - if blockNum > n.maxBlockNum { - if blockNum > n.maxBlockNum+1 { - // the block is surely unprocessable now - return errBlockTemporaryUnprocessable - } - n.maxBlockNum = blockNum - return process(0) +func (n *Node) guardBlockProcessing(blockNum uint32, process func(conflicts uint32) error) error { + n.processLock.Lock() + defer n.processLock.Unlock() + + if blockNum > n.maxBlockNum { + if blockNum > n.maxBlockNum+1 { + // the block is surely unprocessable now + return errBlockTemporaryUnprocessable } + n.maxBlockNum = blockNum + return process(0) + } - conflicts, err := n.repo.ScanConflicts(blockNum) - if err != nil { - return err - } - return process(conflicts) + conflicts, err := n.repo.ScanConflicts(blockNum) + if err != nil { + return err } + return process(conflicts) } func (n *Node) processBlock(newBlock *block.Block, stats *blockStats) (bool, error) { var isTrunk *bool - if err := evalBlockReceivedMetrics(n.guardBlockProcessing(newBlock.Header().Number(), func(conflicts uint32) error { + if err := n.guardBlockProcessing(newBlock.Header().Number(), func(conflicts uint32) error { // Check whether the block was already there. // It can be skipped if no conflicts. 
if conflicts > 0 { @@ -396,26 +394,32 @@ func (n *Node) processBlock(newBlock *block.Block, stats *blockStats) (bool, err if v, updated := n.bandwidth.Update(newBlock.Header(), time.Duration(realElapsed)); updated { log.Debug("bandwidth updated", "gps", v) } - - metricBlockReceivedProcessedTxs().AddWithLabel(int64(len(receipts)), map[string]string{"status": "receivedBlock"}) - metricBlockReceivedUsedGas().AddWithLabel(int64(newBlock.Header().GasUsed()), map[string]string{"status": "receivedBlock"}) stats.UpdateProcessed(1, len(receipts), execElapsed, commitElapsed, realElapsed, newBlock.Header().GasUsed()) + + metricBlockProcessedTxs().AddWithLabel(int64(len(receipts)), map[string]string{"type": "received"}) + metricBlockProcessedGas().AddWithLabel(int64(newBlock.Header().GasUsed()), map[string]string{"type": "received"}) + metricBlockProcessedDuration().Observe(time.Duration(realElapsed).Milliseconds()) return nil - })); err != nil { + }); err != nil { switch { - case err == errKnownBlock || err == errBFTRejected: + case err == errKnownBlock: stats.UpdateIgnored(1) return false, nil case consensus.IsFutureBlock(err) || err == errParentMissing || err == errBlockTemporaryUnprocessable: stats.UpdateQueued(1) + case err == errBFTRejected: + // TODO: capture metrics + log.Debug(fmt.Sprintf("block rejected by BFT engine\n%v", newBlock.Header())) case consensus.IsCritical(err): msg := fmt.Sprintf(`failed to process block due to consensus failure \n%v\n`, newBlock.Header()) log.Error(msg, "err", err) default: log.Error("failed to process block", "err", err) } + metricBlockProcessedCount().AddWithLabel(1, map[string]string{"type": "received", "success": "false"}) return false, err } + metricBlockProcessedCount().AddWithLabel(1, map[string]string{"type": "received", "success": "true"}) return *isTrunk, nil } diff --git a/cmd/thor/node/packer_loop.go b/cmd/thor/node/packer_loop.go index 0cafdf5e1..aa43252af 100644 --- a/cmd/thor/node/packer_loop.go +++ 
b/cmd/thor/node/packer_loop.go @@ -106,16 +106,21 @@ func (n *Node) packerLoop(ctx context.Context) { } } -func (n *Node) pack(flow *packer.Flow) error { +func (n *Node) pack(flow *packer.Flow) (err error) { txs := n.txPool.Executables() var txsToRemove []*tx.Transaction defer func() { - for _, tx := range txsToRemove { - n.txPool.Remove(tx.Hash(), tx.ID()) + if err == nil { + for _, tx := range txsToRemove { + n.txPool.Remove(tx.Hash(), tx.ID()) + } + metricBlockProcessedCount().AddWithLabel(1, map[string]string{"type": "proposed", "success": "true"}) + } else { + metricBlockProcessedCount().AddWithLabel(1, map[string]string{"type": "proposed", "success": "false"}) } }() - return evalBlockProposeMetrics(n.guardBlockProcessing(flow.Number(), func(conflicts uint32) error { + return n.guardBlockProcessing(flow.Number(), func(conflicts uint32) error { var ( startTime = mclock.Now() logEnabled = !n.skipLogs && !n.logDBFailed @@ -191,8 +196,6 @@ func (n *Node) pack(flow *packer.Flow) error { n.processFork(newBlock, oldBest.Header.ID()) commitElapsed := mclock.Now() - startTime - execElapsed - metricBlockProposedTxs().AddWithLabel(int64(len(receipts)), map[string]string{"status": "proposedBlock"}) - metricBlockProposedUsedGas().AddWithLabel(int64(newBlock.Header().GasUsed()), map[string]string{"status": "proposedBlock"}) n.comm.BroadcastBlock(newBlock) log.Info("📦 new block packed", "txs", len(receipts), @@ -204,6 +207,10 @@ func (n *Node) pack(flow *packer.Flow) error { if v, updated := n.bandwidth.Update(newBlock.Header(), time.Duration(realElapsed)); updated { log.Debug("bandwidth updated", "gps", v) } + + metricBlockProcessedTxs().AddWithLabel(int64(len(receipts)), map[string]string{"type": "proposed"}) + metricBlockProcessedGas().AddWithLabel(int64(newBlock.Header().GasUsed()), map[string]string{"type": "proposed"}) + metricBlockProcessedDuration().Observe(time.Duration(realElapsed).Milliseconds()) return nil - })) + }) } diff --git a/txpool/tx_pool.go 
b/txpool/tx_pool.go index 9e0d37ff7..32889d13a 100644 --- a/txpool/tx_pool.go +++ b/txpool/tx_pool.go @@ -132,6 +132,7 @@ func (p *TxPool) housekeeping() { p.executables.Store(executables) } + metricTxPoolGauge().GaugeWithLabel(0-int64(removed), map[string]string{"source": "washed", "total": "true"}) log.Debug("wash done", ctx...) } }