diff --git a/client/operations/get_workflow_logs_responses.go b/client/operations/get_workflow_logs_responses.go index b6889fa..f350bb1 100644 --- a/client/operations/get_workflow_logs_responses.go +++ b/client/operations/get_workflow_logs_responses.go @@ -567,6 +567,9 @@ swagger:model GetWorkflowLogsOKBody */ type GetWorkflowLogsOKBody struct { + // live logs enabled + LiveLogsEnabled bool `json:"live_logs_enabled,omitempty"` + // logs Logs string `json:"logs,omitempty"` diff --git a/cmd/logs.go b/cmd/logs.go index 26867ad..26a1c98 100644 --- a/cmd/logs.go +++ b/cmd/logs.go @@ -1,6 +1,6 @@ /* This file is part of REANA. -Copyright (C) 2022 CERN. +Copyright (C) 2022, 2024 CERN. REANA is free software; you can redistribute it and/or modify it under the terms of the MIT License; see LICENSE file for more details. @@ -11,12 +11,14 @@ package cmd import ( "encoding/json" "fmt" + "io" "reanahub/reana-client-go/client" "reanahub/reana-client-go/client/operations" "reanahub/reana-client-go/pkg/config" "reanahub/reana-client-go/pkg/displayer" "reanahub/reana-client-go/pkg/filterer" "strings" + "time" "github.com/jedib0t/go-pretty/v6/text" @@ -27,15 +29,17 @@ import ( const logsDesc = ` Get workflow logs. -The ` + "``logs``" + ` command allows to retrieve logs of running workflow. Note that -only finished steps of the workflow are returned, the logs of the currently -processed step is not returned until it is finished. +The ` + "``logs``" + ` command allows to retrieve logs of a running workflow. Examples: $ reana-client logs -w myanalysis.42 -$ reana-client logs -w myanalysis.42 -s 1st_ste +$ reana-client logs -w myanalysis.42 --json + +$ reana-client logs -w myanalysis.42 --filter status=running + +$ reana-client logs -w myanalysis.42 --filter step=myfit --follow ` const logsFilterFlagDesc = `Filter job logs to include only those steps that @@ -43,6 +47,12 @@ match certain filtering criteria. Use --filter name=value pairs. Available filters are compute_backend, docker_img, status and step.` +// logsFollowMinInterval is the minimum interval between log polling. +const logsFollowMinInterval = 1 + +// logsFollowDefautlInterval is the default interval between log polling. +const logsFollowDefautlInterval = 10 + // logs struct that contains the logs of a workflow. // Pointers used for nullable values type logs struct { @@ -65,6 +75,7 @@ type jobLogItem struct { FinishedAt *string `json:"finished_at"` } +// logsOptions struct that contains the options of the logs command. type logsOptions struct { token string workflow string @@ -72,6 +83,14 @@ type logsOptions struct { filters []string page int64 size int64 + follow bool + interval int64 +} + +// logsCommandRunner struct that executes logs command. +type logsCommandRunner struct { + api *client.API + options *logsOptions } // newLogsCmd creates a command to get workflow logs. @@ -84,7 +103,12 @@ func newLogsCmd() *cobra.Command { Long: logsDesc, Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { - return o.run(cmd) + api, err := client.ApiClient() + if err != nil { + return err + } + runner := newLogsCommandRunner(api, o) + return runner.run(cmd) }, } @@ -101,12 +125,36 @@ func newLogsCmd() *cobra.Command { f.StringSliceVar(&o.filters, "filter", []string{}, logsFilterFlagDesc) f.Int64Var(&o.page, "page", 1, "Results page number (to be used with --size).") f.Int64Var(&o.size, "size", 0, "Size of results per page (to be used with --page).") + f.BoolVar( + &o.follow, + "follow", + false, + "Follow the logs of a running workflow or job (similar to tail -f).", + ) + f.Int64VarP( + &o.interval, + "interval", + "i", + logsFollowDefautlInterval, + fmt.Sprintf( + "Sleep time in seconds between log polling if log following is enabled. [default=%d]", + logsFollowDefautlInterval, + ), + ) return cmd } -func (o *logsOptions) run(cmd *cobra.Command) error { - filters, err := parseLogsFilters(o.filters) +// newLogsCommandRunner creates a new logs command runner. +func newLogsCommandRunner(api *client.API, options *logsOptions) *logsCommandRunner { + return &logsCommandRunner{api: api, options: options} +} + +// run executes the logs command. +func (r *logsCommandRunner) run(cmd *cobra.Command) error { + r.validateOptions(cmd.OutOrStdout()) + + filters, err := parseLogsFilters(r.options.filters) if err != nil { return err } @@ -116,25 +164,159 @@ func (o *logsOptions) run(cmd *cobra.Command) error { } logsParams := operations.NewGetWorkflowLogsParams() - logsParams.SetAccessToken(&o.token) - logsParams.SetWorkflowIDOrName(o.workflow) - logsParams.SetPage(&o.page) + logsParams.SetAccessToken(&r.options.token) + logsParams.SetWorkflowIDOrName(r.options.workflow) + logsParams.SetPage(&r.options.page) logsParams.SetSteps(steps) if cmd.Flags().Changed("size") { - logsParams.SetSize(&o.size) + logsParams.SetSize(&r.options.size) + } + + if r.options.follow { + return r.followLogs(logsParams, cmd, steps) + } + + return r.retrieveLogs(filters, logsParams, cmd, steps) +} + +// followLogs follows the logs of a running workflow or job. +func (r *logsCommandRunner) followLogs( + logsParams *operations.GetWorkflowLogsParams, + cmd *cobra.Command, + steps []string, +) error { + stepLength := len(steps) + var step, previousLogs string + stdout := cmd.OutOrStdout() + + if stepLength > 0 { + step = steps[0] + } + + if stepLength > 1 { + displayer.DisplayMessage( + "Only one step can be followed at a time, ignoring additional steps.", + displayer.Warning, + false, + stdout, + ) + logsParams.SetSteps([]string{step}) + } + + workflowStatusParams := operations.NewGetWorkflowStatusParams() + workflowStatusParams.SetAccessToken(&r.options.token) + workflowStatusParams.SetWorkflowIDOrName(r.options.workflow) + + for { + newLogs, status, err := r.getLogsWithStatus(step, logsParams, workflowStatusParams) + if err != nil { + return err + } + + fmt.Fprint(stdout, strings.TrimPrefix(newLogs, previousLogs)) + + if slices.Contains(config.WorkflowCompletedStatuses, status) { + subject := "Workflow" + if stepLength > 0 { + subject = "Job" + } + displayer.DisplayMessage( + fmt.Sprintf( + "%s has completed, you might want to rerun the command without the --follow flag.", + subject, + ), + displayer.Info, + false, + stdout, + ) + return nil + } + + time.Sleep(time.Duration(r.options.interval) * time.Second) + previousLogs = newLogs } +} - api, err := client.ApiClient() +// getData retrieves logs and status of a workflow or a job. +func (r *logsCommandRunner) getLogsWithStatus( + step string, + logsParams *operations.GetWorkflowLogsParams, + workflowStatusParams *operations.GetWorkflowStatusParams, +) (string, string, error) { + workflowLogs, err := r.getLogs(logsParams) if err != nil { - return err + return "", "", err + } + + if step != "" { + job := getFirstJob(workflowLogs.JobLogs) + if job == nil { + return "", "", fmt.Errorf("step %s not found", step) + } + return job.Logs, job.Status, nil } - logsResp, err := api.Operations.GetWorkflowLogs(logsParams) + + statusResponse, err := r.api.Operations.GetWorkflowStatus(workflowStatusParams) if err != nil { - return err + return "", "", err } + return *workflowLogs.WorkflowLogs, statusResponse.GetPayload().Status, nil +} + +// getLogs retrieves logs of a workflow and unmarshals data into logs structure. +func (r *logsCommandRunner) getLogs(logsParams *operations.GetWorkflowLogsParams) (logs, error) { var workflowLogs logs + logsResp, err := r.api.Operations.GetWorkflowLogs(logsParams) + if err != nil { + return workflowLogs, err + } + if r.options.follow && !logsResp.GetPayload().LiveLogsEnabled { + return workflowLogs, fmt.Errorf( + "live logs are not enabled, please rerun the command without the --follow flag", + ) + } + err = json.Unmarshal([]byte(logsResp.GetPayload().Logs), &workflowLogs) + if err != nil { + return workflowLogs, err + } + return workflowLogs, nil +} + +// validateOptions validates the options of the logs command. +func (r *logsCommandRunner) validateOptions(writer io.Writer) { + if r.options.jsonOutput && r.options.follow { + displayer.DisplayMessage( + "Ignoring --json as it cannot be used together with --follow.", + displayer.Warning, + false, + writer, + ) + } + if r.options.interval < logsFollowMinInterval { + displayer.DisplayMessage( + fmt.Sprintf( + "Interval must be greater than or equal to %d, using default interval (%d s).", + logsFollowMinInterval, + logsFollowDefautlInterval, + ), + displayer.Warning, + false, + writer, + ) + r.options.interval = logsFollowDefautlInterval + } +} + +// retrieveLogs retrieves and prints logs of a workflow. +func (r *logsCommandRunner) retrieveLogs( + filters filterer.Filters, + logsParams *operations.GetWorkflowLogsParams, + cmd *cobra.Command, + steps []string, +) error { + workflowLogs, err := r.getLogs(logsParams) if err != nil { return err } @@ -144,7 +326,7 @@ func (o *logsOptions) run(cmd *cobra.Command) error { return err } - if o.jsonOutput { + if r.options.jsonOutput { err := displayer.DisplayJsonOutput(workflowLogs, cmd.OutOrStdout()) if err != nil { return err @@ -152,7 +334,15 @@ func (o *logsOptions) run(cmd *cobra.Command) error { } else { displayHumanFriendlyLogs(cmd, workflowLogs, steps) } + return nil +} +// getFirstJob returns the first job in the given map, +// or nil if the map is empty. +func getFirstJob(items map[string]jobLogItem) *jobLogItem { + for _, item := range items { + return &item + } return nil } diff --git a/cmd/logs_test.go b/cmd/logs_test.go index 3439610..c98e08a 100644 --- a/cmd/logs_test.go +++ b/cmd/logs_test.go @@ -140,6 +140,133 @@ func TestLogs(t *testing.T) { expected: []string{"Field 'page': Must be at least 1."}, wantError: true, }, + "invalid server url": { + serverURL: "^^*invalid", + args: []string{"-w", workflowName}, + expected: []string{"environment variable REANA_SERVER_URL is not set"}, + wantError: true, + }, + "follow workflow": { + serverResponses: map[string]ServerResponse{ + fmt.Sprintf(logsPathTemplate, workflowName): { + statusCode: http.StatusOK, + responseFile: "logs_complete_live.json", + }, + fmt.Sprintf(statusPathTemplate, workflowName): { + statusCode: http.StatusOK, + responseFile: "status_finished.json", + }, + }, + args: []string{"-w", workflowName, "--follow", "-i", "0"}, + expected: []string{ + "Interval must be greater than or equal to 1, using default interval (10 s).", + "workflow logs", + "==> Workflow has completed, you might want to rerun the command without the --follow flag.", + }, + unwanted: []string{ + "job1", + "step", + }, + }, + "follow job with multiple steps, size, interval and json flags": { + serverResponses: map[string]ServerResponse{ + fmt.Sprintf(logsPathTemplate, workflowName): { + statusCode: http.StatusOK, + responseFile: "logs_running.json", + additionalResponseFiles: []string{"logs_complete_live.json"}, + }, + }, + args: []string{ + "-w", + workflowName, + "--follow", + "--json", + "--filter", + "step=job1", + "--filter", + "step=job2", + "--size", + "1", + "-i", + "1", + }, + expected: []string{ + "Ignoring --json as it cannot be used together with --follow.", + "Only one step can be followed at a time, ignoring additional steps.", + "workflow 1 logs", + "Job has completed, you might want to rerun the command without the --follow flag.", + }, + }, + "follow job that does not exist": { + serverResponses: map[string]ServerResponse{ + fmt.Sprintf(logsPathTemplate, workflowName): { + statusCode: http.StatusOK, + responseFile: "logs_empty.json", + }, + }, + args: []string{"-w", workflowName, "--follow", "--filter", "step=job1"}, + expected: []string{ + "step job1 not found", + }, + wantError: true, + }, + "follow logs when server returns logs error": { + serverResponses: map[string]ServerResponse{ + fmt.Sprintf(logsPathTemplate, workflowName): { + statusCode: http.StatusInternalServerError, + }, + }, + args: []string{"-w", workflowName, "--follow"}, + wantError: true, + }, + "follow logs when server returns status error": { + serverResponses: map[string]ServerResponse{ + fmt.Sprintf(logsPathTemplate, workflowName): { + statusCode: http.StatusOK, + responseFile: "logs_complete_live.json", + }, + fmt.Sprintf(statusPathTemplate, workflowName): { + statusCode: http.StatusInternalServerError, + }, + }, + args: []string{"-w", workflowName, "--follow"}, + wantError: true, + }, + "follow logs when server returns html response": { + serverResponses: map[string]ServerResponse{ + fmt.Sprintf(logsPathTemplate, workflowName): { + statusCode: http.StatusOK, + responseFile: "page.html", + }, + }, + args: []string{"-w", workflowName, "--follow"}, + expected: []string{"invalid character '<' looking for beginning of value"}, + wantError: true, + }, + "follow logs when server returns malformed logs": { + serverResponses: map[string]ServerResponse{ + fmt.Sprintf(logsPathTemplate, workflowName): { + statusCode: http.StatusOK, + responseFile: "logs_malformed.json", + }, + }, + args: []string{"-w", workflowName, "--follow"}, + expected: []string{"invalid character 'm' looking for beginning of value"}, + wantError: true, + }, + "follow logs when live logs are disabled": { + serverResponses: map[string]ServerResponse{ + fmt.Sprintf(logsPathTemplate, workflowName): { + statusCode: http.StatusOK, + responseFile: "logs_complete.json", + }, + }, + args: []string{"-w", workflowName, "--follow"}, + expected: []string{ + "live logs are not enabled, please rerun the command without the --follow flag", + }, + wantError: true, + }, } for name, params := range tests { diff --git a/cmd/root_test.go b/cmd/root_test.go index 28fcdc3..a3f16c7 100644 --- a/cmd/root_test.go +++ b/cmd/root_test.go @@ -45,15 +45,37 @@ type TestCmdParams struct { expected []string unwanted []string wantError bool + serverURL string } type ServerResponse struct { - statusCode int - responseFile string - responseHeaders map[string]string + statusCode int + responseFile string + responseHeaders map[string]string + additionalResponseFiles []string +} + +// getResponseFile returns the response file for the given call number, +// allowing for additional response files to be used for the same endpoint, +// i. e. when the endpoint is called multiple times. +func getResponseFile(callSeqNum int, serverResponse ServerResponse) string { + if len(serverResponse.additionalResponseFiles) == 0 { + return serverResponse.responseFile + } + + if callSeqNum == 0 { + return serverResponse.responseFile + } + + if callSeqNum < len(serverResponse.additionalResponseFiles)+1 { + return serverResponse.additionalResponseFiles[callSeqNum-1] + } + + return serverResponse.responseFile } func testCmdRun(t *testing.T, p TestCmdParams) { + callSeqNum := 0 server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if accessToken := r.URL.Query().Get("access_token"); accessToken != "1234" { t.Errorf("Expected access token '1234', got '%v'", accessToken) @@ -67,9 +89,11 @@ func testCmdRun(t *testing.T, p TestCmdParams) { w.WriteHeader(res.statusCode) var body []byte - if res.responseFile != "" { + responseFile := getResponseFile(callSeqNum, res) + callSeqNum++ + if responseFile != "" { var err error - body, err = os.ReadFile("../testdata/inputs/" + res.responseFile) + body, err = os.ReadFile("../testdata/inputs/" + responseFile) if err != nil { t.Fatalf("Error while reading response file: %v", err) } @@ -84,6 +108,9 @@ func testCmdRun(t *testing.T, p TestCmdParams) { })) viper.Set("server-url", server.URL) + if p.serverURL != "" { + viper.Set("server-url", p.serverURL) + } t.Cleanup(func() { server.Close() viper.Reset() diff --git a/pkg/config/config.go b/pkg/config/config.go index e95df59..1a4acc3 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,6 +1,6 @@ /* This file is part of REANA. -Copyright (C) 2022 CERN. +Copyright (C) 2022, 2024 CERN. REANA is free software; you can redistribute it and/or modify it under the terms of the MIT License; see LICENSE file for more details. @@ -31,18 +31,15 @@ var ReanaComputeBackendKeys = []string{"kubernetes", "htcondor", "slurm"} // LeadingMark prefix used when displaying headers or important messages. var LeadingMark = "==>" +var WorkflowCompletedStatuses = []string{"finished", "failed", "stopped"} + +var WorkflowProgressingStatuses = []string{"created", "running", "queued", "pending"} + // GetRunStatuses provides a list of currently supported run statuses. // Includes the deleted status if includeDeleted is set to true. func GetRunStatuses(includeDeleted bool) []string { - runStatuses := []string{ - "created", - "running", - "finished", - "failed", - "stopped", - "queued", - "pending", - } + runStatuses := append(WorkflowCompletedStatuses, WorkflowProgressingStatuses...) + if includeDeleted { runStatuses = append(runStatuses, "deleted") } diff --git a/testdata/inputs/logs_complete_live.json b/testdata/inputs/logs_complete_live.json new file mode 100644 index 0000000..6c69580 --- /dev/null +++ b/testdata/inputs/logs_complete_live.json @@ -0,0 +1,7 @@ +{ + "logs": "{\"workflow_logs\": \"workflow logs\",\"job_logs\": {\"1\": {\"workflow_uuid\": \"workflow_1\",\"job_name\": \"job1\",\"compute_backend\": \"Kubernetes\",\"backend_job_id\": \"backend1\",\"docker_img\": \"docker1\",\"cmd\": \"ls\",\"status\": \"finished\",\"logs\": \"workflow 1 logs\",\"started_at\": \"2022-07-20T12:09:09\",\"finished_at\": \"2022-07-20T19:09:09\"},\"2\": {\"workflow_uuid\": \"workflow_2\",\"job_name\": \"job2\",\"compute_backend\": \"Slurm\",\"backend_job_id\": \"backend2\",\"docker_img\": \"docker2\",\"cmd\": \"cd folder\",\"status\": \"running\",\"logs\": \"workflow 2 logs\",\"started_at\": \"2022-07-21T12:09:09\",\"finished_at\": \"2022-07-21T19:09:09\"}},\"engine_specific\": \"engine logs\"}", + "user": "user", + "workflow_id": "my_workflow_id", + "workflow_name": "my_workflow", + "live_logs_enabled": true +} diff --git a/testdata/inputs/logs_empty.json b/testdata/inputs/logs_empty.json index e5857be..c9f1973 100644 --- a/testdata/inputs/logs_empty.json +++ b/testdata/inputs/logs_empty.json @@ -2,5 +2,6 @@ "logs": "{}", "user": "user", "workflow_id": "my_workflow_id", - "workflow_name": "my_workflow" -} \ No newline at end of file + "workflow_name": "my_workflow", + "live_logs_enabled": true +} diff --git a/testdata/inputs/logs_malformed.json b/testdata/inputs/logs_malformed.json new file mode 100644 index 0000000..65185d9 --- /dev/null +++ b/testdata/inputs/logs_malformed.json @@ -0,0 +1,7 @@ +{ + "logs": "malformed logs", + "user": "user", + "workflow_id": "my_workflow_id", + "workflow_name": "my_workflow", + "live_logs_enabled": true +} diff --git a/testdata/inputs/logs_running.json b/testdata/inputs/logs_running.json new file mode 100644 index 0000000..1a8969b --- /dev/null +++ b/testdata/inputs/logs_running.json @@ -0,0 +1,7 @@ +{ + "logs": "{\"workflow_logs\": \"workflow logs\",\"job_logs\": {\"1\": {\"workflow_uuid\": \"workflow_1\",\"job_name\": \"job1\",\"compute_backend\": \"Kubernetes\",\"backend_job_id\": \"backend1\",\"docker_img\": \"docker1\",\"cmd\": \"ls\",\"status\": \"running\",\"logs\": \"workflow\",\"started_at\": \"2022-07-20T12:09:09\",\"finished_at\": \"2022-07-20T19:09:09\"}},\"engine_specific\": \"engine logs\"}", + "user": "user", + "workflow_id": "my_workflow_id", + "workflow_name": "my_workflow", + "live_logs_enabled": true +} diff --git a/testdata/inputs/page.html b/testdata/inputs/page.html new file mode 100644 index 0000000..9d5b11b --- /dev/null +++ b/testdata/inputs/page.html @@ -0,0 +1,10 @@ + + +
+This is a test HTML page.
+ +