diff --git a/ibex/heartbeat.go b/ibex/heartbeat.go index abe28b04..477e9b68 100644 --- a/ibex/heartbeat.go +++ b/ibex/heartbeat.go @@ -36,6 +36,7 @@ func heartbeat() { } var resp types.ReportResponse + err := client.GetCli().Call("Server.Report", req, &resp) if err != nil { diff --git a/ibex/task.go b/ibex/task.go index 3625b01e..47892bcb 100644 --- a/ibex/task.go +++ b/ibex/task.go @@ -3,8 +3,10 @@ package ibex import ( + "bufio" "bytes" "fmt" + "io" "log" "os/exec" "os/user" @@ -281,28 +283,65 @@ func (t *Task) start() { } } - cmd.Stdout = &t.Stdout - cmd.Stderr = &t.Stderr cmd.Stdin = t.Stdin t.Cmd = cmd + stdout, err := t.Cmd.StdoutPipe() + if err != nil { + log.Printf("E! cannot read ouput of task[%d]: %v", t.Id, err) + } + + stderr, err := t.Cmd.StderrPipe() + + if err != nil { + log.Printf("E! cannot read err of task[%d]: %v", t.Id, err) + } + err = CmdStart(cmd) + if err != nil { log.Printf("E! cannot start cmd of task[%d]: %v", t.Id, err) return } - go runProcess(t) + go runProcessRealtime(stdout, stderr, t) } func (t *Task) kill() { go killProcess(t) } -func runProcess(t *Task) { +func runProcessRealtime(stdout io.ReadCloser, stderr io.ReadCloser, t *Task) { t.SetAlive(true) defer t.SetAlive(false) + reader := bufio.NewReader(stdout) + + go func() { + for { + line, err2 := reader.ReadString('\n') + if err2 != nil || io.EOF == err2 { + break + } + t.Stdout.WriteString(line) + + persistResult(t) + } + }() + + errReader := bufio.NewReader(stderr) + + go func() { + for { + line, err2 := errReader.ReadString('\n') + if err2 != nil || io.EOF == err2 { + break + } + t.Stderr.WriteString(line) + persistResult(t) + } + }() + err := t.Cmd.Wait() if err != nil { if strings.Contains(err.Error(), "signal: killed") { @@ -326,7 +365,6 @@ func runProcess(t *Task) { func persistResult(t *Task) { metadir := config.Config.Ibex.MetaDir - stdout := filepath.Join(metadir, fmt.Sprint(t.Id), "stdout") stderr := filepath.Join(metadir, fmt.Sprint(t.Id), "stderr") doneFlag := filepath.Join(metadir, fmt.Sprint(t.Id), fmt.Sprintf("%d.done", t.Clock)) diff --git a/ibex/tasks.go b/ibex/tasks.go index f4964db2..308e079c 100644 --- a/ibex/tasks.go +++ b/ibex/tasks.go @@ -20,7 +20,7 @@ func (lt *LocalTasksT) ReportTasks() []types.ReportTask { rt := types.ReportTask{Id: id, Clock: t.Clock} rt.Status = t.GetStatus() - if rt.Status == "running" || rt.Status == "killing" { + if rt.Status == "killing" { // intermediate state continue }