Skip to content

Commit

Permalink
tweak metrics in txpool and node (#744)
Browse files Browse the repository at this point in the history
* tweak metrics in txpool and node

* move metrics in bft to separate file

* identify block proposed/received for metrics processed tx/gas

* promote bft rejected error && pr comments
  • Loading branch information
libotony authored May 22, 2024
1 parent 605738d commit abd4bcb
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 101 deletions.
10 changes: 2 additions & 8 deletions bft/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/vechain/thor/v2/cache"
"github.com/vechain/thor/v2/chain"
"github.com/vechain/thor/v2/kv"
"github.com/vechain/thor/v2/metrics"
"github.com/vechain/thor/v2/muxdb"
"github.com/vechain/thor/v2/state"
"github.com/vechain/thor/v2/thor"
Expand All @@ -24,11 +23,7 @@ import (

const dataStoreName = "bft.engine"

var (
finalizedKey = []byte("finalized")

metricBlocksCommitted = metrics.LazyLoadCounterVec("block_bft_committed_count", []string{"status"})
)
var finalizedKey = []byte("finalized")

type Finalizer interface {
Finalized() thor.Bytes32
Expand Down Expand Up @@ -137,7 +132,7 @@ func (engine *BFTEngine) CommitBlock(header *block.Header, isPacking bool) error
return err
}
engine.finalized.Store(id)
metricBlocksCommitted().AddWithLabel(1, map[string]string{"status": "finalized"})
metricBlocksCommitted().Add(1)
}
}

Expand All @@ -153,7 +148,6 @@ func (engine *BFTEngine) CommitBlock(header *block.Header, isPacking bool) error
return err
}
engine.casts.Mark(checkpoint, state.Quality)
metricBlocksCommitted().AddWithLabel(1, map[string]string{"status": "proposed"})
}

return nil
Expand Down
14 changes: 14 additions & 0 deletions bft/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2024 The VeChainThor developers

// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying
// file LICENSE or <https://www.gnu.org/licenses/lgpl-3.0.html>

package bft

import (
"github.com/vechain/thor/v2/metrics"
)

// metricBlocksCommitted is the counter obtained from metrics.LazyLoadCounter
// under the name "bft_committed_count".
var metricBlocksCommitted = metrics.LazyLoadCounter("bft_committed_count")
69 changes: 6 additions & 63 deletions cmd/thor/node/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,71 +6,14 @@
package node

import (
"time"

"github.com/vechain/thor/v2/metrics"
)

var (
metricBlockProposedCount = metrics.LazyLoadCounterVec("block_proposed_count", []string{"status"})
metricBlockProposedTxs = metrics.LazyLoadCounterVec("block_proposed_tx_count", []string{"status"})
metricBlockProposedUsedGas = metrics.LazyLoadCounterVec("block_proposed_used_gas_count", []string{"status"})
metricBlockProposedDuration = metrics.LazyLoadHistogramVec(
"block_proposed_duration_ms", []string{"status"}, metrics.Bucket10s,
)

metricBlockReceivedCount = metrics.LazyLoadCounterVec("block_received_count", []string{"status"})
metricBlockReceivedProcessedTxs = metrics.LazyLoadCounterVec("block_received_processed_tx_count", []string{"status"})
metricBlockReceivedUsedGas = metrics.LazyLoadCounterVec("block_received_used_gas_count", []string{"status"})
metricBlockReceivedDuration = metrics.LazyLoadHistogramVec(
"block_received_duration_ms", []string{"status"}, metrics.Bucket10s,
)

metricChainForkCount = metrics.LazyLoadCounter("chain_fork_count")
metricChainForkSize = metrics.LazyLoadGauge("chain_fork_size")
metricBlockProcessedCount = metrics.LazyLoadCounterVec("block_processed_count", []string{"type", "success"})
metricBlockProcessedTxs = metrics.LazyLoadCounterVec("block_processed_tx_count", []string{"type"})
metricBlockProcessedGas = metrics.LazyLoadCounterVec("block_processed_gas_count", []string{"type"})
metricBlockProcessedDuration = metrics.LazyLoadHistogram("block_processed_duration_ms", metrics.Bucket10s)
metricChainForkCount = metrics.LazyLoadCounter("chain_fork_count")
metricChainForkSize = metrics.LazyLoadGauge("chain_fork_size")
)

