Skip to content

Commit

Permalink
latest finalized block metrics (#12339)
Browse files Browse the repository at this point in the history
* Add LatestFinalizedBlock to HeadTracker

* Added LatestFinalizedHead to Head

* remove unused func

* fix flakey nil pointer

* improve logs & address lint issue

* nitpicks

* fixed copy on heads on MarkFinalized

* error instead of panic

* return error instead of panic

* nitpicks

* Finalized block based history depth

* simplify trimming

* nit fixes

* fix build issues caused by merge

* regen

* FIx rpc client mock generation

* nit fixes

* nit fixes

* update comments

* ensure that we trim redundant blocks both in slice and in chain in Heads
handle corner case for multiple uncle blocks at the end of the slice

* nit fix

* Update common/headtracker/head_tracker.go

Co-authored-by: Dimitris Grigoriou <[email protected]>

* HeadTracker backfill test with 0 finality depth

* latest finalized block metrics

* changelog & go generate fix

* move nodeConfig back into the test pkg

* rollback fields renaming

* nit

* changeset

* removed unused func

* Set default value for FinalizedBlockPollInterval

* updated docs

---------

Co-authored-by: Dimitris Grigoriou <[email protected]>
  • Loading branch information
dhaidashenko and dimriou authored Mar 26, 2024
1 parent 2e3248e commit 96d2fe1
Show file tree
Hide file tree
Showing 34 changed files with 607 additions and 177 deletions.
5 changes: 5 additions & 0 deletions .changeset/poor-melons-vanish.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Add the `pool_rpc_node_highest_finalized_block` metric that tracks the highest finalized block seen per RPC. If `FinalityTagEnabled = true`, a positive `NodePool.FinalizedBlockPollInterval` is needed to collect the metric. If the finality tag is not enabled, the metric is populated with a calculated latest finalized block based on the latest head and finality depth.
18 changes: 18 additions & 0 deletions common/client/mock_head_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions common/client/mock_node_client_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions common/client/mocks/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package mocks

import (
"time"

commonconfig "github.com/smartcontractkit/chainlink/v2/common/config"
)

type ChainConfig struct {
IsFinalityTagEnabled bool
FinalityDepthVal uint32
NoNewHeadsThresholdVal time.Duration
ChainTypeVal commonconfig.ChainType
}

func (t ChainConfig) ChainType() commonconfig.ChainType {
return t.ChainTypeVal
}

func (t ChainConfig) NodeNoNewHeadsThreshold() time.Duration {
return t.NoNewHeadsThresholdVal
}

func (t ChainConfig) FinalityDepth() uint32 {
return t.FinalityDepthVal
}

func (t ChainConfig) FinalityTagEnabled() bool {
return t.IsFinalityTagEnabled
}
34 changes: 22 additions & 12 deletions common/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"

commonconfig "github.com/smartcontractkit/chainlink/v2/common/config"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

Expand Down Expand Up @@ -43,6 +44,14 @@ type NodeConfig interface {
SelectionMode() string
SyncThreshold() uint32
NodeIsSyncingEnabled() bool
FinalizedBlockPollInterval() time.Duration
}

type ChainConfig interface {
NodeNoNewHeadsThreshold() time.Duration
FinalityDepth() uint32
FinalityTagEnabled() bool
ChainType() commonconfig.ChainType
}

//go:generate mockery --quiet --name Node --structname mockNode --filename "mock_node_test.go" --inpackage --case=underscore
Expand Down Expand Up @@ -73,14 +82,14 @@ type node[
RPC NodeClient[CHAIN_ID, HEAD],
] struct {
services.StateMachine
lfcLog logger.Logger
name string
id int32
chainID CHAIN_ID
nodePoolCfg NodeConfig
noNewHeadsThreshold time.Duration
order int32
chainFamily string
lfcLog logger.Logger
name string
id int32
chainID CHAIN_ID
nodePoolCfg NodeConfig
chainCfg ChainConfig
order int32
chainFamily string

ws url.URL
http *url.URL
Expand All @@ -90,8 +99,9 @@ type node[
stateMu sync.RWMutex // protects state* fields
state nodeState
// Each node is tracking the last received head number and total difficulty
stateLatestBlockNumber int64
stateLatestTotalDifficulty *big.Int
stateLatestBlockNumber int64
stateLatestTotalDifficulty *big.Int
stateLatestFinalizedBlockNumber int64

// nodeCtx is the node lifetime's context
nodeCtx context.Context
Expand All @@ -113,7 +123,7 @@ func NewNode[
RPC NodeClient[CHAIN_ID, HEAD],
](
nodeCfg NodeConfig,
noNewHeadsThreshold time.Duration,
chainCfg ChainConfig,
lggr logger.Logger,
wsuri url.URL,
httpuri *url.URL,
Expand All @@ -129,7 +139,7 @@ func NewNode[
n.id = id
n.chainID = chainID
n.nodePoolCfg = nodeCfg
n.noNewHeadsThreshold = noNewHeadsThreshold
n.chainCfg = chainCfg
n.ws = wsuri
n.order = nodeOrder
if httpuri != nil {
Expand Down
45 changes: 42 additions & 3 deletions common/client/node_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ var (
Name: "pool_rpc_node_highest_seen_block",
Help: "The highest seen block for the given RPC node",
}, []string{"chainID", "nodeName"})
promPoolRPCNodeHighestFinalizedBlock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "pool_rpc_node_highest_finalized_block",
Help: "The highest seen finalized block for the given RPC node",
}, []string{"chainID", "nodeName"})
promPoolRPCNodeNumSeenBlocks = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "pool_rpc_node_num_seen_blocks",
Help: "The total number of new blocks seen by the given RPC node",
Expand Down Expand Up @@ -88,7 +92,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
}
}

noNewHeadsTimeoutThreshold := n.noNewHeadsThreshold
noNewHeadsTimeoutThreshold := n.chainCfg.NodeNoNewHeadsThreshold()
pollFailureThreshold := n.nodePoolCfg.PollFailureThreshold()
pollInterval := n.nodePoolCfg.PollInterval()

Expand Down Expand Up @@ -134,6 +138,14 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
lggr.Debug("Polling disabled")
}

