Skip to content

Commit

Permalink
cmd: move shared logic from execute{Local,SGE}Job to helper
Browse files Browse the repository at this point in the history
executeLocalJob and executeSGEJob have nearly identical code for
running a command and handling the errors.  While the code is not too
complicated, it's worth consolidating because 1) it ensures that all
runners (local, SGE, and upcoming Slurm) behave in the same way (e.g.,
in terms of reporting errors and writing output files) and 2) the
logic is going to get a bit more involved with the next commit.
  • Loading branch information
kyleam committed Oct 10, 2024
1 parent 0ba9574 commit a6bc310
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 61 deletions.
34 changes: 5 additions & 29 deletions cmd/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,6 @@ func WriteGitIgnoreFile(filepath string) error {

func executeLocalJob(model *NonMemModel) turnstile.ConcurrentError {
log.Infof("%s Beginning local work phase", model.LogIdentifier())
fs := afero.NewOsFs()

log.Debugf("Output directory is currently set to %s", model.OutputDir)

scriptLocation := path.Join(model.OutputDir, model.FileName+".sh")
Expand All @@ -458,33 +456,7 @@ func executeLocalJob(model *NonMemModel) turnstile.ConcurrentError {

log.Debugf("%s Generated command was: %s", model.LogIdentifier(), command.String())

output, err := command.CombinedOutput()

if err != nil && !strings.Contains(string(output), "not well-formed (invalid token)") {
log.Debug(err)

var exitError *exec.ExitError
if errors.As(err, &exitError) {
code := exitError.ExitCode()
log.Errorf("%s exit code: %d, output:\n%s", model.LogIdentifier(), code, string(output))
}

return turnstile.ConcurrentError{
RunIdentifier: model.Model,
Notes: "error running shell script",
Error: err,
}
}

if err = afero.WriteFile(fs, path.Join(model.OutputDir, model.Model+".out"), output, 0640); err != nil {
return turnstile.ConcurrentError{
RunIdentifier: model.FileName,
Notes: "failed to write model output",
Error: err,
}
}

return turnstile.ConcurrentError{}
return runModelCommand(model, command, localIgnoreError)
}

func localModelsFromArguments(args []string, config configlib.Config) ([]LocalModel, error) {
Expand Down Expand Up @@ -559,3 +531,7 @@ func HashFileOnChannel(ch chan string, file string, identifier string) {
}
ch <- fmt.Sprintf("%x", h.Sum(nil))
}

func localIgnoreError(_ error, output string) bool {
return strings.Contains(output, "not well-formed (invalid token)")
}
43 changes: 43 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (

"github.com/metrumresearchgroup/bbi/configlib"

"github.com/metrumresearchgroup/turnstile"
log "github.com/sirupsen/logrus"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
Expand Down Expand Up @@ -334,3 +336,44 @@ func onlyBbiVariables(provided []string) []string {

return matched
}

// runModelCommand runs command, writing the combined standard output and
// standard error to {model.OutputDir}/{model.Model}.out. command should be
// primed to run, but Stdout and Stderr must not be set.
//
// ignoreError is a function called if running the command fails. It takes the
// error and combined standard output and standard error as arguments. A return
// value of true indicates that the error should be swallowed rather propagated
// as a turnstile.ConcurrentError.
func runModelCommand(
model *NonMemModel, command *exec.Cmd, ignoreError func(error, string) bool,
) turnstile.ConcurrentError {

output, err := command.CombinedOutput()
if err != nil && !ignoreError(err, string(output)) {
log.Debug(err)

var exitError *exec.ExitError
if errors.As(err, &exitError) {
code := exitError.ExitCode()
log.Errorf("%s exit code: %d, output:\n%s", model.LogIdentifier(), code, string(output))
}

return turnstile.ConcurrentError{
RunIdentifier: model.Model,
Notes: fmt.Sprintf("error running %q", command.String()),
Error: err,
}
}

fs := afero.NewOsFs()
if err = afero.WriteFile(fs, filepath.Join(model.OutputDir, model.Model+".out"), output, 0640); err != nil {
return turnstile.ConcurrentError{
RunIdentifier: model.Model,
Notes: "failed to write model output",
Error: err,
}
}

return turnstile.ConcurrentError{}
}
81 changes: 81 additions & 0 deletions cmd/run_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package cmd

import (
"os"
"os/exec"
"path/filepath"
"strings"
"testing"

"github.com/metrumresearchgroup/turnstile"
"github.com/metrumresearchgroup/wrapt"
)

func skipIfNoSh(t *wrapt.T) {
t.Helper()
_, err := exec.LookPath("sh")
if err != nil {
t.Skipf("skipped because sh is not available")
}
}

func never(_ error, _ string) bool {
return false
}

func TestRunModelCommand(tt *testing.T) {
t := wrapt.WrapT(tt)

skipIfNoSh(t)

cmd := exec.Command("sh", "-c", "printf 'stdout\n'; printf >&2 'stderr\n'")
mod := &NonMemModel{Model: "foo.ctl", FileName: "foo", OutputDir: t.TempDir()}
cerr := runModelCommand(mod, cmd, never)
t.A.Equal(cerr, turnstile.ConcurrentError{})

outfile := filepath.Join(mod.OutputDir, mod.Model+".out")
t.R.FileExists(outfile)
bs, err := os.ReadFile(outfile)
t.R.NoError(err)
output = string(bs)

t.A.Contains(output, "stdout")
t.A.Contains(output, "stderr")
}

func TestRunModelCommandError(tt *testing.T) {
t := wrapt.WrapT(tt)

skipIfNoSh(t)

cmd := exec.Command("sh", "-c", "printf 'stdout\n'; printf >&2 'stderr\n'; exit 1")
mod := &NonMemModel{Model: "foo.ctl", FileName: "foo", OutputDir: t.TempDir()}

cerr := runModelCommand(mod, cmd, never)
t.A.Error(cerr.Error)
}

func TestRunModelCommandIgnoreError(tt *testing.T) {
t := wrapt.WrapT(tt)

skipIfNoSh(t)

cmd := exec.Command("sh", "-c", "printf 'stdout\n'; printf >&2 'IGNORE\n'; exit 1")
mod := &NonMemModel{Model: "foo.ctl", FileName: "foo", OutputDir: t.TempDir()}

ign := func(_ error, output string) bool {
return strings.Contains(output, "IGNORE")
}

cerr := runModelCommand(mod, cmd, ign)
t.A.Equal(cerr, turnstile.ConcurrentError{})

outfile := filepath.Join(mod.OutputDir, mod.Model+".out")
t.R.FileExists(outfile)
bs, err := os.ReadFile(outfile)
t.R.NoError(err)
output = string(bs)

t.A.Contains(output, "stdout")
t.A.Contains(output, "IGNORE")
}
38 changes: 6 additions & 32 deletions cmd/sge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"bytes"
"errors"
"fmt"
"os/exec"
"path"
Expand Down Expand Up @@ -253,7 +252,6 @@ func sge(_ *cobra.Command, args []string) {

func executeSGEJob(model *NonMemModel) turnstile.ConcurrentError {
log.Printf("%s Beginning SGE work phase", model.LogIdentifier())
fs := afero.NewOsFs()
//Execute the script we created

//Compute the grid name for submission
Expand Down Expand Up @@ -313,36 +311,7 @@ func executeSGEJob(model *NonMemModel) turnstile.ConcurrentError {
command.Env = append(command.Env, additionalEnvs...)
}

output, err := command.CombinedOutput()

if err != nil {
//Let's look to see if it's just because of the typical "No queues present" error
if !strings.Contains(string(output), "job is not allowed to run in any queue") {
var exitError *exec.ExitError
if errors.As(err, &exitError) {
code := exitError.ExitCode()
log.Errorf("%s exit code: %d, output:\n%s", model.LogIdentifier(), code, string(output))
}
//If the error doesn't appear to be the above error, we'll generate the concurrent error and move along
return turnstile.ConcurrentError{
RunIdentifier: model.Model,
Notes: "error running submission script",
Error: err,
}
}
}

err = afero.WriteFile(fs, path.Join(model.OutputDir, model.Model+".out"), output, 0640)

if err != nil {
return turnstile.ConcurrentError{
RunIdentifier: model.Model,
Notes: "failed to write model output",
Error: err,
}
}

return turnstile.ConcurrentError{}
return runModelCommand(model, command, sgeIgnoreError)
}

func sgeModelsFromArguments(args []string, config configlib.Config) ([]SGEModel, error) {
Expand Down Expand Up @@ -448,3 +417,8 @@ func gridengineJobName(model *NonMemModel) (string, error) {

return outBytesBuffer.String(), nil
}

func sgeIgnoreError(_ error, output string) bool {
// Ignore the error that occurs when no workers are available yet.
return strings.Contains(output, "job is not allowed to run in any queue")
}

0 comments on commit a6bc310

Please sign in to comment.