Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rokatyy committed Jan 14, 2025
1 parent 40bbc0e commit 0d34f11
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
10 changes: 8 additions & 2 deletions pkg/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ func getFunctionName(fn interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}

// give either retryInterval or backoff
// RetryFunc give either retryInterval or backoff
// gets fn func(int) (bool, error, int) as parameter
// which returns:
// bool - whether should be retried
// error - whether error happened
// int - increments retries (allows manage retries count from inside of this function)
func RetryFunc(ctx context.Context,
loggerInstance logger.Logger,
attempts int,
Expand Down Expand Up @@ -189,10 +194,11 @@ func EngineErrorIsNonFatal(err error) bool {

func EngineErrorIsFatal(err error) bool {
var fatalEngineErrorsPartialMatch = []string{
"Failed to fetch record batches",
"lookup v3io-webapi: i/o timeout",
}
return errorMatches(err, fatalEngineErrorsPartialMatch)
}

func errorMatches(err error, substrings []string) bool {
if err != nil && len(err.Error()) > 0 {
for _, substring := range substrings {
Expand Down
3 changes: 2 additions & 1 deletion pkg/dataplane/streamconsumergroup/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time
c.logger,
c.getShardLocationAttempts,
nil,
&c.getShardLocationBackoff, func(attempt int) (bool, error, int) {
&c.getShardLocationBackoff,
func(attempt int) (bool, error, int) {
c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID)
if err != nil {
if common.EngineErrorIsNonFatal(err) {
Expand Down

0 comments on commit 0d34f11

Please sign in to comment.