var pollFinalizedHeadCh <-chan time.Time
if n.chainCfg.FinalityTagEnabled() && n.nodePoolCfg.FinalizedBlockPollInterval() > 0 {
lggr.Debugw("Finalized block polling enabled")
pollT := time.NewTicker(n.nodePoolCfg.FinalizedBlockPollInterval())
defer pollT.Stop()
pollFinalizedHeadCh = pollT.C
}

_, highestReceivedBlockNumber, _ := n.StateAndLatest()
var pollFailures uint32

Expand Down Expand Up @@ -201,6 +213,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
outOfSyncT.Reset(noNewHeadsTimeoutThreshold)
}
n.setLatestReceived(bh.BlockNumber(), bh.BlockDifficulty())
if !n.chainCfg.FinalityTagEnabled() {
latestFinalizedBN := max(bh.BlockNumber()-int64(n.chainCfg.FinalityDepth()), 0)
if latestFinalizedBN > n.stateLatestFinalizedBlockNumber {
promPoolRPCNodeHighestFinalizedBlock.WithLabelValues(n.chainID.String(), n.name).Set(float64(latestFinalizedBN))
n.stateLatestFinalizedBlockNumber = latestFinalizedBN
}
}
case err := <-sub.Err():
lggr.Errorw("Subscription was terminated", "err", err, "nodeState", n.State())
n.declareUnreachable()
Expand All @@ -214,13 +233,33 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)
// We don't necessarily want to wait the full timeout to check again, we should
// check regularly and log noisily in this state
outOfSyncT.Reset(zombieNodeCheckInterval(n.noNewHeadsThreshold))
outOfSyncT.Reset(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold))
continue
}
}
n.declareOutOfSync(func(num int64, td *big.Int) bool { return num < highestReceivedBlockNumber })
return
case <-pollFinalizedHeadCh:
ctx, cancel := context.WithTimeout(n.nodeCtx, n.nodePoolCfg.FinalizedBlockPollInterval())
latestFinalized, err := n.RPC().LatestFinalizedBlock(ctx)
cancel()
if err != nil {
lggr.Warnw("Failed to fetch latest finalized block", "err", err)
continue
}

if !latestFinalized.IsValid() {
lggr.Warn("Latest finalized block is not valid")
continue
}

latestFinalizedBN := latestFinalized.BlockNumber()
if latestFinalizedBN > n.stateLatestFinalizedBlockNumber {
promPoolRPCNodeHighestFinalizedBlock.WithLabelValues(n.chainID.String(), n.name).Set(float64(latestFinalizedBN))
n.stateLatestFinalizedBlockNumber = latestFinalizedBN
}
}

}
}

Expand Down Expand Up @@ -316,7 +355,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(isOutOfSync func(num int64, td
return
}
lggr.Debugw(msgReceivedBlock, "blockNumber", head.BlockNumber(), "blockDifficulty", head.BlockDifficulty(), "nodeState", n.State())
case <-time.After(zombieNodeCheckInterval(n.noNewHeadsThreshold)):
case <-time.After(zombieNodeCheckInterval(n.chainCfg.NodeNoNewHeadsThreshold())):
if n.nLiveNodes != nil {
if l, _, _ := n.nLiveNodes(); l < 1 {
lggr.Critical("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state")
Expand Down
Loading

0 comments on commit 96d2fe1

Please sign in to comment.