Skip to content

Commit

Permalink
feat: error handling
Browse files Browse the repository at this point in the history
Signed-off-by: LingKa <[email protected]>
  • Loading branch information
LingKa28 committed Jan 16, 2024
1 parent e1f0bc2 commit 8c21351
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 7 deletions.
20 changes: 19 additions & 1 deletion client/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,22 @@ func (e *CommandError) Error() string {
return fmt.Sprintf("command error: %v", e.err)
}

var ErrWrongClusterVersion = errors.New("wrong cluster version")
type errInternalError struct {
inner error
}

func NewErrInternalError(err error) error {
return &errInternalError{
inner: err,
}
}

func (e *errInternalError) Error() string {
return fmt.Sprintf("Client Internal error: %s", e.inner.Error())
}

var (
ErrShuttingDown = errors.New("Curp Server is shutting down")
ErrWrongClusterVersion = errors.New("Wrong cluster version")
ErrTimeout = errors.New("Request timeout")
)
102 changes: 96 additions & 6 deletions client/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,15 +281,22 @@ func (c *protocolClient) fastRound(pid *curpapi.ProposeId, cmd *xlineapi.Command
return nil, &CommandError{err: &exeErr}
}
case err := <-errCh:
// TODO: error handling
c.logger.Warn("propose fail", zap.Error(err))
if fromErr, ok := status.FromError(err); ok {
msg := fromErr.Message()
if msg == "wrong cluster version" {
curpErr := curpapi.CurpError{}
dtl := fromErr.Details()
err := proto.Unmarshal(dtl[0].([]byte), &curpErr)
if err != nil {
return nil, err
}
if curpErr.GetShuttingDown() != nil {
return nil, ErrShuttingDown
} else if curpErr.GetWrongClusterVersion() != nil {
return nil, ErrWrongClusterVersion
} else {
continue
}
}
return nil, err
}
}

Expand All @@ -311,6 +318,7 @@ func (c *protocolClient) slowRound(pid *curpapi.ProposeId, cmd *xlineapi.Command
var exeErr xlineapi.ExecuteError

retryCnt := c.config.RetryCount
retryTimeout := c.getBackoff()
for i := 0; i < retryCnt; i++ {
leaderID, err := c.getLeaderID()
if err != nil {
Expand All @@ -327,7 +335,39 @@ func (c *protocolClient) slowRound(pid *curpapi.ProposeId, cmd *xlineapi.Command
}
res, err := protocolClient.WaitSynced(ctx, req)
if err != nil {
return nil, err
if fromErr, ok := status.FromError(err); ok {
curpErr := curpapi.CurpError{}
dtl := fromErr.Details()
err := proto.Unmarshal(dtl[0].([]byte), &curpErr)
if err != nil {
return nil, err
}
if curpErr.GetShuttingDown() != nil {
return nil, ErrShuttingDown
} else if curpErr.GetWrongClusterVersion() != nil {
return nil, ErrWrongClusterVersion
} else if curpErr.GetRpcTransport() != nil {
// it's quite likely that the leader has crashed, then we should wait for some time and fetch the leader again
time.Sleep(retryTimeout.nextRetry())
err := c.resendPropose(pid, cmd, nil)
if err != nil {
return nil, err
}
continue
} else if curpErr.GetRedirect() != nil {
newLeader := curpErr.GetRedirect().GetLeaderId()
term := curpErr.GetRedirect().GetTerm()
c.state.checkAndUpdate(newLeader, term)
// resend the propose to the new leader
err := c.resendPropose(pid, cmd, &newLeader)
if err != nil {
return nil, err
}
continue
} else {
return nil, NewErrInternalError(err)
}
}
}

if res.AfterSyncResult != nil {
Expand Down Expand Up @@ -370,7 +410,57 @@ func (c *protocolClient) slowRound(pid *curpapi.ProposeId, cmd *xlineapi.Command
}, nil
}

return nil, errors.New("slow round timeout")
return nil, ErrTimeout
}

// Resend the propose only to the leader. This is used when leader changes and we need to ensure that the propose is received by the new leader.
func (c *protocolClient) resendPropose(pid *curpapi.ProposeId, cmd *xlineapi.Command, newLeader *ServerId) error {
retryTimeout := c.getBackoff()
retryCnt := c.config.RetryCount

for i := 0; i < retryCnt; i++ {
time.Sleep(retryTimeout.nextRetry())

var leaderID ServerId
if newLeader != nil {
leaderID = *newLeader
} else {
res, err := c.fetchLeader()
if err != nil {
return err
}
leaderID = *res
}

bcmd, err := proto.Marshal(cmd)
if err != nil {
return err
}

protocolClient := curpapi.NewProtocolClient(c.connects[leaderID])
ctx, cancel := context.WithTimeout(context.Background(), c.config.ProposeTimeout)
defer cancel()
_, err = protocolClient.Propose(ctx, &curpapi.ProposeRequest{ProposeId: pid, Command: bcmd, ClusterVersion: c.clusterVersion})
if err != nil {
if fromErr, ok := status.FromError(err); ok {
curpErr := curpapi.CurpError{}
dtl := fromErr.Details()
err := proto.Unmarshal(dtl[0].([]byte), &curpErr)
if err != nil {
return err
}
if curpErr.GetShuttingDown() != nil {
return ErrShuttingDown
} else if curpErr.GetWrongClusterVersion() != nil {
return ErrWrongClusterVersion
} else {
continue
}
}
}
}

return ErrTimeout
}

// Generate a propose id
Expand Down

0 comments on commit 8c21351

Please sign in to comment.