// evalBlockProposeMetrics captures received block process metrics
func evalBlockReceivedMetrics(f func() error) error {
return evalBlockMetrics(
"received",
metricBlockReceivedCount(),
metricBlockReceivedDuration(),
f,
)
}

// evalBlockProposeMetrics captures proposing block process metrics
func evalBlockProposeMetrics(f func() error) error {
return evalBlockMetrics(
"proposed",
metricBlockProposedCount(),
metricBlockProposedDuration(),
f,
)
}

// evalBlockMetrics times the execution of f and records the result.
//
// The "status" label is "failed" when f returns a non-nil error and
// metricType otherwise. With that label, metricCounter is incremented by one
// and the elapsed time in milliseconds is observed on metricDuration. The
// error returned by f (nil on success) is returned to the caller.
func evalBlockMetrics(
	metricType string,
	metricCounter metrics.CountVecMeter,
	metricDuration metrics.HistogramVecMeter,
	f func() error,
) error {
	begin := time.Now()
	err := f()

	// Only the label value differs between the failure and success paths.
	outcome := metricType
	if err != nil {
		outcome = "failed"
	}
	labels := map[string]string{
		"status": outcome,
	}

	metricCounter.AddWithLabel(1, labels)
	metricDuration.ObserveWithLabels(time.Since(begin).Milliseconds(), labels)
	return err
}
50 changes: 27 additions & 23 deletions cmd/thor/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,32 +273,30 @@ func (n *Node) txStashLoop(ctx context.Context) {
}

