From a6bc3107e5b3b72090569714b727d68d47bbc80f Mon Sep 17 00:00:00 2001 From: Kyle Meyer Date: Mon, 7 Oct 2024 16:33:18 -0400 Subject: [PATCH] cmd: move shared logic from execute{Local,SGE}Job to helper executeLocalJob and executeSGEJob have nearly identical code for running a command and handling the errors. While the code is not too complicated, it's worth consolidating because 1) it ensures that all runners (local, SGE, and upcoming Slurm) behave in the same way (e.g., in terms of reporting errors and writing output files) and 2) the logic is going to get a bit more involved with the next commit. --- cmd/local.go | 34 +++------------------ cmd/run.go | 43 ++++++++++++++++++++++++++ cmd/run_test.go | 81 +++++++++++++++++++++++++++++++++++++++++++++++++ cmd/sge.go | 38 ++++------------------- 4 files changed, 135 insertions(+), 61 deletions(-) create mode 100644 cmd/run_test.go diff --git a/cmd/local.go b/cmd/local.go index 73939e38..ba0cea26 100644 --- a/cmd/local.go +++ b/cmd/local.go @@ -435,8 +435,6 @@ func WriteGitIgnoreFile(filepath string) error { func executeLocalJob(model *NonMemModel) turnstile.ConcurrentError { log.Infof("%s Beginning local work phase", model.LogIdentifier()) - fs := afero.NewOsFs() - log.Debugf("Output directory is currently set to %s", model.OutputDir) scriptLocation := path.Join(model.OutputDir, model.FileName+".sh") @@ -458,33 +456,7 @@ func executeLocalJob(model *NonMemModel) turnstile.ConcurrentError { log.Debugf("%s Generated command was: %s", model.LogIdentifier(), command.String()) - output, err := command.CombinedOutput() - - if err != nil && !strings.Contains(string(output), "not well-formed (invalid token)") { - log.Debug(err) - - var exitError *exec.ExitError - if errors.As(err, &exitError) { - code := exitError.ExitCode() - log.Errorf("%s exit code: %d, output:\n%s", model.LogIdentifier(), code, string(output)) - } - - return turnstile.ConcurrentError{ - RunIdentifier: model.Model, - Notes: "error running shell script", - Error: err, - } - } - - if err = afero.WriteFile(fs, path.Join(model.OutputDir, model.Model+".out"), output, 0640); err != nil { - return turnstile.ConcurrentError{ - RunIdentifier: model.FileName, - Notes: "failed to write model output", - Error: err, - } - } - - return turnstile.ConcurrentError{} + return runModelCommand(model, command, localIgnoreError) } func localModelsFromArguments(args []string, config configlib.Config) ([]LocalModel, error) { @@ -559,3 +531,7 @@ func HashFileOnChannel(ch chan string, file string, identifier string) { } ch <- fmt.Sprintf("%x", h.Sum(nil)) } + +func localIgnoreError(_ error, output string) bool { + return strings.Contains(output, "not well-formed (invalid token)") +} diff --git a/cmd/run.go b/cmd/run.go index fba72915..211e8a13 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -13,7 +13,9 @@ import ( "github.com/metrumresearchgroup/bbi/configlib" + "github.com/metrumresearchgroup/turnstile" log "github.com/sirupsen/logrus" + "github.com/spf13/afero" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -334,3 +336,44 @@ func onlyBbiVariables(provided []string) []string { return matched } + +// runModelCommand runs command, writing the combined standard output and +// standard error to {model.OutputDir}/{model.Model}.out. command should be +// primed to run, but Stdout and Stderr must not be set. +// +// ignoreError is a function called if running the command fails. It takes the +// error and combined standard output and standard error as arguments. A return +// value of true indicates that the error should be swallowed rather propagated +// as a turnstile.ConcurrentError. +func runModelCommand( + model *NonMemModel, command *exec.Cmd, ignoreError func(error, string) bool, +) turnstile.ConcurrentError { + + output, err := command.CombinedOutput() + if err != nil && !ignoreError(err, string(output)) { + log.Debug(err) + + var exitError *exec.ExitError + if errors.As(err, &exitError) { + code := exitError.ExitCode() + log.Errorf("%s exit code: %d, output:\n%s", model.LogIdentifier(), code, string(output)) + } + + return turnstile.ConcurrentError{ + RunIdentifier: model.Model, + Notes: fmt.Sprintf("error running %q", command.String()), + Error: err, + } + } + + fs := afero.NewOsFs() + if err = afero.WriteFile(fs, filepath.Join(model.OutputDir, model.Model+".out"), output, 0640); err != nil { + return turnstile.ConcurrentError{ + RunIdentifier: model.Model, + Notes: "failed to write model output", + Error: err, + } + } + + return turnstile.ConcurrentError{} +} diff --git a/cmd/run_test.go b/cmd/run_test.go new file mode 100644 index 00000000..e23fbc7d --- /dev/null +++ b/cmd/run_test.go @@ -0,0 +1,81 @@ +package cmd + +import ( + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + + "github.com/metrumresearchgroup/turnstile" + "github.com/metrumresearchgroup/wrapt" +) + +func skipIfNoSh(t *wrapt.T) { + t.Helper() + _, err := exec.LookPath("sh") + if err != nil { + t.Skipf("skipped because sh is not available") + } +} + +func never(_ error, _ string) bool { + return false +} + +func TestRunModelCommand(tt *testing.T) { + t := wrapt.WrapT(tt) + + skipIfNoSh(t) + + cmd := exec.Command("sh", "-c", "printf 'stdout\n'; printf >&2 'stderr\n'") + mod := &NonMemModel{Model: "foo.ctl", FileName: "foo", OutputDir: t.TempDir()} + cerr := runModelCommand(mod, cmd, never) + t.A.Equal(cerr, turnstile.ConcurrentError{}) + + outfile := filepath.Join(mod.OutputDir, mod.Model+".out") + t.R.FileExists(outfile) + bs, err := os.ReadFile(outfile) + t.R.NoError(err) + output = string(bs) + + t.A.Contains(output, "stdout") + t.A.Contains(output, "stderr") +} + +func TestRunModelCommandError(tt *testing.T) { + t := wrapt.WrapT(tt) + + skipIfNoSh(t) + + cmd := exec.Command("sh", "-c", "printf 'stdout\n'; printf >&2 'stderr\n'; exit 1") + mod := &NonMemModel{Model: "foo.ctl", FileName: "foo", OutputDir: t.TempDir()} + + cerr := runModelCommand(mod, cmd, never) + t.A.Error(cerr.Error) +} + +func TestRunModelCommandIgnoreError(tt *testing.T) { + t := wrapt.WrapT(tt) + + skipIfNoSh(t) + + cmd := exec.Command("sh", "-c", "printf 'stdout\n'; printf >&2 'IGNORE\n'; exit 1") + mod := &NonMemModel{Model: "foo.ctl", FileName: "foo", OutputDir: t.TempDir()} + + ign := func(_ error, output string) bool { + return strings.Contains(output, "IGNORE") + } + + cerr := runModelCommand(mod, cmd, ign) + t.A.Equal(cerr, turnstile.ConcurrentError{}) + + outfile := filepath.Join(mod.OutputDir, mod.Model+".out") + t.R.FileExists(outfile) + bs, err := os.ReadFile(outfile) + t.R.NoError(err) + output = string(bs) + + t.A.Contains(output, "stdout") + t.A.Contains(output, "IGNORE") +} diff --git a/cmd/sge.go b/cmd/sge.go index 8d059149..5ed9b246 100644 --- a/cmd/sge.go +++ b/cmd/sge.go @@ -2,7 +2,6 @@ package cmd import ( "bytes" - "errors" "fmt" "os/exec" "path" @@ -253,7 +252,6 @@ func sge(_ *cobra.Command, args []string) { func executeSGEJob(model *NonMemModel) turnstile.ConcurrentError { log.Printf("%s Beginning SGE work phase", model.LogIdentifier()) - fs := afero.NewOsFs() //Execute the script we created //Compute the grid name for submission @@ -313,36 +311,7 @@ func executeSGEJob(model *NonMemModel) turnstile.ConcurrentError { command.Env = append(command.Env, additionalEnvs...) } - output, err := command.CombinedOutput() - - if err != nil { - //Let's look to see if it's just because of the typical "No queues present" error - if !strings.Contains(string(output), "job is not allowed to run in any queue") { - var exitError *exec.ExitError - if errors.As(err, &exitError) { - code := exitError.ExitCode() - log.Errorf("%s exit code: %d, output:\n%s", model.LogIdentifier(), code, string(output)) - } - //If the error doesn't appear to be the above error, we'll generate the concurrent error and move along - return turnstile.ConcurrentError{ - RunIdentifier: model.Model, - Notes: "error running submission script", - Error: err, - } - } - } - - err = afero.WriteFile(fs, path.Join(model.OutputDir, model.Model+".out"), output, 0640) - - if err != nil { - return turnstile.ConcurrentError{ - RunIdentifier: model.Model, - Notes: "failed to write model output", - Error: err, - } - } - - return turnstile.ConcurrentError{} + return runModelCommand(model, command, sgeIgnoreError) } func sgeModelsFromArguments(args []string, config configlib.Config) ([]SGEModel, error) { @@ -448,3 +417,8 @@ func gridengineJobName(model *NonMemModel) (string, error) { return outBytesBuffer.String(), nil } + +func sgeIgnoreError(_ error, output string) bool { + // Ignore the error that occurs when no workers are available yet. + return strings.Contains(output, "job is not allowed to run in any queue") +}