Skip to content

Commit

Permalink
BCI-2450: Health check for MultiNode to ensure that node is not in sy…
Browse files Browse the repository at this point in the history
…ncing state (#11765)

* syncing state for multinode

* fix evm tests

* zombie check for syncing node

* Update common/client/node_fsm.go

Co-authored-by: Sam <[email protected]>

* make IsSyncing check optional

* regen config docs

* remove merge artefact

* add description to the test

* fix config docs

* fix RPC client mock generation

* regen mocks

---------

Co-authored-by: Sam <[email protected]>
  • Loading branch information
dhaidashenko and samsondav authored Feb 27, 2024
1 parent 2d9342e commit 17287df
Show file tree
Hide file tree
Showing 34 changed files with 908 additions and 148 deletions.
28 changes: 28 additions & 0 deletions common/client/mock_node_client_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions common/client/mock_rpc_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 49 additions & 18 deletions common/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type NodeConfig interface {
PollInterval() time.Duration
SelectionMode() string
SyncThreshold() uint32
NodeIsSyncingEnabled() bool
}

//go:generate mockery --quiet --name Node --structname mockNode --filename "mock_node_test.go" --inpackage --case=underscore
Expand Down Expand Up @@ -224,53 +225,83 @@ func (n *node[CHAIN_ID, HEAD, RPC]) start(startCtx context.Context) {
}
n.setState(nodeStateDialed)

if err := n.verify(startCtx); errors.Is(err, errInvalidChainID) {
n.lfcLog.Errorw("Verify failed: Node has the wrong chain ID", "err", err)
n.declareInvalidChainID()
return
} else if err != nil {
n.lfcLog.Errorw(fmt.Sprintf("Verify failed: %v", err), "err", err)
n.declareUnreachable()
return
}

n.declareAlive()
state := n.verifyConn(startCtx, n.lfcLog)
n.declareState(state)
}

// verify checks that all connections to eth nodes match the given chain ID
// verifyChainID checks that connection to the node matches the given chain ID
// Not thread-safe
// Pure verify: does not mutate node "state" field.
func (n *node[CHAIN_ID, HEAD, RPC]) verify(callerCtx context.Context) (err error) {
// Pure verifyChainID: does not mutate node "state" field.
func (n *node[CHAIN_ID, HEAD, RPC]) verifyChainID(callerCtx context.Context, lggr logger.Logger) nodeState {
promPoolRPCNodeVerifies.WithLabelValues(n.chainFamily, n.chainID.String(), n.name).Inc()
promFailed := func() {
promPoolRPCNodeVerifiesFailed.WithLabelValues(n.chainFamily, n.chainID.String(), n.name).Inc()
}

st := n.State()
switch st {
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID:
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing:
default:
panic(fmt.Sprintf("cannot verify node in state %v", st))
}

var chainID CHAIN_ID
var err error
if chainID, err = n.rpc.ChainID(callerCtx); err != nil {
promFailed()
return fmt.Errorf("failed to verify chain ID for node %s: %w", n.name, err)
lggr.Errorw("Failed to verify chain ID for node", "err", err, "nodeState", n.State())
return nodeStateUnreachable
} else if chainID.String() != n.chainID.String() {
promFailed()
return fmt.Errorf(
err = fmt.Errorf(
"rpc ChainID doesn't match local chain ID: RPC ID=%s, local ID=%s, node name=%s: %w",
chainID.String(),
n.chainID.String(),
n.name,
errInvalidChainID,
)
lggr.Errorw("Failed to verify RPC node; remote endpoint returned the wrong chain ID", "err", err, "nodeState", n.State())
return nodeStateInvalidChainID
}

promPoolRPCNodeVerifiesSuccess.WithLabelValues(n.chainFamily, n.chainID.String(), n.name).Inc()

return nil
return nodeStateAlive
}

// createVerifiedConn - establishes new connection with the RPC and verifies that it's valid: chainID matches, and it's not syncing.
// Returns desired state if one of the verifications fails. Otherwise, returns nodeStateAlive.
func (n *node[CHAIN_ID, HEAD, RPC]) createVerifiedConn(ctx context.Context, lggr logger.Logger) nodeState {
if err := n.rpc.Dial(ctx); err != nil {
n.lfcLog.Errorw("Dial failed: Node is unreachable", "err", err, "nodeState", n.State())
return nodeStateUnreachable
}

return n.verifyConn(ctx, lggr)
}

// verifyConn - verifies that current connection is valid: chainID matches, and it's not syncing.
// Returns desired state if one of the verifications fails. Otherwise, returns nodeStateAlive.
func (n *node[CHAIN_ID, HEAD, RPC]) verifyConn(ctx context.Context, lggr logger.Logger) nodeState {
state := n.verifyChainID(ctx, lggr)
if state != nodeStateAlive {
return state
}

if n.nodePoolCfg.NodeIsSyncingEnabled() {
isSyncing, err := n.rpc.IsSyncing(ctx)
if err != nil {
lggr.Errorw("Unexpected error while verifying RPC node synchronization status", "err", err, "nodeState", n.State())
return nodeStateUnreachable
}

if isSyncing {
lggr.Errorw("Verification failed: Node is syncing", "nodeState", n.State())
return nodeStateSyncing
}
}

return nodeStateAlive
}

// disconnectAll disconnects all clients connected to the node
Expand Down
63 changes: 59 additions & 4 deletions common/client/node_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ var (
Name: "pool_rpc_node_num_transitions_to_unusable",
Help: transitionString(nodeStateUnusable),
}, []string{"chainID", "nodeName"})
promPoolRPCNodeTransitionsToSyncing = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "pool_rpc_node_num_transitions_to_syncing",
Help: transitionString(nodeStateSyncing),
}, []string{"chainID", "nodeName"})
)