// guardBlockProcessing adds lock on block processing and maintains block conflicts.
func (n *Node) guardBlockProcessing(blockNum uint32, process func(conflicts uint32) error) func() error {
return func() error {
n.processLock.Lock()
defer n.processLock.Unlock()

if blockNum > n.maxBlockNum {
if blockNum > n.maxBlockNum+1 {
// the block is surely unprocessable now
return errBlockTemporaryUnprocessable
}
n.maxBlockNum = blockNum
return process(0)
func (n *Node) guardBlockProcessing(blockNum uint32, process func(conflicts uint32) error) error {
n.processLock.Lock()
defer n.processLock.Unlock()

if blockNum > n.maxBlockNum {
if blockNum > n.maxBlockNum+1 {
// the block is surely unprocessable now
return errBlockTemporaryUnprocessable
}
n.maxBlockNum = blockNum
return process(0)
}

conflicts, err := n.repo.ScanConflicts(blockNum)
if err != nil {
return err
}
return process(conflicts)
conflicts, err := n.repo.ScanConflicts(blockNum)
if err != nil {
return err
}
return process(conflicts)
}

func (n *Node) processBlock(newBlock *block.Block, stats *blockStats) (bool, error) {
var isTrunk *bool

if err := evalBlockReceivedMetrics(n.guardBlockProcessing(newBlock.Header().Number(), func(conflicts uint32) error {
if err := n.guardBlockProcessing(newBlock.Header().Number(), func(conflicts uint32) error {
// Check whether the block was already there.
// It can be skipped if no conflicts.
if conflicts > 0 {
Expand Down Expand Up @@ -396,26 +394,32 @@ func (n *Node) processBlock(newBlock *block.Block, stats *blockStats) (bool, err
if v, updated := n.bandwidth.Update(newBlock.Header(), time.Duration(realElapsed)); updated {
log.Debug("bandwidth updated", "gps", v)
}

metricBlockReceivedProcessedTxs().AddWithLabel(int64(len(receipts)), map[string]string{"status": "receivedBlock"})
metricBlockReceivedUsedGas().AddWithLabel(int64(newBlock.Header().GasUsed()), map[string]string{"status": "receivedBlock"})
stats.UpdateProcessed(1, len(receipts), execElapsed, commitElapsed, realElapsed, newBlock.Header().GasUsed())

metricBlockProcessedTxs().AddWithLabel(int64(len(receipts)), map[string]string{"type": "received"})
metricBlockProcessedGas().AddWithLabel(int64(newBlock.Header().GasUsed()), map[string]string{"type": "received"})
metricBlockProcessedDuration().Observe(time.Duration(realElapsed).Milliseconds())
return nil
})); err != nil {
}); err != nil {
switch {
case err == errKnownBlock || err == errBFTRejected:
case err == errKnownBlock:
stats.UpdateIgnored(1)
return false, nil
case consensus.IsFutureBlock(err) || err == errParentMissing || err == errBlockTemporaryUnprocessable:
stats.UpdateQueued(1)
case err == errBFTRejected:
// TODO: capture metrics
log.Debug(fmt.Sprintf("block rejected by BFT engine\n%v", newBlock.Header()))
case consensus.IsCritical(err):
msg := fmt.Sprintf(`failed to process block due to consensus failure \n%v\n`, newBlock.Header())
log.Error(msg, "err", err)
default:
log.Error("failed to process block", "err", err)
}
metricBlockProcessedCount().AddWithLabel(1, map[string]string{"type": "proposed", "success": "false"})
return false, err
}
metricBlockProcessedCount().AddWithLabel(1, map[string]string{"type": "proposed", "success": "true"})
return *isTrunk, nil
}

Expand Down
21 changes: 14 additions & 7 deletions cmd/thor/node/packer_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,21 @@ func (n *Node) packerLoop(ctx context.Context) {
}
}

func (n *Node) pack(flow *packer.Flow) error {
func (n *Node) pack(flow *packer.Flow) (err error) {
txs := n.txPool.Executables()
var txsToRemove []*tx.Transaction
defer func() {
for _, tx := range txsToRemove {
n.txPool.Remove(tx.Hash(), tx.ID())
if err == nil {
for _, tx := range txsToRemove {
n.txPool.Remove(tx.Hash(), tx.ID())
}
metricBlockProcessedCount().AddWithLabel(1, map[string]string{"type": "proposed", "success": "true"})
} else {
metricBlockProcessedCount().AddWithLabel(1, map[string]string{"type": "proposed", "success": "false"})
}
}()

return evalBlockProposeMetrics(n.guardBlockProcessing(flow.Number(), func(conflicts uint32) error {
return n.guardBlockProcessing(flow.Number(), func(conflicts uint32) error {
var (
startTime = mclock.Now()
logEnabled = !n.skipLogs && !n.logDBFailed
Expand Down Expand Up @@ -191,8 +196,6 @@ func (n *Node) pack(flow *packer.Flow) error {
n.processFork(newBlock, oldBest.Header.ID())
commitElapsed := mclock.Now() - startTime - execElapsed

metricBlockProposedTxs().AddWithLabel(int64(len(receipts)), map[string]string{"status": "proposedBlock"})
metricBlockProposedUsedGas().AddWithLabel(int64(newBlock.Header().GasUsed()), map[string]string{"status": "proposedBlock"})
n.comm.BroadcastBlock(newBlock)
log.Info("📦 new block packed",
"txs", len(receipts),
Expand All @@ -204,6 +207,10 @@ func (n *Node) pack(flow *packer.Flow) error {
if v, updated := n.bandwidth.Update(newBlock.Header(), time.Duration(realElapsed)); updated {
log.Debug("bandwidth updated", "gps", v)
}

metricBlockProcessedTxs().AddWithLabel(int64(len(receipts)), map[string]string{"type": "proposed"})
metricBlockProcessedGas().AddWithLabel(int64(newBlock.Header().GasUsed()), map[string]string{"type": "proposed"})
metricBlockProcessedDuration().Observe(time.Duration(realElapsed).Milliseconds())
return nil
}))
})
}
1 change: 1 addition & 0 deletions txpool/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (p *TxPool) housekeeping() {
p.executables.Store(executables)
}

metricTxPoolGauge().GaugeWithLabel(0-int64(removed), map[string]string{"source": "washed", "total": "true"})
log.Debug("wash done", ctx...)
}
}
Expand Down

0 comments on commit abd4bcb

Please sign in to comment.