Skip to content

Commit

Permalink
Merge pull request #7 from Staffbase/retry-adjustments
Browse files Browse the repository at this point in the history
adjust retry logic
  • Loading branch information
staffbase-robert authored Sep 26, 2023
2 parents 8c0b440 + 5674c52 commit f9c38da
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions pkg/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,20 +135,22 @@ func (s *Spark) Submit(presetName string) error {
if err != nil {
return fmt.Errorf("couldn't build submit args, %w", err)
}

zap.L().Info("submit with args", zap.Any("args", args))
cmd := exec.Command(s.binaryPath, args...)
zap.L().Info("spark-submit", zap.Strings("args", args))
if s.debug {
writer := &zapio.Writer{Log: zap.L(), Level: zap.DebugLevel}
cmd.Stderr = writer
cmd.Stdout = writer
defer writer.Close()
}

go func() {
if err := retry(10, 1*time.Second, 2, 5*time.Minute, func() error {
retryCounter.WithLabelValues(presetName).Inc()
isFirstRun := true
if err := retry(10, 1*time.Second, 2, 3*time.Minute, func() error {
cmd := exec.Command(s.binaryPath, args...)
zap.L().Info("spark-submit", zap.Strings("args", args))
if s.debug {
writer := &zapio.Writer{Log: zap.L(), Level: zap.DebugLevel}
cmd.Stderr = writer
cmd.Stdout = writer
defer writer.Close()
}
if !isFirstRun {
retryCounter.WithLabelValues(presetName).Inc()
}
isFirstRun = false
return cmd.Run()
}); err != nil {
zap.L().Error("spark submit failed with retries", zap.Error(err))
Expand Down

0 comments on commit f9c38da

Please sign in to comment.