Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus): node stalled after client has stopped #1001

Merged
merged 4 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type socketClient struct {

var _ Client = (*socketClient)(nil)

// ErrClientStopped is the sentinel error reported when a request is attempted
// on a client that is not running: either it was never started, or it has
// already been terminated with an error. Callers should match it with
// errors.Is rather than comparing message text.
var ErrClientStopped = errors.New("client has stopped")

// NewSocketClient creates a new socket client, which connects to a given
// address. If mustConnect is true, the client will return an error upon start
// if it fails to connect.
Expand Down Expand Up @@ -234,7 +239,7 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {

func (cli *socketClient) doRequest(ctx context.Context, req *types.Request) (*types.Response, error) {
if !cli.IsRunning() {
return nil, errors.New("client has stopped")
return nil, ErrClientStopped
}

reqres := makeReqRes(ctx, req)
Expand Down
81 changes: 81 additions & 0 deletions internal/consensus/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

abciclient "github.com/dashpay/tenderdash/abci/client"
clientmocks "github.com/dashpay/tenderdash/abci/client/mocks"
"github.com/dashpay/tenderdash/abci/example/kvstore"
abci "github.com/dashpay/tenderdash/abci/types"
abcimocks "github.com/dashpay/tenderdash/abci/types/mocks"
Expand All @@ -22,6 +24,7 @@ import (
"github.com/dashpay/tenderdash/internal/mempool"
tmpubsub "github.com/dashpay/tenderdash/internal/pubsub"
tmquery "github.com/dashpay/tenderdash/internal/pubsub/query"
sm "github.com/dashpay/tenderdash/internal/state"
sf "github.com/dashpay/tenderdash/internal/state/test/factory"
"github.com/dashpay/tenderdash/internal/test/factory"
tmbytes "github.com/dashpay/tenderdash/libs/bytes"
Expand Down Expand Up @@ -3272,6 +3275,84 @@ func TestStateTryAddCommitCallsProcessProposal(t *testing.T) {
assert.NoError(t, err)
}

// TestStateTryAddCommitPanicsOnClientError covers the fatal-error path of
// commit handling:
//   - given an ABCI client mock whose ProcessProposal call returns
//     abciclient.ErrClientStopped,
//   - when a commit for an already received proposal block is dispatched,
//   - then handling TryAddCommitEvent panics with the "ABCI client stopped"
//     error instead of returning.
func TestStateTryAddCommitPanicsOnClientError(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cfg := configSetup(t)

	// Single-validator genesis document and the state derived from it.
	genDoc, privVals := factory.RandGenesisDoc(1, factory.ConsensusParams())
	testLogger := consensusLogger(t)

	genesisState, err := sm.MakeGenesisState(genDoc)
	require.NoError(t, err)

	// The mock itself does not panic: it reports ErrClientStopped from exactly
	// one ProcessProposal call, and the code under test is expected to turn
	// that error into a panic.
	stoppedClient := clientmocks.NewClient(t)
	stoppedClient.
		On("ProcessProposal", mock.Anything, mock.Anything).
		Return(&abci.ResponseProcessProposal{}, abciclient.ErrClientStopped).
		Once()

	// Build the consensus state for the only validator, with its proTxHash
	// attached to the context.
	validator := privVals[0]
	proTxHash, err := validator.GetProTxHash(ctx)
	require.NoError(t, err)
	ctx = dash.ContextWithProTxHash(ctx, proTxHash)
	cs := newStateWithConfig(ctx, t, testLogger, cfg, genesisState, validator, stoppedClient)

	sd := cs.GetStateData()

	// Block at height 1 that will be proposed and committed.
	blk, err := sf.MakeBlock(genesisState, 1, &types.Commit{}, kvstore.ProtocolVersion)
	require.NoError(t, err)
	require.NotZero(t, blk.Version.App)
	blk.CoreChainLockedHeight = 1

	// Commit for that block at round 0.
	blockCommit, err := factory.MakeCommit(
		ctx,
		blk.BlockID(nil),
		blk.Height,
		0,
		sd.Votes.Precommits(0),
		genesisState.Validators,
		privVals,
	)
	require.NoError(t, err)

	// Proposal referring to the same block ID as the commit.
	prop := types.NewProposal(
		blk.Height,
		blk.CoreChainLockedHeight,
		0,
		-1,
		blockCommit.BlockID,
		blk.Time,
	)

	blockParts, err := blk.MakePartSet(999999999)
	require.NoError(t, err)

	// Pretend the proposal and the complete block have already been received
	// from the proposer, and move the round state to the prevote step.
	proposerID := sd.Validators.Proposer().NodeAddress.NodeID
	sd.Proposal = prop
	sd.ProposalBlock = blk
	sd.ProposalBlockParts = blockParts
	sd.updateRoundStep(blockCommit.Round, cstypes.RoundStepPrevote)

	// Dispatch TryAddCommitEvent for the commit and expect the panic.
	dispatchCtx := msgInfoWithCtx(ctx, msgInfo{
		Msg:         &CommitMessage{blockCommit},
		PeerID:      proposerID,
		ReceiveTime: time.Time{},
	})
	event := &TryAddCommitEvent{Commit: blockCommit, PeerID: proposerID}

	assert.PanicsWithError(t,
		"ABCI client stopped, Tenderdash needs to be restarted: ProcessProposal abci method: client has stopped",
		func() {
			// The returned error is irrelevant here: the call must panic first.
			_ = cs.ctrl.Dispatch(dispatchCtx, event, &sd)
		})
}

// TestStateTimestamp_ProposalMatch tests that a validator prevotes a
// proposed block if the timestamp in the block matches the timestamp in the
// corresponding proposal message.
Expand Down
6 changes: 6 additions & 0 deletions internal/consensus/state_try_add_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package consensus

import (
"context"
"errors"
"fmt"

abciclient "github.com/dashpay/tenderdash/abci/client"
"github.com/dashpay/tenderdash/dash"
cstypes "github.com/dashpay/tenderdash/internal/consensus/types"
"github.com/dashpay/tenderdash/libs/log"
Expand Down Expand Up @@ -108,6 +110,10 @@ func (cs *TryAddCommitAction) verifyCommit(ctx context.Context, stateData *State
// We have a correct block, let's process it before applying the commit
err = cs.blockExec.ensureProcess(ctx, &stateData.RoundState, commit.Round)
if err != nil {
if errors.Is(err, abciclient.ErrClientStopped) {
// this is a non-recoverable error in current architecture
panic(fmt.Errorf("ABCI client stopped, Tenderdash needs to be restarted: %w", err))
}
lklimek marked this conversation as resolved.
Show resolved Hide resolved
return false, fmt.Errorf("unable to process proposal: %w", err)
}
err = cs.blockExec.validate(ctx, stateData)
Expand Down
Loading