Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BCI-2450: Health check for MultiNode to ensure that node is not in syncing state #11765

Merged
merged 15 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions common/client/mock_node_client_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions common/client/mock_rpc_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 46 additions & 18 deletions common/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,53 +224,81 @@ func (n *node[CHAIN_ID, HEAD, RPC]) start(startCtx context.Context) {
}
n.setState(nodeStateDialed)

if err := n.verify(startCtx); errors.Is(err, errInvalidChainID) {
n.lfcLog.Errorw("Verify failed: Node has the wrong chain ID", "err", err)
n.declareInvalidChainID()
return
} else if err != nil {
n.lfcLog.Errorw(fmt.Sprintf("Verify failed: %v", err), "err", err)
n.declareUnreachable()
return
}

n.declareAlive()
state := n.verifyConn(startCtx, n.lfcLog)
n.declareState(state)
}

// verify checks that all connections to eth nodes match the given chain ID
// verifyChainID checks that connection to the node matches the given chain ID
// Not thread-safe
// Pure verify: does not mutate node "state" field.
func (n *node[CHAIN_ID, HEAD, RPC]) verify(callerCtx context.Context) (err error) {
// Pure verifyChainID: does not mutate node "state" field.
func (n *node[CHAIN_ID, HEAD, RPC]) verifyChainID(callerCtx context.Context, lggr logger.Logger) nodeState {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this return a state now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my initial thought would be an error return from the name of the method

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samsondav
This way we unify error handling and logging across multiple callers and unify signature with other methods that perform similar actions - verify a new connection.

Previously verify was called directly in multiple places: outOfSyncLoop, unreachableLoop, invalidChainIDLoop and start. New syncingLoop also needs to check that chainID matches.
All the callers had to log the error and do corresponding state transition.
With new syncing check we would endup with something like this

if err := n.rpc.Dial(n.nodeCtx); err != nil {
    lggr.Errorw("Failed to dial out-of-sync RPC node", "nodeState", n.State()).
    n.declareUnreachable()
    return
}

if err := n.verify(n.nodeCtx); errors.Is(err, errInvalidChainID) {
    n.lfcLog.Errorw("Verify failed: Node has the wrong chain ID", "err", err)
    n.declareInvalidChainID()
    return
} else if err != nil {
    n.lfcLog.Errorw(fmt.Sprintf("Verify failed: %v", err), "err", err)
    n.declareUnreachable()
    return
}

if isSyncing, err := n.verifyIsSyncing(n.nodeCtx); err != nil {
    n.lfcLog.Errorw("Node is still syncing")
    n.declareIsSyncing()
    return
} else if err != nil {
    n.lfcLog.Errorw(fmt.Sprintf("Verify is syncing failed: %v", err), "err", err)
    n.declareUnreachable()
    return
}

Instead we can do this

state := n.createVerifiedConn(n.nodeCtx, lggr)
if state != nodeStateAlive {
    n.declareState(state)
    return
}

@poopoothegorilla yes, the naming is not perfect, but I do not have better options. What do you suggest?

Copy link
Contributor

@poopoothegorilla poopoothegorilla Feb 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeh i dont think the name is that big of a deal... I guess one option could be to return both the nodeState and error... that way you could just have the caller handle the logging and you might not have to pass the lggr through all calls

Its been a while since i read the PR sorry

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah this makes sense

promPoolRPCNodeVerifies.WithLabelValues(n.chainFamily, n.chainID.String(), n.name).Inc()
promFailed := func() {
promPoolRPCNodeVerifiesFailed.WithLabelValues(n.chainFamily, n.chainID.String(), n.name).Inc()
}

st := n.State()
switch st {
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID:
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing:
default:
panic(fmt.Sprintf("cannot verify node in state %v", st))
}

var chainID CHAIN_ID
var err error
if chainID, err = n.rpc.ChainID(callerCtx); err != nil {
promFailed()
return fmt.Errorf("failed to verify chain ID for node %s: %w", n.name, err)
lggr.Errorw("Failed to verify chain ID for node", "err", err, "nodeState", n.State())
return nodeStateUnreachable
} else if chainID.String() != n.chainID.String() {
promFailed()
return fmt.Errorf(
err = fmt.Errorf(
"rpc ChainID doesn't match local chain ID: RPC ID=%s, local ID=%s, node name=%s: %w",
chainID.String(),
n.chainID.String(),
n.name,
errInvalidChainID,
)
lggr.Errorw("Failed to verify RPC node; remote endpoint returned the wrong chain ID", "err", err, "nodeState", n.State())
return nodeStateInvalidChainID
}

promPoolRPCNodeVerifiesSuccess.WithLabelValues(n.chainFamily, n.chainID.String(), n.name).Inc()

return nil
return nodeStateAlive
}

// createVerifiedConn - establishes new connection with the RPC and verifies that it's valid: chainID matches, and it's not syncing.
// Returns desired state if one of the verifications fails. Otherwise, returns nodeStateAlive.
func (n *node[CHAIN_ID, HEAD, RPC]) createVerifiedConn(ctx context.Context, lggr logger.Logger) nodeState {
if err := n.rpc.Dial(ctx); err != nil {
n.lfcLog.Errorw("Dial failed: Node is unreachable", "err", err, "nodeState", n.State())
return nodeStateUnreachable
}

return n.verifyConn(ctx, lggr)
}

// verifyConn - verifies that current connection is valid: chainID matches, and it's not syncing.
// Returns desired state if one of the verifications fails. Otherwise, returns nodeStateAlive.
func (n *node[CHAIN_ID, HEAD, RPC]) verifyConn(ctx context.Context, lggr logger.Logger) nodeState {
state := n.verifyChainID(ctx, lggr)
if state != nodeStateAlive {
return state
}

isSyncing, err := n.rpc.IsSyncing(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put an additional config setting here?
NodeIsSyncingEnabled = false[default]

This will allow us to skip IsSyncing() calls for chains/clients that don't reliably implement this functionality, or have bugs around this behavior.

if err != nil {
lggr.Errorw("Unexpected error while verifying RPC node synchronization status", "err", err, "nodeState", n.State())
return nodeStateUnreachable
}

if isSyncing {
lggr.Errorw("Verification failed: Node is syncing", "nodeState", n.State())
return nodeStateSyncing
}

return nodeStateAlive
}

// disconnectAll disconnects all clients connected to the node
Expand Down
57 changes: 53 additions & 4 deletions common/client/node_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ var (
Name: "pool_rpc_node_num_transitions_to_unusable",
Help: transitionString(nodeStateUnusable),
}, []string{"chainID", "nodeName"})
promPoolRPCNodeTransitionsToSyncing = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "pool_rpc_node_num_transitions_to_syncing",
Help: transitionString(nodeStateSyncing),
}, []string{"chainID", "nodeName"})
)

