Skip to content

Commit

Permalink
add presetName label to metric
Browse files Browse the repository at this point in the history
  • Loading branch information
staffbase-robert committed Sep 26, 2023
1 parent 38bd9da commit cd0ea22
Showing 1 changed file with 62 additions and 33 deletions.
95 changes: 62 additions & 33 deletions pkg/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,65 @@ func (s *Spark) submitArgs(presetName string) ([]string, error) {
return args, nil
}

var submitCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "spark_exec_total",
Help: "The total number of spark-submit runs",
}, []string{"preset", "status"})

var retryCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "retry_total",
Help: "The total number of retries",
}, []string{"preset"})

func (s *Spark) Submit(presetName string) error {
args, err := s.submitArgs(presetName)
if err != nil {
return fmt.Errorf("couldn't build submit args, %w", err)
}

zap.L().Info("submit with args", zap.Any("args", args))
go s.exec("submit", args)
cmd := exec.Command(s.binaryPath, args...)
zap.L().Info("spark-submit", zap.Strings("args", args))
if s.debug {
writer := &zapio.Writer{Log: zap.L(), Level: zap.DebugLevel}
cmd.Stderr = writer
cmd.Stdout = writer
defer writer.Close()
}

go func() {
if err := retry(10, 1*time.Second, 2, 5*time.Minute, func() error {
retryCounter.WithLabelValues(presetName).Inc()
return cmd.Run()
}); err != nil {
zap.L().Error("spark submit failed with retries", zap.Error(err))
submitCounter.WithLabelValues(presetName, "failure").Inc()
}
submitCounter.WithLabelValues(presetName, "success").Inc()
}()

return nil
}

func (s *Spark) kill(args []string) {
cmd := exec.Command(s.binaryPath, args...)
zap.L().Info("spark-submit", zap.Strings("args", args))
if s.debug {
writer := &zapio.Writer{Log: zap.L(), Level: zap.DebugLevel}
cmd.Stderr = writer
cmd.Stdout = writer
defer writer.Close()
}

if err := retry(10, 1*time.Second, 2, 5*time.Minute, func() error {
return cmd.Run()
}); err != nil {
zap.L().Error("spark submit failed with retries", zap.Error(err))
submitCounter.WithLabelValues("failure").Inc()
}
submitCounter.WithLabelValues("success").Inc()
}

func (s *Spark) buildArgs(kind string, namespace, name string) []string {
args := make([]string, 0)
args = append(args, fmt.Sprintf("--master=%s", s.master))
Expand All @@ -139,8 +187,19 @@ func (s *Spark) buildArgs(kind string, namespace, name string) []string {
}

func (s *Spark) Kill(namespace, name string) {
kind := "kill"
s.exec(kind, s.buildArgs(kind, namespace, name))
args := s.buildArgs("kill", namespace, name)
cmd := exec.Command(s.binaryPath, args...)
zap.L().Info("spark-submit", zap.Strings("args", args))
if s.debug {
writer := &zapio.Writer{Log: zap.L(), Level: zap.DebugLevel}
cmd.Stderr = writer
cmd.Stdout = writer
defer writer.Close()
}

if err := cmd.Run(); err != nil {
zap.L().Error("killing spark app failed", zap.Error(err))
}
}

func (s *Spark) Status(namespace, name string) string {
Expand All @@ -157,35 +216,6 @@ func (s *Spark) Status(namespace, name string) string {
return buffer.String()
}

var execMetrics = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "spark_exec_total",
Help: "The total number of spark-submit runs",
}, []string{"kind", "status"})

func (s *Spark) exec(kind string, args []string) {
cmd := exec.Command(s.binaryPath, args...)
zap.L().Info("spark-submit", zap.Strings("args", args))
if s.debug {
writer := &zapio.Writer{Log: zap.L(), Level: zap.DebugLevel}
cmd.Stderr = writer
cmd.Stdout = writer
defer writer.Close()
}

if err := retry(10, 1*time.Second, 2, 5*time.Minute, func() error {
return cmd.Run()
}); err != nil {
zap.L().Error("spark submit failed with retries", zap.Error(err))
execMetrics.WithLabelValues("failure").Inc()
}
execMetrics.WithLabelValues("success").Inc()
}

var retryMetrics = promauto.NewCounter(prometheus.CounterOpts{
Name: "retry_total",
Help: "The total number of retries",
})

func retry(retries int, initialDelay time.Duration, mult int, maxWait time.Duration, fn func() error) error {
delay := initialDelay
for try := 0; try < retries; try++ {
Expand All @@ -197,7 +227,6 @@ func retry(retries int, initialDelay time.Duration, mult int, maxWait time.Durat
zap.Int("try", try),
zap.String("waitDuration", delay.String()),
)
retryMetrics.Inc()
}
time.Sleep(delay)
delay = delay * time.Duration(mult)
Expand Down

0 comments on commit cd0ea22

Please sign in to comment.