// nodeState represents the current state of the node
Expand All @@ -57,6 +61,8 @@ func (n nodeState) String() string {
return "OutOfSync"
case nodeStateClosed:
return "Closed"
case nodeStateSyncing:
return "Syncing"
default:
return fmt.Sprintf("nodeState(%d)", n)
}
Expand Down Expand Up @@ -87,6 +93,11 @@ const (
nodeStateUnusable
// nodeStateClosed is after the connection has been closed and the node is at the end of its lifecycle
nodeStateClosed
// nodeStateSyncing is a node that is actively back-filling blockchain. Usually, it's a newly set up node that is
// still syncing the chain. The main difference from `nodeStateOutOfSync` is that it represents state relative
// to other primary nodes configured in the MultiNode. In contrast, `nodeStateSyncing` represents the internal state of
// the node (RPC).
nodeStateSyncing
// nodeStateLen tracks the number of states
nodeStateLen
)
Expand Down Expand Up @@ -144,7 +155,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToAlive(fn func()) {
return
}
switch n.state {
case nodeStateDialed, nodeStateInvalidChainID:
case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing:
n.state = nodeStateAlive
default:
panic(transitionFail(n.state, nodeStateAlive))
Expand All @@ -171,7 +182,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToInSync(fn func()) {
return
}
switch n.state {
case nodeStateOutOfSync:
case nodeStateOutOfSync, nodeStateSyncing:
n.state = nodeStateAlive
default:
panic(transitionFail(n.state, nodeStateAlive))
Expand Down Expand Up @@ -222,7 +233,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToUnreachable(fn func()) {
return
}
switch n.state {
case nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID:
case nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing:
n.disconnectAll()
n.state = nodeStateUnreachable
default:
Expand All @@ -231,6 +242,21 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToUnreachable(fn func()) {
fn()
}

func (n *node[CHAIN_ID, HEAD, RPC]) declareState(state nodeState) {
switch state {
case nodeStateInvalidChainID:
n.declareInvalidChainID()
case nodeStateUnreachable:
n.declareUnreachable()
case nodeStateSyncing:
n.declareSyncing()
case nodeStateAlive:
n.declareAlive()
default:
panic(fmt.Sprintf("%#v state declaration is not implemented", state))
}
}

func (n *node[CHAIN_ID, HEAD, RPC]) declareInvalidChainID() {
n.transitionToInvalidChainID(func() {
n.lfcLog.Errorw("RPC Node has the wrong chain ID", "nodeState", n.state)
Expand All @@ -247,7 +273,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToInvalidChainID(fn func()) {
return
}
switch n.state {
case nodeStateDialed, nodeStateOutOfSync:
case nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing:
n.disconnectAll()
n.state = nodeStateInvalidChainID
default:
Expand All @@ -256,6 +282,35 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToInvalidChainID(fn func()) {
fn()
}

func (n *node[CHAIN_ID, HEAD, RPC]) declareSyncing() {
n.transitionToSyncing(func() {
n.lfcLog.Errorw("RPC Node is syncing", "nodeState", n.state)
n.wg.Add(1)
go n.syncingLoop()
})
}

func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) {
promPoolRPCNodeTransitionsToSyncing.WithLabelValues(n.chainID.String(), n.name).Inc()
n.stateMu.Lock()
defer n.stateMu.Unlock()
if n.state == nodeStateClosed {
return
}
switch n.state {
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID:
n.disconnectAll()
n.state = nodeStateSyncing
default:
panic(transitionFail(n.state, nodeStateSyncing))
}

if !n.nodePoolCfg.NodeIsSyncingEnabled() {
panic("unexpected transition to nodeStateSyncing, while it's disabled")
}
fn()
}

func transitionString(state nodeState) string {
return fmt.Sprintf("Total number of times node has transitioned to %s", state)
}
Expand Down
Loading

0 comments on commit 17287df

Please sign in to comment.