Skip to content

Commit

Permalink
Wait forever for fatal errors (#155)
Browse files Browse the repository at this point in the history
* Wait forever for fatal errors

* Wait forever for fatal errors

* comments

* comments
  • Loading branch information
rokatyy authored Jan 19, 2025
1 parent 24c44b7 commit 9adc70d
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 33 deletions.
34 changes: 28 additions & 6 deletions pkg/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,30 @@ func getFunctionName(fn interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}

// give either retryInterval or backoff
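// RetryFunc calls fn repeatedly until fn asks to stop or the attempts run out.
// Provide either retryInterval or backoff.
// fn receives the current attempt number and has the signature func(int) (bool, error, int); it returns:
// bool - whether the call should be retried
// error - the error that occurred, if any
// int - number of extra attempts to add to the attempts budget (lets fn extend the retry count from inside)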
func RetryFunc(ctx context.Context,
loggerInstance logger.Logger,
attempts int,
retryInterval *time.Duration,
backoff *Backoff,
fn func(int) (bool, error)) error {
fn func(int) (bool, error, int)) error {

var err error
var retry bool
var addAttempts int

for attempt := 1; attempt <= attempts; attempt++ {
retry, err = fn(attempt)
var attempt = 0
for attempt <= attempts {

attempt++
// some errors might require more attempts than expected, so allow incrementing attempts from outside
retry, err, addAttempts = fn(attempt)
attempts += addAttempts

// if there's no need to retry - we're done
if !retry {
Expand Down Expand Up @@ -178,9 +189,20 @@ func EngineErrorIsNonFatal(err error) bool {
"timeout",
"refused",
}
return errorMatches(err, nonFatalEngineErrorsPartialMatch)
}

// EngineErrorIsFatal reports whether err matches one of the known fatal
// engine error substrings. The matching itself is delegated to errorMatches.
//
// NOTE(review): the only pattern listed here ends in "timeout", which also
// appears to be matched by EngineErrorIsNonFatal; a caller that checks the
// non-fatal predicate first would never reach this one for that error -
// confirm the intended check order at the call sites.
func EngineErrorIsFatal(err error) bool {
	return errorMatches(err, []string{
		"lookup v3io-webapi: i/o timeout",
	})
}

func errorMatches(err error, substrings []string) bool {
if err != nil && len(err.Error()) > 0 {
for _, nonFatalError := range nonFatalEngineErrorsPartialMatch {
if strings.Contains(err.Error(), nonFatalError) || strings.Contains(errors.Cause(err).Error(), nonFatalError) {
for _, substring := range substrings {
if strings.Contains(err.Error(), substring) || strings.Contains(errors.Cause(err).Error(), substring) {
return true
}
}
Expand Down
26 changes: 20 additions & 6 deletions pkg/dataplane/streamconsumergroup/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,31 +126,45 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time
c.logger,
c.getShardLocationAttempts,
nil,
&c.getShardLocationBackoff, func(attempt int) (bool, error) {
&c.getShardLocationBackoff,
func(attempt int) (bool, error, int) {
c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID)
if err != nil {
if common.EngineErrorIsNonFatal(err) {
return true, errors.Wrap(err, "Failed to get shard location due to a network error")
return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0
}

// if the error is fatal and requires external resolution,
// we don't want to fail; instead, we will inform the user via a log
if common.EngineErrorIsFatal(err) {
c.logger.ErrorWith("A fatal error occurred. Will retry until successful",
"error", err,
"shard", c.shardID)
// for this type of error, we always add one extra attempt to the budget,
// so the retries are never exhausted while the error persists.
// this ensures the smooth operation of other components in Nuclio:
// we avoid panicking and simply wait for the issue to be resolved
return true, errors.Wrap(err, "Failed to get shard location"), 1
}

// requested for an immediate stop
if err == v3ioerrors.ErrStopped {
return false, nil
return false, nil, 0
}

switch errors.RootCause(err).(type) {

// in case of a network error, retry to avoid transient errors
case *net.OpError:
return true, errors.Wrap(err, "Failed to get shard location due to a network error")
return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0

// unknown error, fail now
default:
return false, errors.Wrap(err, "Failed to get shard location")
return false, errors.Wrap(err, "Failed to get shard location"), 0
}
}

// we have shard location
return false, nil
return false, nil, 0
}); err != nil {
return errors.Wrapf(err,
"Failed to get shard location state, attempts exhausted. shard id: %d",
Expand Down
18 changes: 9 additions & 9 deletions pkg/dataplane/streamconsumergroup/streamconsumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,19 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
backoff := scg.config.State.ModifyRetry.Backoff
attempts := scg.config.State.ModifyRetry.Attempts

err := common.RetryFunc(context.TODO(), scg.logger, attempts, nil, &backoff, func(attempt int) (bool, error) {
err := common.RetryFunc(context.TODO(), scg.logger, attempts, nil, &backoff, func(attempt int) (bool, error, int) {
state, stateMtimeNanoSeconds, stateMtimeSeconds, err := scg.getStateFromPersistency()
if err != nil && !errors.Is(err, v3ioerrors.ErrNotFound) {
return true, errors.Wrap(err, "Failed getting current state from persistency")
return true, errors.Wrap(err, "Failed getting current state from persistency"), 0
}
if common.EngineErrorIsNonFatal(err) {
return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error")
return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error"), 0
}

if state == nil {
state, err = newState()
if err != nil {
return true, errors.Wrap(err, "Failed to create state")
return true, errors.Wrap(err, "Failed to create state"), 0
}
}

Expand All @@ -137,9 +137,9 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
if errors.Is(errors.RootCause(err), errShardRetention) {

// if shard retention failed the member needs to be aborted, so we can stop retrying
return false, errors.Wrap(err, "Failed modifying state")
return false, errors.Wrap(err, "Failed modifying state"), 0
}
return true, errors.Wrap(err, "Failed modifying state")
return true, errors.Wrap(err, "Failed modifying state"), 0
}

// log only on change
Expand All @@ -157,14 +157,14 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier,
"attempt", attempt,
"err", errors.RootCause(err).Error())
}
return true, errors.Wrap(err, "Failed setting state in persistency state")
return true, errors.Wrap(err, "Failed setting state in persistency state"), 0
}

if err := handlePostSetStateInPersistency(); err != nil {
return false, errors.Wrap(err, "Failed handling post set state in persistency")
return false, errors.Wrap(err, "Failed handling post set state in persistency"), 0
}

return false, nil
return false, nil, 0
})

if err != nil {
Expand Down
24 changes: 12 additions & 12 deletions pkg/dataplane/test/streamconsumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,20 +214,20 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() {
30,
&duration,
nil,
func(attempt int) (bool, error) {
func(attempt int) (bool, error, int) {
observedState, err := streamConsumerGroup.GetState()
suite.Require().NoError(err)
for _, sessionState := range observedState.SessionStates {
if sessionState.MemberID == member.streamConsumerGroupMember.GetID() {
suite.logger.DebugWith("Session state was not removed just yet")
return true, nil
return true, nil, 0
}
}

suite.logger.DebugWith("Session state was removed",
"observedState", observedState,
"memberID", member.id)
return false, nil
return false, nil, 0
})
})

Expand Down Expand Up @@ -270,7 +270,7 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() {
30,
&duration,
nil,
func(attempt int) (bool, error) {
func(attempt int) (bool, error, int) {
observedState, err := streamConsumerGroup.GetState()
suite.Require().NoError(err)

Expand All @@ -279,14 +279,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() {
suite.logger.DebugWith("retained shards",
"shards", sessionState.Shards,
"memberID", sessionState.MemberID)
return false, nil
return false, nil, 0
}
}

suite.logger.DebugWith("Session state shards were no retained just yet",
"sessionStates", observedState.SessionStates,
"memberID", member.streamConsumerGroupMember.GetID())
return true, nil
return true, nil, 0

})
})
Expand Down Expand Up @@ -368,14 +368,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerAbort() {
10,
&duration,
nil,
func(attempt int) (bool, error) {
func(attempt int) (bool, error, int) {
state, err = suite.getStateFromPersistency(suite.streamPath, consumerGroupName)
if err != nil {
suite.logger.DebugWith("State was not retrieved from persistency",
"err", err)
return true, err
return true, err, 0
}
return false, nil
return false, nil, 0
})
suite.Require().NoError(err)

Expand All @@ -391,14 +391,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerAbort() {
10,
&duration,
nil,
func(attempt int) (bool, error) {
func(attempt int) (bool, error, int) {
err = suite.setStateInPersistency(suite.streamPath, consumerGroupName, state)
if err != nil {
suite.logger.DebugWith("State was not set in persistency yet",
"err", err)
return true, err
return true, err, 0
}
return false, nil
return false, nil, 0
})
suite.Require().NoError(err)

Expand Down

0 comments on commit 9adc70d

Please sign in to comment.