// nodeState represents the current state of the node
Expand Down Expand Up @@ -87,6 +91,11 @@ const (
nodeStateUnusable
// nodeStateClosed is after the connection has been closed and the node is at the end of its lifecycle
nodeStateClosed
// nodeStateSyncing is a node that is actively back-filling blockchain. Usually, it's a newly set up node that is
// still syncing the chain. The main difference from `nodeStateOutOfSync` is that it represents state relative
// to other primary nodes configured in the MultiNode. In contrast, `nodeStateSyncing` represents the internal state of
// the node (RPC).
nodeStateSyncing
// nodeStateLen tracks the number of states
nodeStateLen
)
Expand Down Expand Up @@ -144,7 +153,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToAlive(fn func()) {
return
}
switch n.state {
case nodeStateDialed, nodeStateInvalidChainID:
case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing:
n.state = nodeStateAlive
default:
panic(transitionFail(n.state, nodeStateAlive))
Expand All @@ -171,7 +180,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToInSync(fn func()) {
return
}
switch n.state {
case nodeStateOutOfSync:
case nodeStateOutOfSync, nodeStateSyncing:
n.state = nodeStateAlive
default:
panic(transitionFail(n.state, nodeStateAlive))
Expand Down Expand Up @@ -222,7 +231,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToUnreachable(fn func()) {
return
}
switch n.state {
case nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID:
case nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing:
n.disconnectAll()
n.state = nodeStateUnreachable
default:
Expand All @@ -231,6 +240,21 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToUnreachable(fn func()) {
fn()
}

func (n *node[CHAIN_ID, HEAD, RPC]) declareState(state nodeState) {
switch state {
case nodeStateInvalidChainID:
n.declareInvalidChainID()
case nodeStateUnreachable:
n.declareUnreachable()
case nodeStateSyncing:
n.declareSyncing()
case nodeStateAlive:
n.declareAlive()
default:
panic(fmt.Sprintf("%#v state declaration is not impletemented", state))
dhaidashenko marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (n *node[CHAIN_ID, HEAD, RPC]) declareInvalidChainID() {
n.transitionToInvalidChainID(func() {
n.lfcLog.Errorw("RPC Node has the wrong chain ID", "nodeState", n.state)
Expand All @@ -247,7 +271,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToInvalidChainID(fn func()) {
return
}
switch n.state {
case nodeStateDialed, nodeStateOutOfSync:
case nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing:
n.disconnectAll()
n.state = nodeStateInvalidChainID
default:
Expand All @@ -256,6 +280,31 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToInvalidChainID(fn func()) {
fn()
}

func (n *node[CHAIN_ID, HEAD, RPC]) declareSyncing() {
n.transitionToSyncing(func() {
n.lfcLog.Errorw("RPC Node is syncing", "nodeState", n.state)
n.wg.Add(1)
go n.syncingLoop()
})
}

func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) {
promPoolRPCNodeTransitionsToSyncing.WithLabelValues(n.chainID.String(), n.name).Inc()
n.stateMu.Lock()
defer n.stateMu.Unlock()
if n.state == nodeStateClosed {
return
}
switch n.state {
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID:
n.disconnectAll()
n.state = nodeStateSyncing
default:
panic(transitionFail(n.state, nodeStateSyncing))
}
fn()
}

func transitionString(state nodeState) string {
return fmt.Sprintf("Total number of times node has transitioned to %s", state)
}
Expand Down
15 changes: 11 additions & 4 deletions common/client/node_fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ func TestUnit_Node_StateTransitions(t *testing.T) {

t.Run("transitionToAlive", func(t *testing.T) {
const destinationState = nodeStateAlive
allowedStates := []nodeState{nodeStateDialed, nodeStateInvalidChainID}
allowedStates := []nodeState{nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing}
rpc := newMockNodeClient[types.ID, Head](t)
testTransition(t, rpc, testNode.transitionToAlive, destinationState, allowedStates...)
})

t.Run("transitionToInSync", func(t *testing.T) {
const destinationState = nodeStateAlive
allowedStates := []nodeState{nodeStateOutOfSync}
allowedStates := []nodeState{nodeStateOutOfSync, nodeStateSyncing}
rpc := newMockNodeClient[types.ID, Head](t)
testTransition(t, rpc, testNode.transitionToInSync, destinationState, allowedStates...)
})
Expand All @@ -61,18 +61,25 @@ func TestUnit_Node_StateTransitions(t *testing.T) {
})
t.Run("transitionToUnreachable", func(t *testing.T) {
const destinationState = nodeStateUnreachable
allowedStates := []nodeState{nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID}
allowedStates := []nodeState{nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing}
rpc := newMockNodeClient[types.ID, Head](t)
rpc.On("DisconnectAll").Times(len(allowedStates))
testTransition(t, rpc, testNode.transitionToUnreachable, destinationState, allowedStates...)
})
t.Run("transitionToInvalidChain", func(t *testing.T) {
const destinationState = nodeStateInvalidChainID
allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync}
allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing}
rpc := newMockNodeClient[types.ID, Head](t)
rpc.On("DisconnectAll").Times(len(allowedStates))
testTransition(t, rpc, testNode.transitionToInvalidChainID, destinationState, allowedStates...)
})
t.Run("transitionToSyncing", func(t *testing.T) {
const destinationState = nodeStateSyncing
allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID}
rpc := newMockNodeClient[types.ID, Head](t)
rpc.On("DisconnectAll").Times(len(allowedStates))
testTransition(t, rpc, testNode.transitionToSyncing, destinationState, allowedStates...)
})
}

func testTransition(t *testing.T, rpc *mockNodeClient[types.ID, Head], transition func(node testNode, fn func()), destinationState nodeState, allowedStates ...nodeState) {
Expand Down
Loading
Loading