diff --git a/pkg/common/helper.go b/pkg/common/helper.go index d96fee7..2c7cc6f 100644 --- a/pkg/common/helper.go +++ b/pkg/common/helper.go @@ -35,19 +35,30 @@ func getFunctionName(fn interface{}) string { return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() } -// give either retryInterval or backoff +// RetryFunc calls fn until it reports that no retry is needed or the attempts are exhausted; give either retryInterval or backoff +// fn func(int) (bool, error, int) receives the current attempt number (starting at 1) +// and returns: +// bool - whether the call should be retried +// error - the error that occurred, if any +// int - number of extra attempts to add to the budget (lets fn manage the retry count from inside) func RetryFunc(ctx context.Context, loggerInstance logger.Logger, attempts int, retryInterval *time.Duration, backoff *Backoff, - fn func(int) (bool, error)) error { + fn func(int) (bool, error, int)) error { var err error var retry bool + var addAttempts int - for attempt := 1; attempt <= attempts; attempt++ { - retry, err = fn(attempt) + var attempt = 0 + for attempt < attempts { + + attempt++ + // some errors might require more attempts than expected, so allow incrementing attempts from outside + retry, err, addAttempts = fn(attempt) + attempts += addAttempts // if there's no need to retry - we're done if !retry { @@ -178,9 +189,20 @@ func EngineErrorIsNonFatal(err error) bool { "timeout", "refused", } + return errorMatches(err, nonFatalEngineErrorsPartialMatch) +} + +func EngineErrorIsFatal(err error) bool { + var fatalEngineErrorsPartialMatch = []string{ + "lookup v3io-webapi: i/o timeout", + } + return errorMatches(err, fatalEngineErrorsPartialMatch) +} + +func errorMatches(err error, substrings []string) bool { if err != nil && len(err.Error()) > 0 { - for _, nonFatalError := range nonFatalEngineErrorsPartialMatch { - if strings.Contains(err.Error(), nonFatalError) || strings.Contains(errors.Cause(err).Error(), nonFatalError) { + for _, substring := range substrings { + if strings.Contains(err.Error(), substring) || strings.Contains(errors.Cause(err).Error(), substring) { 
return true } } diff --git a/pkg/dataplane/streamconsumergroup/claim.go b/pkg/dataplane/streamconsumergroup/claim.go index 6555a15..912c68f 100644 --- a/pkg/dataplane/streamconsumergroup/claim.go +++ b/pkg/dataplane/streamconsumergroup/claim.go @@ -126,31 +126,45 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time c.logger, c.getShardLocationAttempts, nil, - &c.getShardLocationBackoff, func(attempt int) (bool, error) { + &c.getShardLocationBackoff, + func(attempt int) (bool, error, int) { c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID) if err != nil { + // check for fatal errors first: their messages also contain "timeout", which the non-fatal check would match + // a fatal error requires external resolution, so we don't fail; instead, we inform the user via a log and keep retrying + if common.EngineErrorIsFatal(err) { + c.logger.ErrorWith("A fatal error occurred. 
Will retry until successful", + "error", err, + "shard", c.shardID) + // for this type of error, we always increment the attempt counter + // this ensures the smooth operation of other components in Nuclio + // we avoid panicking and simply wait for the issue to be resolved + return true, errors.Wrap(err, "Failed to get shard location"), 1 + } + if common.EngineErrorIsNonFatal(err) { - return true, errors.Wrap(err, "Failed to get shard location due to a network error") + return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0 } + // requested for an immediate stop if err == v3ioerrors.ErrStopped { - return false, nil + return false, nil, 0 } switch errors.RootCause(err).(type) { // in case of a network error, retry to avoid transient errors case *net.OpError: - return true, errors.Wrap(err, "Failed to get shard location due to a network error") + return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0 // unknown error, fail now default: - return false, errors.Wrap(err, "Failed to get shard location") + return false, errors.Wrap(err, "Failed to get shard location"), 0 } } // we have shard location - return false, nil + return false, nil, 0 }); err != nil { return errors.Wrapf(err, "Failed to get shard location state, attempts exhausted. 
shard id: %d", diff --git a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go index fd5b5a6..9433d1e 100644 --- a/pkg/dataplane/streamconsumergroup/streamconsumergroup.go +++ b/pkg/dataplane/streamconsumergroup/streamconsumergroup.go @@ -113,19 +113,19 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier, backoff := scg.config.State.ModifyRetry.Backoff attempts := scg.config.State.ModifyRetry.Attempts - err := common.RetryFunc(context.TODO(), scg.logger, attempts, nil, &backoff, func(attempt int) (bool, error) { + err := common.RetryFunc(context.TODO(), scg.logger, attempts, nil, &backoff, func(attempt int) (bool, error, int) { state, stateMtimeNanoSeconds, stateMtimeSeconds, err := scg.getStateFromPersistency() if err != nil && !errors.Is(err, v3ioerrors.ErrNotFound) { - return true, errors.Wrap(err, "Failed getting current state from persistency") + return true, errors.Wrap(err, "Failed getting current state from persistency"), 0 } if common.EngineErrorIsNonFatal(err) { - return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error") + return true, errors.Wrap(err, "Failed getting current state from persistency due to a network error"), 0 } if state == nil { state, err = newState() if err != nil { - return true, errors.Wrap(err, "Failed to create state") + return true, errors.Wrap(err, "Failed to create state"), 0 } } @@ -137,9 +137,9 @@ func (scg *streamConsumerGroup) setState(modifier stateModifier, if errors.Is(errors.RootCause(err), errShardRetention) { // if shard retention failed the member needs to be aborted, so we can stop retrying - return false, errors.Wrap(err, "Failed modifying state") + return false, errors.Wrap(err, "Failed modifying state"), 0 } - return true, errors.Wrap(err, "Failed modifying state") + return true, errors.Wrap(err, "Failed modifying state"), 0 } // log only on change @@ -157,14 +157,14 @@ func (scg 
*streamConsumerGroup) setState(modifier stateModifier, "attempt", attempt, "err", errors.RootCause(err).Error()) } - return true, errors.Wrap(err, "Failed setting state in persistency state") + return true, errors.Wrap(err, "Failed setting state in persistency state"), 0 } if err := handlePostSetStateInPersistency(); err != nil { - return false, errors.Wrap(err, "Failed handling post set state in persistency") + return false, errors.Wrap(err, "Failed handling post set state in persistency"), 0 } - return false, nil + return false, nil, 0 }) if err != nil { diff --git a/pkg/dataplane/test/streamconsumergroup_test.go b/pkg/dataplane/test/streamconsumergroup_test.go index 7fcf8af..850e8a1 100644 --- a/pkg/dataplane/test/streamconsumergroup_test.go +++ b/pkg/dataplane/test/streamconsumergroup_test.go @@ -214,20 +214,20 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() { 30, &duration, nil, - func(attempt int) (bool, error) { + func(attempt int) (bool, error, int) { observedState, err := streamConsumerGroup.GetState() suite.Require().NoError(err) for _, sessionState := range observedState.SessionStates { if sessionState.MemberID == member.streamConsumerGroupMember.GetID() { suite.logger.DebugWith("Session state was not removed just yet") - return true, nil + return true, nil, 0 } } suite.logger.DebugWith("Session state was removed", "observedState", observedState, "memberID", member.id) - return false, nil + return false, nil, 0 }) }) @@ -270,7 +270,7 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() { 30, &duration, nil, - func(attempt int) (bool, error) { + func(attempt int) (bool, error, int) { observedState, err := streamConsumerGroup.GetState() suite.Require().NoError(err) @@ -279,14 +279,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerRetainShards() { suite.logger.DebugWith("retained shards", "shards", sessionState.Shards, "memberID", sessionState.MemberID) - return false, nil + return false, 
nil, 0 } } suite.logger.DebugWith("Session state shards were no retained just yet", "sessionStates", observedState.SessionStates, "memberID", member.streamConsumerGroupMember.GetID()) - return true, nil + return true, nil, 0 }) }) @@ -368,14 +368,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerAbort() { 10, &duration, nil, - func(attempt int) (bool, error) { + func(attempt int) (bool, error, int) { state, err = suite.getStateFromPersistency(suite.streamPath, consumerGroupName) if err != nil { suite.logger.DebugWith("State was not retrieved from persistency", "err", err) - return true, err + return true, err, 0 } - return false, nil + return false, nil, 0 }) suite.Require().NoError(err) @@ -391,14 +391,14 @@ func (suite *streamConsumerGroupTestSuite) TestStateHandlerAbort() { 10, &duration, nil, - func(attempt int) (bool, error) { + func(attempt int) (bool, error, int) { err = suite.setStateInPersistency(suite.streamPath, consumerGroupName, state) if err != nil { suite.logger.DebugWith("State was not set in persistency yet", "err", err) - return true, err + return true, err, 0 } - return false, nil + return false, nil, 0 }) suite.Require().NoError(err)