Skip to content

Commit

Permalink
Merge pull request #5 from Staffbase/retry-submit
Browse files Browse the repository at this point in the history
retry `cmd.Exec`
  • Loading branch information
staffbase-robert authored Sep 26, 2023
2 parents 42d9b3e + 04865f7 commit 5fcac44
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
29 changes: 27 additions & 2 deletions pkg/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"path"
"path/filepath"
"strings"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapio"
Expand Down Expand Up @@ -162,7 +163,31 @@ func (s *Spark) exec(args []string) {
cmd.Stdout = writer
defer writer.Close()
}
if err := cmd.Run(); err != nil {
zap.L().Error("spark-submit failed", zap.Error(err))

if err := retry(10, 1*time.Second, 2, 5*time.Minute, func() error {
return cmd.Run()
}); err != nil {
zap.L().Error("spark submit failed with retries", zap.Error(err))
}
}

func retry(retries int, initialDelay time.Duration, mult int, maxWait time.Duration, fn func() error) error {
delay := initialDelay
for try := 0; try < retries; try++ {
if err := fn(); err == nil {
return nil
} else {
zap.L().Warn(
"retry failed",
zap.Int("try", try),
zap.String("waitDuration", delay.String()),
)
}
time.Sleep(delay)
delay = delay * time.Duration(mult)
if delay >= maxWait {
delay = maxWait
}
}
return fmt.Errorf("retries exceeded")
}
15 changes: 15 additions & 0 deletions pkg/spark/spark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ limitations under the License.
package spark

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -75,4 +77,17 @@ func TestSpark(t *testing.T) {
"--status=namespace:name",
}, args)
})

t.Run("retry works", func(t *testing.T) {
try := 0
fn := func() error {
if try >= 2 {
return nil
}
try++
return fmt.Errorf("error in fn")
}
require.NoError(t, retry(3, 1*time.Nanosecond, 2, 1*time.Second, fn))
require.Greater(t, try, 1)
})
}

0 comments on commit 5fcac44

Please sign in to comment.