diff --git a/pkg/common/helper.go b/pkg/common/helper.go index cefd327..2c7cc6f 100644 --- a/pkg/common/helper.go +++ b/pkg/common/helper.go @@ -35,7 +35,12 @@ func getFunctionName(fn interface{}) string { return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() } -// give either retryInterval or backoff +// RetryFunc give either retryInterval or backoff +// gets fn func(int) (bool, error, int) as parameter +// which returns: +// bool - whether should be retried +// error - whether error happened +// int - increments retries (allows manage retries count from inside of this function) func RetryFunc(ctx context.Context, loggerInstance logger.Logger, attempts int, @@ -189,10 +194,11 @@ func EngineErrorIsNonFatal(err error) bool { func EngineErrorIsFatal(err error) bool { var fatalEngineErrorsPartialMatch = []string{ - "Failed to fetch record batches", + "lookup v3io-webapi: i/o timeout", } return errorMatches(err, fatalEngineErrorsPartialMatch) } + func errorMatches(err error, substrings []string) bool { if err != nil && len(err.Error()) > 0 { for _, substring := range substrings { diff --git a/pkg/dataplane/streamconsumergroup/claim.go b/pkg/dataplane/streamconsumergroup/claim.go index 003b10d..dd8ef9f 100644 --- a/pkg/dataplane/streamconsumergroup/claim.go +++ b/pkg/dataplane/streamconsumergroup/claim.go @@ -126,7 +126,8 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time c.logger, c.getShardLocationAttempts, nil, - &c.getShardLocationBackoff, func(attempt int) (bool, error, int) { + &c.getShardLocationBackoff, + func(attempt int) (bool, error, int) { c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID) if err != nil { if common.EngineErrorIsNonFatal(err) {