From b997d97fe146936a5ad9848fc6bc309cce7861cf Mon Sep 17 00:00:00 2001 From: Michael Adler Date: Mon, 19 Jun 2023 14:14:42 +0200 Subject: [PATCH] feat: instant job update notifications using server-sent events This commit introduces a new feature that enables clients to receive instant notifications for job status updates. Details: - A new endpoint `/jobs/{id}/status/subscribe` has been added, which implements instant notifications through Server-Sent Events (SSE). - By subscribing to this endpoint, clients can efficiently keep track of job status changes without the need to repeatedly poll the server. - A client reference implementation has been added to wfxctl. Signed-off-by: Michael Adler --- CHANGELOG.md | 2 + api/errors.go | 6 + api/job_status_test.go | 121 +++++++ api/northbound.go | 16 + api/southbound.go | 16 + api/workflow_test.go | 7 +- cmd/wfx/cmd/root/cmd.go | 6 +- cmd/wfxctl/cmd/job/job.go | 2 + .../cmd/job/subscribestatus/main_test.go | 19 ++ .../job/subscribestatus/subscribe_status.go | 126 +++++++ .../subscribestatus/subscribe_status_test.go | 99 ++++++ cmd/wfxctl/flags/basecmd.go | 9 +- docs/operations.md | 64 ++++ ...get_jobs_id_status_subscribe_parameters.go | 156 +++++++++ .../get_jobs_id_status_subscribe_responses.go | 309 ++++++++++++++++++ generated/client/jobs/jobs_client.go | 44 +++ .../restapi/configure_workflow_executor.go | 2 + generated/northbound/restapi/doc.go | 1 + generated/northbound/restapi/embedded_spec.go | 138 ++++++++ .../get_jobs_id_status_subscribe.go | 67 ++++ ...get_jobs_id_status_subscribe_parameters.go | 76 +++++ .../get_jobs_id_status_subscribe_responses.go | 172 ++++++++++ ...get_jobs_id_status_subscribe_urlbuilder.go | 104 ++++++ .../operations/workflow_executor_api.go | 24 ++ .../restapi/configure_workflow_executor.go | 2 + generated/southbound/restapi/doc.go | 1 + generated/southbound/restapi/embedded_spec.go | 138 ++++++++ .../get_jobs_id_status_subscribe.go | 67 ++++ ...get_jobs_id_status_subscribe_parameters.go | 76 +++++ .../get_jobs_id_status_subscribe_responses.go | 172 ++++++++++ ...get_jobs_id_status_subscribe_urlbuilder.go | 104 ++++++ .../operations/workflow_executor_api.go | 24 ++ go.mod | 4 + go.sum | 7 + internal/handler/job/create.go | 1 - internal/handler/job/get.go | 9 +- internal/handler/job/status/subscribe.go | 70 ++++ internal/handler/job/status/subscribe_test.go | 103 ++++++ internal/handler/job/status/update.go | 17 + internal/handler/job/status/update_test.go | 57 ++++ internal/producer/text_event_stream.go | 37 +++ internal/producer/text_event_stream_test.go | 34 ++ internal/workflow/workflow.go | 11 + internal/workflow/workflow_test.go | 18 +- middleware/logging/writer.go | 6 + middleware/logging/writer_test.go | 8 + middleware/responder/sse/main_test.go | 19 ++ middleware/responder/sse/responder.go | 67 ++++ middleware/responder/sse/responder_test.go | 54 +++ middleware/responder/util/content_type.go | 29 ++ .../responder/util/content_type_test.go | 33 ++ middleware/responder/util/main_test.go | 19 ++ spec/wfx.swagger.yml | 46 +++ test/04-operations.bats | 24 ++ 54 files changed, 2827 insertions(+), 16 deletions(-) create mode 100644 cmd/wfxctl/cmd/job/subscribestatus/main_test.go create mode 100644 cmd/wfxctl/cmd/job/subscribestatus/subscribe_status.go create mode 100644 cmd/wfxctl/cmd/job/subscribestatus/subscribe_status_test.go create mode 100644 generated/client/jobs/get_jobs_id_status_subscribe_parameters.go create mode 100644 generated/client/jobs/get_jobs_id_status_subscribe_responses.go create mode 100644 generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe.go create mode 100644 generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe_parameters.go create mode 100644 generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe_responses.go create mode 100644 generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe_urlbuilder.go create mode 100644 generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe.go create mode 100644 generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe_parameters.go create mode 100644 generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe_responses.go create mode 100644 generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe_urlbuilder.go create mode 100644 internal/handler/job/status/subscribe.go create mode 100644 internal/handler/job/status/subscribe_test.go create mode 100644 internal/producer/text_event_stream.go create mode 100644 internal/producer/text_event_stream_test.go create mode 100644 middleware/responder/sse/main_test.go create mode 100644 middleware/responder/sse/responder.go create mode 100644 middleware/responder/sse/responder_test.go create mode 100644 middleware/responder/util/content_type.go create mode 100644 middleware/responder/util/content_type_test.go create mode 100644 middleware/responder/util/main_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index d7a7051b..1c8ee29a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Instant job update notifications using server-sent events (see #11) + ### Fixed - Send HTTP status code 404 when attempting to access the file server while it is disabled diff --git a/api/errors.go b/api/errors.go index 9a7d9d68..92f8944f 100644 --- a/api/errors.go +++ b/api/errors.go @@ -41,3 +41,9 @@ var WorkflowInvalid = model.Error{ Logref: "18f57adc70dd79c7fb4f1246be8a6e04", Message: "Workflow validation failed", } + +var JobTerminalState = model.Error{ + Code: "wfx.jobTerminalState", + Logref: "916f0a913a3e4a52a96bd271e029c201", + Message: "The request was invalid because the job is in a terminal state", +} diff --git a/api/job_status_test.go b/api/job_status_test.go index 52708b08..d20fe73a 100644 --- a/api/job_status_test.go +++ b/api/job_status_test.go @@ -12,11 +12,15 @@ import ( "context" "fmt" "net/http" + "sync" "testing" + "time" "github.com/siemens/wfx/generated/model" "github.com/siemens/wfx/internal/handler/job" + "github.com/siemens/wfx/internal/handler/job/status" "github.com/siemens/wfx/internal/handler/workflow" + "github.com/siemens/wfx/persistence" "github.com/siemens/wfx/workflow/dau" "github.com/steinfletcher/apitest" jsonpath "github.com/steinfletcher/apitest-jsonpath" @@ -118,3 +122,120 @@ func TestJobStatusUpdate(t *testing.T) { Assert(jsonpath.Contains(`$.state`, "DOWNLOADING")). End() } + +func TestJobStatusSubscribe(t *testing.T) { + db := newInMemoryDB(t) + wf := dau.DirectWorkflow() + _, err := workflow.CreateWorkflow(context.Background(), db, wf) + require.NoError(t, err) + + north, south := createNorthAndSouth(t, db) + + handlers := []http.Handler{north, south} + for i, name := range allAPIs { + handler := handlers[i] + t.Run(name, func(t *testing.T) { + jobReq := model.JobRequest{ + ClientID: "foo", + Workflow: wf.Name, + } + job, err := job.CreateJob(context.Background(), db, &jobReq) + require.NoError(t, err) + jobPath := fmt.Sprintf("/api/wfx/v1/jobs/%s/status/subscribe", job.ID) + + job, err = db.UpdateJob(context.Background(), job, persistence.JobUpdate{ + Status: &model.JobStatus{State: "ACTIVATING"}, + }) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + if status.SubscriberCount() > 0 { + break + } + time.Sleep(time.Millisecond) + } + // give it some extra time, just to be safe + time.Sleep(500 * time.Millisecond) + // update job to terminal state + _, err = status.Update(context.Background(), db, job.ID, &model.JobStatus{State: "ACTIVATED"}, model.EligibleEnumCLIENT) + require.NoError(t, err) + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + apitest.New(). + Handler(handler). + Get(jobPath).WithContext(ctx). + Expect(t). + Status(http.StatusOK). + Header("Content-Type", "text/event-stream"). + Body(`data: {"state":"ACTIVATING"} + +data: {"state":"ACTIVATED"} + +`). + End() + + wg.Wait() + status.ShutdownSubscribers() + }) + } +} + +func TestJobStatusSubscribe_NotFound(t *testing.T) { + db := newInMemoryDB(t) + north, south := createNorthAndSouth(t, db) + + handlers := []http.Handler{north, south} + for i, name := range allAPIs { + handler := handlers[i] + t.Run(name, func(t *testing.T) { + apitest.New(). + Handler(handler). + Get("/api/wfx/v1/jobs/42/status/subscribe"). + Expect(t). + Status(http.StatusNotFound). + Header("Content-Type", "application/json"). + Assert(jsonpath.Equal(`$.errors[0].code`, "wfx.jobNotFound")). + End() + }) + } +} + +func TestJobStatusSubscribe_TerminalState(t *testing.T) { + db := newInMemoryDB(t) + + wf := dau.DirectWorkflow() + _, _ = workflow.CreateWorkflow(context.Background(), db, wf) + + tmpJob := model.Job{ + ClientID: "foo", + Workflow: wf, + Status: &model.JobStatus{State: "ACTIVATED"}, + } + + job, err := db.CreateJob(context.Background(), &tmpJob) + require.NoError(t, err) + + jobPath := fmt.Sprintf("/api/wfx/v1/jobs/%s/status/subscribe", job.ID) + + north, south := createNorthAndSouth(t, db) + handlers := []http.Handler{north, south} + for i, name := range allAPIs { + handler := handlers[i] + t.Run(name, func(t *testing.T) { + apitest.New(). + Handler(handler). + Get(jobPath). + Expect(t). + Status(http.StatusBadRequest). + Header("Content-Type", "application/json"). + Assert(jsonpath.Equal(`$.errors[0].code`, "wfx.jobTerminalState")). + End() + }) + } +} diff --git a/api/northbound.go b/api/northbound.go index a06f26fe..f5810639 100644 --- a/api/northbound.go +++ b/api/northbound.go @@ -25,6 +25,8 @@ import ( "github.com/siemens/wfx/internal/handler/job/tags" "github.com/siemens/wfx/internal/handler/workflow" "github.com/siemens/wfx/middleware/logging" + "github.com/siemens/wfx/middleware/responder/sse" + "github.com/siemens/wfx/middleware/responder/util" "github.com/siemens/wfx/persistence" ) @@ -263,5 +265,19 @@ func NewNorthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor return northbound.NewDeleteJobsIDTagsOK().WithPayload(tags) }) + serverAPI.NorthboundGetJobsIDStatusSubscribeHandler = northbound.GetJobsIDStatusSubscribeHandlerFunc( + func(params northbound.GetJobsIDStatusSubscribeParams) middleware.Responder { + ctx := params.HTTPRequest.Context() + eventChan, err := status.AddSubscriber(ctx, storage, params.ID) + if ftag.Get(err) == ftag.NotFound { + return util.ForceJSONResponse(http.StatusNotFound, + &model.ErrorResponse{Errors: []*model.Error{&JobNotFound}}) + } else if ftag.Get(err) == ftag.InvalidArgument { + return util.ForceJSONResponse(http.StatusBadRequest, + &model.ErrorResponse{Errors: []*model.Error{&JobTerminalState}}) + } + return sse.Responder(ctx, eventChan) + }) + return serverAPI, nil } diff --git a/api/southbound.go b/api/southbound.go index 2a34ca0e..0bf289a3 100644 --- a/api/southbound.go +++ b/api/southbound.go @@ -25,6 +25,8 @@ import ( "github.com/siemens/wfx/internal/handler/job/tags" "github.com/siemens/wfx/internal/handler/workflow" "github.com/siemens/wfx/middleware/logging" + "github.com/siemens/wfx/middleware/responder/sse" + "github.com/siemens/wfx/middleware/responder/util" "github.com/siemens/wfx/persistence" ) @@ -176,5 +178,19 @@ func NewSouthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor return southbound.NewGetJobsIDTagsOK().WithPayload(tags) }) + serverAPI.SouthboundGetJobsIDStatusSubscribeHandler = southbound.GetJobsIDStatusSubscribeHandlerFunc( + func(params southbound.GetJobsIDStatusSubscribeParams) middleware.Responder { + ctx := params.HTTPRequest.Context() + eventChan, err := status.AddSubscriber(ctx, storage, params.ID) + if ftag.Get(err) == ftag.NotFound { + return util.ForceJSONResponse(http.StatusNotFound, + &model.ErrorResponse{Errors: []*model.Error{&JobNotFound}}) + } else if ftag.Get(err) == ftag.InvalidArgument { + return util.ForceJSONResponse(http.StatusBadRequest, + &model.ErrorResponse{Errors: []*model.Error{&JobTerminalState}}) + } + return sse.Responder(ctx, eventChan) + }) + return serverAPI, nil } diff --git a/api/workflow_test.go b/api/workflow_test.go index 404963f9..0263b943 100644 --- a/api/workflow_test.go +++ b/api/workflow_test.go @@ -213,8 +213,11 @@ func createNorthAndSouth(t *testing.T, db persistence.Storage) (http.Handler, ht } func persistJob(t *testing.T, db persistence.Storage) *model.Job { - wf, err := workflow.CreateWorkflow(context.Background(), db, dau.DirectWorkflow()) - require.NoError(t, err) + wf := dau.DirectWorkflow() + if found, _ := workflow.GetWorkflow(context.Background(), db, wf.Name); found == nil { + _, err := workflow.CreateWorkflow(context.Background(), db, wf) + require.NoError(t, err) + } jobReq := model.JobRequest{ ClientID: "foo", diff --git a/cmd/wfx/cmd/root/cmd.go b/cmd/wfx/cmd/root/cmd.go index a5552172..e31e33f6 100644 --- a/cmd/wfx/cmd/root/cmd.go +++ b/cmd/wfx/cmd/root/cmd.go @@ -30,6 +30,7 @@ import ( "github.com/rs/zerolog/log" "github.com/siemens/wfx/cmd/wfx/metadata" "github.com/siemens/wfx/internal/config" + "github.com/siemens/wfx/internal/handler/job/status" "github.com/siemens/wfx/persistence" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -255,7 +256,10 @@ Examples of tasks are installation of firmware or other types of commands issued } } - // Create a context with a timeout to allow outstanding requests to complete + // shut down (disconnect) subscribers otherwise we cannot stop the web server due to open connections + status.ShutdownSubscribers() + + // create a context with a timeout to allow outstanding requests to complete var timeout time.Duration k.Read(func(k *koanf.Koanf) { timeout = k.Duration(gracefulTimeoutFlag) diff --git a/cmd/wfxctl/cmd/job/job.go b/cmd/wfxctl/cmd/job/job.go index 167a5921..7937deeb 100644 --- a/cmd/wfxctl/cmd/job/job.go +++ b/cmd/wfxctl/cmd/job/job.go @@ -18,6 +18,7 @@ import ( "github.com/siemens/wfx/cmd/wfxctl/cmd/job/getstatus" "github.com/siemens/wfx/cmd/wfxctl/cmd/job/gettags" "github.com/siemens/wfx/cmd/wfxctl/cmd/job/query" + "github.com/siemens/wfx/cmd/wfxctl/cmd/job/subscribestatus" "github.com/siemens/wfx/cmd/wfxctl/cmd/job/updatedefinition" "github.com/siemens/wfx/cmd/wfxctl/cmd/job/updatestatus" "github.com/spf13/cobra" @@ -43,4 +44,5 @@ func init() { Command.AddCommand(addtags.Command) Command.AddCommand(deltags.Command) Command.AddCommand(gettags.Command) + Command.AddCommand(subscribestatus.Command) } diff --git a/cmd/wfxctl/cmd/job/subscribestatus/main_test.go b/cmd/wfxctl/cmd/job/subscribestatus/main_test.go new file mode 100644 index 00000000..cf65d0a8 --- /dev/null +++ b/cmd/wfxctl/cmd/job/subscribestatus/main_test.go @@ -0,0 +1,19 @@ +package subscribestatus + +/* + * SPDX-FileCopyrightText: 2023 Siemens AG + * + * SPDX-License-Identifier: Apache-2.0 + * + * Author: Michael Adler + */ + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/cmd/wfxctl/cmd/job/subscribestatus/subscribe_status.go b/cmd/wfxctl/cmd/job/subscribestatus/subscribe_status.go new file mode 100644 index 00000000..2c6fadd3 --- /dev/null +++ b/cmd/wfxctl/cmd/job/subscribestatus/subscribe_status.go @@ -0,0 +1,126 @@ +package subscribestatus + +/* + * SPDX-FileCopyrightText: 2023 Siemens AG + * + * SPDX-License-Identifier: Apache-2.0 + * + * Author: Michael Adler + */ + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "time" + + "github.com/Southclaws/fault" + "github.com/go-openapi/runtime" + "github.com/go-openapi/runtime/client" + "github.com/go-openapi/strfmt" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + "github.com/tmaxmax/go-sse" + + "github.com/siemens/wfx/cmd/wfxctl/errutil" + "github.com/siemens/wfx/cmd/wfxctl/flags" + generatedClient "github.com/siemens/wfx/generated/client" + "github.com/siemens/wfx/generated/client/jobs" + "github.com/siemens/wfx/generated/model" +) + +const ( + idFlag = "id" +) + +var validator = func(out io.Writer) sse.ResponseValidator { + return func(r *http.Response) error { + if r.StatusCode == http.StatusOK { + return nil + } + + if r.Body != nil { + defer r.Body.Close() + b, err := io.ReadAll(r.Body) + if err != nil { + return fault.Wrap(err) + } + + errResp := new(model.ErrorResponse) + if err := json.Unmarshal(b, errResp); err != nil { + return fault.Wrap(err) + } + if len(errResp.Errors) > 0 { + for _, msg := range errResp.Errors { + fmt.Fprintf(out, "ERROR: %s (code=%s, logref=%s)\n", msg.Message, msg.Code, msg.Logref) + } + } + } + return fmt.Errorf("received HTTP status code: %d", r.StatusCode) + } +} + +func init() { + f := Command.Flags() + f.String(idFlag, "", "job id") +} + +type SSETransport struct { + baseCmd *flags.BaseCmd + out io.Writer +} + +// Submit implements the runtime.ClientTransport interface. +func (t SSETransport) Submit(op *runtime.ClientOperation) (interface{}, error) { + cfg := t.baseCmd.CreateTransportConfig() + rt := client.New(cfg.Host, generatedClient.DefaultBasePath, cfg.Schemes) + req := errutil.Must(rt.CreateHttpRequest(op)) + + httpClient := errutil.Must(t.baseCmd.CreateHTTPClient()) + httpClient.Timeout = 0 + + client := sse.Client{ + HTTPClient: httpClient, + DefaultReconnectionTime: time.Second * 5, + ResponseValidator: validator(t.out), + } + + conn := client.NewConnection(req) + unsubscribe := conn.SubscribeMessages(func(event sse.Event) { + _, _ = os.Stdout.WriteString(event.Data) + os.Stdout.Write([]byte("\n")) + }) + defer unsubscribe() + + err := conn.Connect() + if err != nil { + return nil, fault.Wrap(err) + } + + return jobs.NewGetJobsIDStatusSubscribeOK(), nil +} + +var Command = &cobra.Command{ + Use: "subscribe-status", + Short: "Subscribe to job update events", + Example: ` +wfxctl job subscribe-status --id=1 +`, + TraverseChildren: true, + Run: func(cmd *cobra.Command, args []string) { + params := jobs.NewGetJobsIDStatusSubscribeParams(). + WithID(flags.Koanf.String(idFlag)) + if params.ID == "" { + log.Fatal().Msg("Job ID missing") + } + + baseCmd := flags.NewBaseCmd() + transport := SSETransport{baseCmd: &baseCmd, out: cmd.OutOrStderr()} + executor := generatedClient.New(transport, strfmt.Default) + if _, err := executor.Jobs.GetJobsIDStatusSubscribe(params); err != nil { + log.Fatal().Msg("Failed to subscribe to job status") + } + }, +} diff --git a/cmd/wfxctl/cmd/job/subscribestatus/subscribe_status_test.go b/cmd/wfxctl/cmd/job/subscribestatus/subscribe_status_test.go new file mode 100644 index 00000000..4465b325 --- /dev/null +++ b/cmd/wfxctl/cmd/job/subscribestatus/subscribe_status_test.go @@ -0,0 +1,99 @@ +package subscribestatus + +/* + * SPDX-FileCopyrightText: 2023 Siemens AG + * + * SPDX-License-Identifier: Apache-2.0 + * + * Author: Michael Adler + */ + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + + "github.com/siemens/wfx/api" + "github.com/siemens/wfx/cmd/wfxctl/errutil" + "github.com/siemens/wfx/cmd/wfxctl/flags" + "github.com/siemens/wfx/generated/model" + "github.com/stretchr/testify/assert" +) + +func TestSubscribeJobStatus(t *testing.T) { + const expectedPath = "/api/wfx/v1/jobs/1/status/subscribe" + var actualPath string + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + actualPath = r.URL.Path + + w.Header().Add("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`data: "hello world" + +`)) + })) + defer ts.Close() + + u, _ := url.Parse(ts.URL) + _ = flags.Koanf.Set(flags.ClientHostFlag, u.Hostname()) + port, _ := strconv.Atoi(u.Port()) + _ = flags.Koanf.Set(flags.ClientPortFlag, port) + + _ = flags.Koanf.Set(idFlag, "1") + + err := Command.Execute() + assert.NoError(t, err) + + assert.Equal(t, expectedPath, actualPath) +} + +func TestValidator_OK(t *testing.T) { + out := new(bytes.Buffer) + resp := http.Response{StatusCode: http.StatusOK} + err := validator(out)(&resp) + assert.Nil(t, err) +} + +func TestValidator_Error(t *testing.T) { + out := new(bytes.Buffer) + resp := http.Response{StatusCode: http.StatusInternalServerError} + err := validator(out)(&resp) + assert.NotNil(t, err) +} + +func TestValidator_BadRequest(t *testing.T) { + out := new(bytes.Buffer) + + errResp := model.ErrorResponse{ + Errors: []*model.Error{ + &api.JobTerminalState, + }, + } + + rec := httptest.NewRecorder() + rec.WriteHeader(http.StatusBadRequest) + _, _ = rec.Write(errutil.Must(json.Marshal(&errResp))) + + resp := rec.Result() + err := validator(out)(resp) + assert.NotNil(t, err) + + assert.Equal(t, "ERROR: The request was invalid because the job is in a terminal state (code=wfx.jobTerminalState, logref=916f0a913a3e4a52a96bd271e029c201)\n", out.String()) +} + +func TestValidator_BadRequestInvalidJson(t *testing.T) { + out := new(bytes.Buffer) + + rec := httptest.NewRecorder() + rec.WriteHeader(http.StatusBadRequest) + _, _ = rec.WriteString("data: foo") + + resp := rec.Result() + err := validator(out)(resp) + assert.NotNil(t, err) +} diff --git a/cmd/wfxctl/flags/basecmd.go b/cmd/wfxctl/flags/basecmd.go index cbe2a999..d47bcc28 100644 --- a/cmd/wfxctl/flags/basecmd.go +++ b/cmd/wfxctl/flags/basecmd.go @@ -87,6 +87,7 @@ func (b *BaseCmd) CreateHTTPClient() (*http.Client, error) { if err != nil { return nil, fault.Wrap(err) } + log.Info().Msg("Using unix-domain socket transport") return &http.Client{ Transport: &http.Transport{ Dial: func(_, _ string) (net.Conn, error) { @@ -125,6 +126,10 @@ func (b *BaseCmd) CreateHTTPClient() (*http.Client, error) { } func (b *BaseCmd) CreateClient() *client.WorkflowExecutor { + return client.NewHTTPClientWithConfig(strfmt.Default, b.CreateTransportConfig()) +} + +func (b *BaseCmd) CreateTransportConfig() *client.TransportConfig { var host string var schemes []string if b.EnableTLS { @@ -134,11 +139,9 @@ func (b *BaseCmd) CreateClient() *client.WorkflowExecutor { schemes = []string{"http"} host = fmt.Sprintf("%s:%d", b.Host, b.Port) } - - cfg := client.DefaultTransportConfig(). + return client.DefaultTransportConfig(). WithHost(host). WithSchemes(schemes) - return client.NewHTTPClientWithConfig(strfmt.Default, cfg) } func (b *BaseCmd) CreateMgmtClient() *client.WorkflowExecutor { diff --git a/docs/operations.md b/docs/operations.md index 2c8faf8f..e623a56b 100644 --- a/docs/operations.md +++ b/docs/operations.md @@ -28,6 +28,70 @@ Clients may inspect this specification at run-time so to obey the various limits For convenience, wfx includes a built-in Swagger UI accessible at runtime via , assuming default listening host and port [configuration](configuration.md). +### Job Status Notifications + +Job status notifications inform clients of changes in job status instantly, avoiding the need for repeated server polling. +This can be useful for UIs and applications requiring prompt status updates. + +The status notifications are built using [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) (SSE): + +```txt + ┌────────┐ ┌─────┐ + │ Client │ │ wfx │ + └────────┘ └─────┘ + | | + | GET /jobs/{id}/status/subscribe | + |-----------------------------------►| + | | + | | + | loop | + ┌───|────────────────────────────────────|───┐ + │ | [text/event-stream] | │ + │ | | │ + │ | send event | │ + │ |◄-----------------------------------| │ + │ | e.g. data: {"state":"INSTALLING"} | │ + │ | | │ + └───|────────────────────────────────────|───┘ + | | + ▼ ▼ + ┌────────┐ ┌─────┐ + │ Client │ │ wfx │ + └────────┘ └─────┘ +``` + +1. The client sends a `GET` request to the endpoint `/jobs/{id}/status/subscribe`. +2. The server sets the `Content-Type` set to `text/event-stream`. +3. The response body contains a stream of job status updates, with **the first event being the current job status**. + Each event is terminated by a pair of newline characters `\n\n` (as required by the SSE spec). + +**Example:** `wfxctl` provides a reference client implementation, e.g. + +```bash +# subscribe to a job with ID c28ef750-bdda-4d1c-a3fa-bb0ea1dca9d3 +wfxctl job subscribe-status --id c28ef750-bdda-4d1c-a3fa-bb0ea1dca9d3 +``` + +**Client Notes** + +1. Subscribing to a non-existent job yields a 404 Not Found error. +2. Subscribing to a job in a terminal state (a state without outgoing transitions) yields a 400 Bad Request error. +3. The connection closes when the job reaches a terminal state (after the last status update has been sent). +4. On issues (e.g., slow client), wfx closes connection. The client must reconnect if needed. + +**Caveats** + +1. Job status updates are dispatched asynchronously to prevent a subscriber from causing a delay in status updates. + Consequently, messages may be **delivered out-of-order**. This is typically not an issue; however, under + circumstances with high concurrency, such as multiple clients attempting to modify the same job or a single client + sending a burst of status updates, this behavior might occur. +2. Server-Sent Events use unidirectional communication, which means that **message delivery is not guaranteed**. In + other words, there are scenarios where certain events may never reach the subscriber. +3. When scaling out horizontally, the load balancer should consistently route job updates for a specific job ID to the same wfx instance. + Alternatively, the client can subscribe to updates from every wfx instance and subsequently aggregate the events it receives. +4. Browsers typically limit SSE connections (6 per domain). HTTP/2 can be used instead (100 connections by default) or some kind of + aggregation service could be used. + ### Response Filters wfx allows server-side response content filtering prior to sending the response to the client so to tailor it to client information needs. diff --git a/generated/client/jobs/get_jobs_id_status_subscribe_parameters.go b/generated/client/jobs/get_jobs_id_status_subscribe_parameters.go new file mode 100644 index 00000000..6bd56b91 --- /dev/null +++ b/generated/client/jobs/get_jobs_id_status_subscribe_parameters.go @@ -0,0 +1,156 @@ +// Code generated by go-swagger; DO NOT EDIT. + +// SPDX-FileCopyrightText: 2023 Siemens AG +// +// SPDX-License-Identifier: Apache-2.0 +// + +package jobs + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "context" + "net/http" + "time" + + "github.com/go-openapi/errors" + "github.com/go-openapi/runtime" + cr "github.com/go-openapi/runtime/client" + "github.com/go-openapi/strfmt" +) + +// NewGetJobsIDStatusSubscribeParams creates a new GetJobsIDStatusSubscribeParams object, +// with the default timeout for this client. +// +// Default values are not hydrated, since defaults are normally applied by the API server side. +// +// To enforce default values in parameter, use SetDefaults or WithDefaults. +func NewGetJobsIDStatusSubscribeParams() *GetJobsIDStatusSubscribeParams { + return &GetJobsIDStatusSubscribeParams{ + timeout: cr.DefaultTimeout, + } +} + +// NewGetJobsIDStatusSubscribeParamsWithTimeout creates a new GetJobsIDStatusSubscribeParams object +// with the ability to set a timeout on a request. +func NewGetJobsIDStatusSubscribeParamsWithTimeout(timeout time.Duration) *GetJobsIDStatusSubscribeParams { + return &GetJobsIDStatusSubscribeParams{ + timeout: timeout, + } +} + +// NewGetJobsIDStatusSubscribeParamsWithContext creates a new GetJobsIDStatusSubscribeParams object +// with the ability to set a context for a request. +func NewGetJobsIDStatusSubscribeParamsWithContext(ctx context.Context) *GetJobsIDStatusSubscribeParams { + return &GetJobsIDStatusSubscribeParams{ + Context: ctx, + } +} + +// NewGetJobsIDStatusSubscribeParamsWithHTTPClient creates a new GetJobsIDStatusSubscribeParams object +// with the ability to set a custom HTTPClient for a request. +func NewGetJobsIDStatusSubscribeParamsWithHTTPClient(client *http.Client) *GetJobsIDStatusSubscribeParams { + return &GetJobsIDStatusSubscribeParams{ + HTTPClient: client, + } +} + +/* +GetJobsIDStatusSubscribeParams contains all the parameters to send to the API endpoint + + for the get jobs ID status subscribe operation. + + Typically these are written to a http.Request. +*/ +type GetJobsIDStatusSubscribeParams struct { + + /* ID. + + Job ID + */ + ID string + + timeout time.Duration + Context context.Context + HTTPClient *http.Client +} + +// WithDefaults hydrates default values in the get jobs ID status subscribe params (not the query body). +// +// All values with no default are reset to their zero value. +func (o *GetJobsIDStatusSubscribeParams) WithDefaults() *GetJobsIDStatusSubscribeParams { + o.SetDefaults() + return o +} + +// SetDefaults hydrates default values in the get jobs ID status subscribe params (not the query body). +// +// All values with no default are reset to their zero value. +func (o *GetJobsIDStatusSubscribeParams) SetDefaults() { + // no default values defined for this parameter +} + +// WithTimeout adds the timeout to the get jobs ID status subscribe params +func (o *GetJobsIDStatusSubscribeParams) WithTimeout(timeout time.Duration) *GetJobsIDStatusSubscribeParams { + o.SetTimeout(timeout) + return o +} + +// SetTimeout adds the timeout to the get jobs ID status subscribe params +func (o *GetJobsIDStatusSubscribeParams) SetTimeout(timeout time.Duration) { + o.timeout = timeout +} + +// WithContext adds the context to the get jobs ID status subscribe params +func (o *GetJobsIDStatusSubscribeParams) WithContext(ctx context.Context) *GetJobsIDStatusSubscribeParams { + o.SetContext(ctx) + return o +} + +// SetContext adds the context to the get jobs ID status subscribe params +func (o *GetJobsIDStatusSubscribeParams) SetContext(ctx context.Context) { + o.Context = ctx +} + +// WithHTTPClient adds the HTTPClient to the get jobs ID status subscribe params +func (o *GetJobsIDStatusSubscribeParams) WithHTTPClient(client *http.Client) *GetJobsIDStatusSubscribeParams { + o.SetHTTPClient(client) + return o +} + +// SetHTTPClient adds the HTTPClient to the get jobs ID status subscribe params +func (o *GetJobsIDStatusSubscribeParams) SetHTTPClient(client *http.Client) { + o.HTTPClient = client +} + +// WithID adds the id to the get jobs ID status subscribe params +func (o *GetJobsIDStatusSubscribeParams) WithID(id string) *GetJobsIDStatusSubscribeParams { + o.SetID(id) + return o +} + +// SetID adds the id to the get jobs ID status subscribe params +func (o *GetJobsIDStatusSubscribeParams) SetID(id string) { + o.ID = id +} + +// WriteToRequest writes these params to a swagger request +func (o *GetJobsIDStatusSubscribeParams) WriteToRequest(r runtime.ClientRequest, reg strfmt.Registry) error { + + if err := r.SetTimeout(o.timeout); err != nil { + return err + } + var res []error + + // path param id + if err := r.SetPathParam("id", o.ID); err != nil { + return err + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} diff --git a/generated/client/jobs/get_jobs_id_status_subscribe_responses.go b/generated/client/jobs/get_jobs_id_status_subscribe_responses.go new file mode 100644 index 00000000..a3387718 --- /dev/null +++ b/generated/client/jobs/get_jobs_id_status_subscribe_responses.go @@ -0,0 +1,309 @@ +// Code generated by go-swagger; DO NOT EDIT. + +// SPDX-FileCopyrightText: 2023 Siemens AG +// +// SPDX-License-Identifier: Apache-2.0 +// + +package jobs + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "fmt" + "io" + + "github.com/go-openapi/runtime" + "github.com/go-openapi/strfmt" + "github.com/siemens/wfx/generated/model" +) + +// GetJobsIDStatusSubscribeReader is a Reader for the GetJobsIDStatusSubscribe structure. +type GetJobsIDStatusSubscribeReader struct { + formats strfmt.Registry +} + +// ReadResponse reads a server response into the received o. +func (o *GetJobsIDStatusSubscribeReader) ReadResponse(response runtime.ClientResponse, consumer runtime.Consumer) (interface{}, error) { + switch response.Code() { + case 200: + result := NewGetJobsIDStatusSubscribeOK() + if err := result.readResponse(response, consumer, o.formats); err != nil { + return nil, err + } + return result, nil + case 400: + result := NewGetJobsIDStatusSubscribeBadRequest() + if err := result.readResponse(response, consumer, o.formats); err != nil { + return nil, err + } + return nil, result + case 404: + result := NewGetJobsIDStatusSubscribeNotFound() + if err := result.readResponse(response, consumer, o.formats); err != nil { + return nil, err + } + return nil, result + default: + result := NewGetJobsIDStatusSubscribeDefault(response.Code()) + if err := result.readResponse(response, consumer, o.formats); err != nil { + return nil, err + } + if response.Code()/100 == 2 { + return result, nil + } + return nil, result + } +} + +// NewGetJobsIDStatusSubscribeOK creates a GetJobsIDStatusSubscribeOK with default headers values +func NewGetJobsIDStatusSubscribeOK() *GetJobsIDStatusSubscribeOK { + return &GetJobsIDStatusSubscribeOK{} +} + +/* +GetJobsIDStatusSubscribeOK describes a response with status code 200, with default header values. + +A stream of server-sent events +*/ +type GetJobsIDStatusSubscribeOK struct { +} + +// IsSuccess returns true when this get jobs Id status subscribe o k response has a 2xx status code +func (o *GetJobsIDStatusSubscribeOK) IsSuccess() bool { + return true +} + +// IsRedirect returns true when this get jobs Id status subscribe o k response has a 3xx status code +func (o *GetJobsIDStatusSubscribeOK) IsRedirect() bool { + return false +} + +// IsClientError returns true when this get jobs Id status subscribe o k response has a 4xx status code +func (o *GetJobsIDStatusSubscribeOK) IsClientError() bool { + return false +} + +// IsServerError returns true when this get jobs Id status subscribe o k response has a 5xx status code +func (o *GetJobsIDStatusSubscribeOK) IsServerError() bool { + return false +} + +// IsCode returns true when this get jobs Id status subscribe o k response a status code equal to that given +func (o *GetJobsIDStatusSubscribeOK) IsCode(code int) bool { + return code == 200 +} + +// Code gets the status code for the get jobs Id status subscribe o k response +func (o *GetJobsIDStatusSubscribeOK) Code() int { + return 200 +} + +func (o *GetJobsIDStatusSubscribeOK) Error() string { + return fmt.Sprintf("[GET /jobs/{id}/status/subscribe][%d] getJobsIdStatusSubscribeOK ", 200) +} + +func (o *GetJobsIDStatusSubscribeOK) String() string { + return fmt.Sprintf("[GET /jobs/{id}/status/subscribe][%d] getJobsIdStatusSubscribeOK ", 200) +} + +func (o *GetJobsIDStatusSubscribeOK) readResponse(response runtime.ClientResponse, consumer runtime.Consumer, formats strfmt.Registry) error { + + return nil +} + +// NewGetJobsIDStatusSubscribeBadRequest creates a GetJobsIDStatusSubscribeBadRequest with default headers values +func NewGetJobsIDStatusSubscribeBadRequest() *GetJobsIDStatusSubscribeBadRequest { + return &GetJobsIDStatusSubscribeBadRequest{} +} + +/* +GetJobsIDStatusSubscribeBadRequest describes a response with status code 400, with default header values. + +Bad Request +*/ +type GetJobsIDStatusSubscribeBadRequest struct { + Payload *model.ErrorResponse +} + +// IsSuccess returns true when this get jobs Id status subscribe bad request response has a 2xx status code +func (o *GetJobsIDStatusSubscribeBadRequest) IsSuccess() bool { + return false +} + +// IsRedirect returns true when this get jobs Id status subscribe bad request response has a 3xx status code +func (o *GetJobsIDStatusSubscribeBadRequest) IsRedirect() bool { + return false +} + +// IsClientError returns true when this get jobs Id status subscribe bad request response has a 4xx status code +func (o *GetJobsIDStatusSubscribeBadRequest) IsClientError() bool { + return true +} + +// IsServerError returns true when this get jobs Id status subscribe bad request response has a 5xx status code +func (o *GetJobsIDStatusSubscribeBadRequest) IsServerError() bool { + return false +} + +// IsCode returns true when this get jobs Id status subscribe bad request response a status code equal to that given +func (o *GetJobsIDStatusSubscribeBadRequest) IsCode(code int) bool { + return code == 400 +} + +// Code gets the status code for the get jobs Id status subscribe bad request response +func (o *GetJobsIDStatusSubscribeBadRequest) Code() int { + return 400 +} + +func (o *GetJobsIDStatusSubscribeBadRequest) Error() string { + return fmt.Sprintf("[GET /jobs/{id}/status/subscribe][%d] getJobsIdStatusSubscribeBadRequest %+v", 400, o.Payload) +} + +func (o *GetJobsIDStatusSubscribeBadRequest) String() string { + return fmt.Sprintf("[GET /jobs/{id}/status/subscribe][%d] getJobsIdStatusSubscribeBadRequest %+v", 400, o.Payload) +} + +func (o *GetJobsIDStatusSubscribeBadRequest) GetPayload() *model.ErrorResponse { + return o.Payload +} + +func (o *GetJobsIDStatusSubscribeBadRequest) readResponse(response runtime.ClientResponse, consumer runtime.Consumer, formats strfmt.Registry) error { + + o.Payload = new(model.ErrorResponse) + + // response payload + if err := consumer.Consume(response.Body(), o.Payload); err != nil && err != io.EOF { + return err + } + + return nil +} + +// NewGetJobsIDStatusSubscribeNotFound creates a GetJobsIDStatusSubscribeNotFound with default headers values +func NewGetJobsIDStatusSubscribeNotFound() *GetJobsIDStatusSubscribeNotFound { + return &GetJobsIDStatusSubscribeNotFound{} +} + +/* +GetJobsIDStatusSubscribeNotFound describes a response with status code 404, with default header values. + +Not Found +*/ +type GetJobsIDStatusSubscribeNotFound struct { + Payload *model.ErrorResponse +} + +// IsSuccess returns true when this get jobs Id status subscribe not found response has a 2xx status code +func (o *GetJobsIDStatusSubscribeNotFound) IsSuccess() bool { + return false +} + +// IsRedirect returns true when this get jobs Id status subscribe not found response has a 3xx status code +func (o *GetJobsIDStatusSubscribeNotFound) IsRedirect() bool { + return false +} + +// IsClientError returns true when this get jobs Id status subscribe not found response has a 4xx status code +func (o *GetJobsIDStatusSubscribeNotFound) IsClientError() bool { + return true +} + +// IsServerError returns true when this get jobs Id status subscribe not found response has a 5xx status code +func (o *GetJobsIDStatusSubscribeNotFound) IsServerError() bool { + return false +} + +// IsCode returns true when this get jobs Id status subscribe not found response a status code equal to that given +func (o *GetJobsIDStatusSubscribeNotFound) IsCode(code int) bool { + return code == 404 +} + +// Code gets the status code for the get jobs Id status subscribe not found response +func (o *GetJobsIDStatusSubscribeNotFound) Code() int { + return 404 +} + +func (o *GetJobsIDStatusSubscribeNotFound) Error() string { + return fmt.Sprintf("[GET /jobs/{id}/status/subscribe][%d] getJobsIdStatusSubscribeNotFound %+v", 404, o.Payload) +} + +func (o *GetJobsIDStatusSubscribeNotFound) String() string { + return fmt.Sprintf("[GET /jobs/{id}/status/subscribe][%d] getJobsIdStatusSubscribeNotFound %+v", 404, o.Payload) +} + +func (o *GetJobsIDStatusSubscribeNotFound) GetPayload() *model.ErrorResponse { + return o.Payload +} + +func (o *GetJobsIDStatusSubscribeNotFound) readResponse(response runtime.ClientResponse, consumer runtime.Consumer, formats strfmt.Registry) error { + + o.Payload = new(model.ErrorResponse) + + // response payload + if err := consumer.Consume(response.Body(), o.Payload); err != nil && err != io.EOF { + return err + } + + return nil +} + +// NewGetJobsIDStatusSubscribeDefault creates a GetJobsIDStatusSubscribeDefault with default headers values +func NewGetJobsIDStatusSubscribeDefault(code int) *GetJobsIDStatusSubscribeDefault { + return &GetJobsIDStatusSubscribeDefault{ + _statusCode: code, + } +} + +/* +GetJobsIDStatusSubscribeDefault describes a response with status code -1, with default header values. + +Other error with any status code and response body format. +*/ +type GetJobsIDStatusSubscribeDefault struct { + _statusCode int +} + +// IsSuccess returns true when this get jobs ID status subscribe default response has a 2xx status code +func (o *GetJobsIDStatusSubscribeDefault) IsSuccess() bool { + return o._statusCode/100 == 2 +} + +// IsRedirect returns true when this get jobs ID status subscribe default response has a 3xx status code +func (o *GetJobsIDStatusSubscribeDefault) IsRedirect() bool { + return o._statusCode/100 == 3 +} + +// IsClientError returns true when this get jobs ID status subscribe default response has a 4xx status code +func (o *GetJobsIDStatusSubscribeDefault) IsClientError() bool { + return o._statusCode/100 == 4 +} + +// IsServerError returns true when this get jobs ID status subscribe default response has a 5xx status code +func (o *GetJobsIDStatusSubscribeDefault) IsServerError() bool { + return o._statusCode/100 == 5 +} + +// IsCode returns true when this get jobs ID status subscribe default response a status code equal to that given +func (o *GetJobsIDStatusSubscribeDefault) IsCode(code int) bool { + return o._statusCode == code +} + +// Code gets the status code for the get jobs ID status subscribe default response +func (o *GetJobsIDStatusSubscribeDefault) Code() int { + return o._statusCode +} + +func (o *GetJobsIDStatusSubscribeDefault) Error() string { + return fmt.Sprintf("[GET /jobs/{id}/status/subscribe][%d] GetJobsIDStatusSubscribe default ", o._statusCode) +} + +func (o *GetJobsIDStatusSubscribeDefault) String() string { + return fmt.Sprintf("[GET /jobs/{id}/status/subscribe][%d] GetJobsIDStatusSubscribe default ", o._statusCode) +} + +func (o *GetJobsIDStatusSubscribeDefault) readResponse(response runtime.ClientResponse, consumer runtime.Consumer, formats strfmt.Registry) error { + + return nil +} diff --git a/generated/client/jobs/jobs_client.go b/generated/client/jobs/jobs_client.go index a0965a56..4392a285 100644 --- a/generated/client/jobs/jobs_client.go +++ b/generated/client/jobs/jobs_client.go @@ -45,6 +45,8 @@ type ClientService interface { GetJobsIDStatus(params *GetJobsIDStatusParams, opts ...ClientOption) (*GetJobsIDStatusOK, error) + GetJobsIDStatusSubscribe(params *GetJobsIDStatusSubscribeParams, opts ...ClientOption) (*GetJobsIDStatusSubscribeOK, error) + GetJobsIDTags(params *GetJobsIDTagsParams, opts ...ClientOption) (*GetJobsIDTagsOK, error) PostJobs(params *PostJobsParams, opts ...ClientOption) (*PostJobsCreated, error) @@ -297,6 +299,48 @@ func (a *Client) GetJobsIDStatus(params *GetJobsIDStatusParams, opts ...ClientOp return nil, runtime.NewAPIError("unexpected success response: content available as default response in error", unexpectedSuccess, unexpectedSuccess.Code()) } +/* + GetJobsIDStatusSubscribe subscribes to job status updates + + Obtain instant notifications when there is a change in the job status. This endpoint utilizes server-sent events (SSE), where responses are "chunked" with double newline breaks. For example, a single event might look like this: + data: {"clientId":"example_client","state":"INSTALLING"}\n\n + +Note: The first event is always the current job status, i.e. equivalent to calling GET on /jobs/{id}/status. +*/ +func (a *Client) GetJobsIDStatusSubscribe(params *GetJobsIDStatusSubscribeParams, opts ...ClientOption) (*GetJobsIDStatusSubscribeOK, error) { + // TODO: Validate the params before sending + if params == nil { + params = NewGetJobsIDStatusSubscribeParams() + } + op := &runtime.ClientOperation{ + ID: "GetJobsIDStatusSubscribe", + Method: "GET", + PathPattern: "/jobs/{id}/status/subscribe", + ProducesMediaTypes: []string{"application/json", "text/event-stream"}, + ConsumesMediaTypes: []string{"application/json"}, + Schemes: []string{"http"}, + Params: params, + Reader: &GetJobsIDStatusSubscribeReader{formats: a.formats}, + Context: params.Context, + Client: params.HTTPClient, + } + for _, opt := range opts { + opt(op) + } + + result, err := a.transport.Submit(op) + if err != nil { + return nil, err + } + success, ok := result.(*GetJobsIDStatusSubscribeOK) + if ok { + return success, nil + } + // unexpected success response + unexpectedSuccess := result.(*GetJobsIDStatusSubscribeDefault) + return nil, runtime.NewAPIError("unexpected success response: content available as default response in error", unexpectedSuccess, unexpectedSuccess.Code()) +} + /* GetJobsIDTags gets tags diff --git a/generated/northbound/restapi/configure_workflow_executor.go b/generated/northbound/restapi/configure_workflow_executor.go index fa1dc177..c10e7837 100644 --- a/generated/northbound/restapi/configure_workflow_executor.go +++ b/generated/northbound/restapi/configure_workflow_executor.go @@ -30,6 +30,8 @@ func ConfigureAPI(api *operations.WorkflowExecutorAPI) http.Handler { api.JSONProducer = producer.JSONProducer() + api.TextEventStreamProducer = producer.TextEventStreamProducer() + api.PreServerShutdown = func() {} return setupGlobalMiddleware(api.Serve(setupMiddlewares)) diff --git a/generated/northbound/restapi/doc.go b/generated/northbound/restapi/doc.go index ac633a29..5876cb2b 100644 --- a/generated/northbound/restapi/doc.go +++ b/generated/northbound/restapi/doc.go @@ -18,6 +18,7 @@ // // Produces: // - application/json +// - text/event-stream // // swagger:meta package restapi diff --git a/generated/northbound/restapi/embedded_spec.go b/generated/northbound/restapi/embedded_spec.go index d8378c55..10b7a1a9 100644 --- a/generated/northbound/restapi/embedded_spec.go +++ b/generated/northbound/restapi/embedded_spec.go @@ -536,6 +536,68 @@ func init() { } } }, + "/jobs/{id}/status/subscribe": { + "get": { + "description": "Obtain instant notifications when there is a change in the job status. This endpoint utilizes server-sent events (SSE), where responses are \"chunked\" with double newline breaks. For example, a single event might look like this:\n data: {\"clientId\":\"example_client\",\"state\":\"INSTALLING\"}\\n\\n\n\nNote: The first event is always the current job status, i.e. equivalent to calling GET on /jobs/{id}/status.\n", + "produces": [ + "application/json", + "text/event-stream" + ], + "tags": [ + "jobs", + "northbound", + "southbound" + ], + "summary": "Subscribe to job status updates", + "parameters": [ + { + "$ref": "#/parameters/jobId" + } + ], + "responses": { + "200": { + "description": "A stream of server-sent events" + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/ErrorResponse" + }, + "examples": { + "Error responses occurring at this operation for invalid requests": { + "errors": [ + { + "code": "wfx.jobTerminalState", + "logref": "916f0a913a3e4a52a96bd271e029c201", + "message": "The request was invalid because the job is in a terminal state" + } + ] + } + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/ErrorResponse" + }, + "examples": { + "Error responses occurring at this operation while updating a non-existent job": { + "errors": [ + { + "code": "wfx.jobNotFound", + "logref": "11cc67762090e15b79a1387eca65ba65", + "message": "Job ID was not found" + } + ] + } + } + }, + "default": { + "description": "Other error with any status code and response body format." + } + } + } + }, "/jobs/{id}/tags": { "get": { "description": "Get the tags of a job", @@ -1530,6 +1592,11 @@ func init() { "logref": "11cc67762090e15b79a1387eca65ba65", "message": "Job ID was not found" }, + "jobTerminalStateError": { + "code": "wfx.jobTerminalState", + "logref": "916f0a913a3e4a52a96bd271e029c201", + "message": "The request was invalid because the job is in a terminal state" + }, "workflowInvalidError": { "code": "wfx.workflowInvalid", "logref": "18f57adc70dd79c7fb4f1246be8a6e04", @@ -2129,6 +2196,72 @@ func init() { } } }, + "/jobs/{id}/status/subscribe": { + "get": { + "description": "Obtain instant notifications when there is a change in the job status. This endpoint utilizes server-sent events (SSE), where responses are \"chunked\" with double newline breaks. For example, a single event might look like this:\n data: {\"clientId\":\"example_client\",\"state\":\"INSTALLING\"}\\n\\n\n\nNote: The first event is always the current job status, i.e. equivalent to calling GET on /jobs/{id}/status.\n", + "produces": [ + "application/json", + "text/event-stream" + ], + "tags": [ + "jobs", + "northbound", + "southbound" + ], + "summary": "Subscribe to job status updates", + "parameters": [ + { + "type": "string", + "description": "Job ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "A stream of server-sent events" + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/ErrorResponse" + }, + "examples": { + "Error responses occurring at this operation for invalid requests": { + "errors": [ + { + "code": "wfx.jobTerminalState", + "logref": "916f0a913a3e4a52a96bd271e029c201", + "message": "The request was invalid because the job is in a terminal state" + } + ] + } + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/ErrorResponse" + }, + "examples": { + "Error responses occurring at this operation while updating a non-existent job": { + "errors": [ + { + "code": "wfx.jobNotFound", + "logref": "11cc67762090e15b79a1387eca65ba65", + "message": "Job ID was not found" + } + ] + } + } + }, + "default": { + "description": "Other error with any status code and response body format." + } + } + } + }, "/jobs/{id}/tags": { "get": { "description": "Get the tags of a job", @@ -3200,6 +3333,11 @@ func init() { "logref": "11cc67762090e15b79a1387eca65ba65", "message": "Job ID was not found" }, + "jobTerminalStateError": { + "code": "wfx.jobTerminalState", + "logref": "916f0a913a3e4a52a96bd271e029c201", + "message": "The request was invalid because the job is in a terminal state" + }, "workflowInvalidError": { "code": "wfx.workflowInvalid", "logref": "18f57adc70dd79c7fb4f1246be8a6e04", diff --git a/generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe.go b/generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe.go new file mode 100644 index 00000000..6886723f --- /dev/null +++ b/generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe.go @@ -0,0 +1,67 @@ +// Code generated by go-swagger; DO NOT EDIT. + +// SPDX-FileCopyrightText: 2023 Siemens AG +// +// SPDX-License-Identifier: Apache-2.0 +// + +package northbound + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the generate command + +import ( + "net/http" + + "github.com/go-openapi/runtime/middleware" +) + +// GetJobsIDStatusSubscribeHandlerFunc turns a function with the right signature into a get jobs ID status subscribe handler +type GetJobsIDStatusSubscribeHandlerFunc func(GetJobsIDStatusSubscribeParams) middleware.Responder + +// Handle executing the request and returning a response +func (fn GetJobsIDStatusSubscribeHandlerFunc) Handle(params GetJobsIDStatusSubscribeParams) middleware.Responder { + return fn(params) +} + +// GetJobsIDStatusSubscribeHandler interface for that can handle valid get jobs ID status subscribe params +type GetJobsIDStatusSubscribeHandler interface { + Handle(GetJobsIDStatusSubscribeParams) middleware.Responder +} + +// NewGetJobsIDStatusSubscribe creates a new http.Handler for the get jobs ID status subscribe operation +func NewGetJobsIDStatusSubscribe(ctx *middleware.Context, handler GetJobsIDStatusSubscribeHandler) *GetJobsIDStatusSubscribe { + return &GetJobsIDStatusSubscribe{Context: ctx, Handler: handler} +} + +/* + GetJobsIDStatusSubscribe swagger:route GET /jobs/{id}/status/subscribe northbound getJobsIdStatusSubscribe + +# Subscribe to job status updates + +Obtain instant notifications when there is a change in the job status. This endpoint utilizes server-sent events (SSE), where responses are "chunked" with double newline breaks. For example, a single event might look like this: + + data: {"clientId":"example_client","state":"INSTALLING"}\n\n + +Note: The first event is always the current job status, i.e. equivalent to calling GET on /jobs/{id}/status. +*/ +type GetJobsIDStatusSubscribe struct { + Context *middleware.Context + Handler GetJobsIDStatusSubscribeHandler +} + +func (o *GetJobsIDStatusSubscribe) ServeHTTP(rw http.ResponseWriter, r *http.Request) { + route, rCtx, _ := o.Context.RouteInfo(r) + if rCtx != nil { + *r = *rCtx + } + var Params = NewGetJobsIDStatusSubscribeParams() + if err := o.Context.BindValidRequest(r, route, &Params); err != nil { // bind params + o.Context.Respond(rw, r, route.Produces, route, err) + return + } + + res := o.Handler.Handle(Params) // actually handle the request + o.Context.Respond(rw, r, route.Produces, route, res) + +} diff --git a/generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe_parameters.go b/generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe_parameters.go new file mode 100644 index 00000000..fe762c90 --- /dev/null +++ b/generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe_parameters.go @@ -0,0 +1,76 @@ +// Code generated by go-swagger; DO NOT EDIT. + +// SPDX-FileCopyrightText: 2023 Siemens AG +// +// SPDX-License-Identifier: Apache-2.0 +// + +package northbound + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "net/http" + + "github.com/go-openapi/errors" + "github.com/go-openapi/runtime/middleware" + "github.com/go-openapi/strfmt" +) + +// NewGetJobsIDStatusSubscribeParams creates a new GetJobsIDStatusSubscribeParams object +// +// There are no default values defined in the spec. +func NewGetJobsIDStatusSubscribeParams() GetJobsIDStatusSubscribeParams { + + return GetJobsIDStatusSubscribeParams{} +} + +// GetJobsIDStatusSubscribeParams contains all the bound params for the get jobs ID status subscribe operation +// typically these are obtained from a http.Request +// +// swagger:parameters GetJobsIDStatusSubscribe +type GetJobsIDStatusSubscribeParams struct { + + // HTTP Request Object + HTTPRequest *http.Request `json:"-"` + + /*Job ID + Required: true + In: path + */ + ID string +} + +// BindRequest both binds and validates a request, it assumes that complex things implement a Validatable(strfmt.Registry) error interface +// for simple values it will use straight method calls. +// +// To ensure default values, the struct must have been initialized with NewGetJobsIDStatusSubscribeParams() beforehand. +func (o *GetJobsIDStatusSubscribeParams) BindRequest(r *http.Request, route *middleware.MatchedRoute) error { + var res []error + + o.HTTPRequest = r + + rID, rhkID, _ := route.Params.GetOK("id") + if err := o.bindID(rID, rhkID, route.Formats); err != nil { + res = append(res, err) + } + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +// bindID binds and validates parameter ID from path. +func (o *GetJobsIDStatusSubscribeParams) bindID(rawData []string, hasKey bool, formats strfmt.Registry) error { + var raw string + if len(rawData) > 0 { + raw = rawData[len(rawData)-1] + } + + // Required: true + // Parameter is provided by construction from the route + o.ID = raw + + return nil +} diff --git a/generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe_responses.go b/generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe_responses.go new file mode 100644 index 00000000..11d8c63f --- /dev/null +++ b/generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe_responses.go @@ -0,0 +1,172 @@ +// Code generated by go-swagger; DO NOT EDIT. + +// SPDX-FileCopyrightText: 2023 Siemens AG +// +// SPDX-License-Identifier: Apache-2.0 +// + +package northbound + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "net/http" + + "github.com/go-openapi/runtime" + "github.com/siemens/wfx/generated/model" +) + +// GetJobsIDStatusSubscribeOKCode is the HTTP code returned for type GetJobsIDStatusSubscribeOK +const GetJobsIDStatusSubscribeOKCode int = 200 + +/* +GetJobsIDStatusSubscribeOK A stream of server-sent events + +swagger:response getJobsIdStatusSubscribeOK +*/ +type GetJobsIDStatusSubscribeOK struct { +} + +// NewGetJobsIDStatusSubscribeOK creates GetJobsIDStatusSubscribeOK with default headers values +func NewGetJobsIDStatusSubscribeOK() *GetJobsIDStatusSubscribeOK { + + return &GetJobsIDStatusSubscribeOK{} +} + +// WriteResponse to the client +func (o *GetJobsIDStatusSubscribeOK) WriteResponse(rw http.ResponseWriter, producer runtime.Producer) { + + rw.Header().Del(runtime.HeaderContentType) //Remove Content-Type on empty responses + + rw.WriteHeader(200) +} + +// GetJobsIDStatusSubscribeBadRequestCode is the HTTP code returned for type GetJobsIDStatusSubscribeBadRequest +const GetJobsIDStatusSubscribeBadRequestCode int = 400 + +/* +GetJobsIDStatusSubscribeBadRequest Bad Request + +swagger:response getJobsIdStatusSubscribeBadRequest +*/ +type GetJobsIDStatusSubscribeBadRequest struct { + + /* + In: Body + */ + Payload *model.ErrorResponse `json:"body,omitempty"` +} + +// NewGetJobsIDStatusSubscribeBadRequest creates GetJobsIDStatusSubscribeBadRequest with default headers values +func NewGetJobsIDStatusSubscribeBadRequest() *GetJobsIDStatusSubscribeBadRequest { + + return &GetJobsIDStatusSubscribeBadRequest{} +} + +// WithPayload adds the payload to the get jobs Id status subscribe bad request response +func (o *GetJobsIDStatusSubscribeBadRequest) WithPayload(payload *model.ErrorResponse) *GetJobsIDStatusSubscribeBadRequest { + o.Payload = payload + return o +} + +// SetPayload sets the payload to the get jobs Id status subscribe bad request response +func (o *GetJobsIDStatusSubscribeBadRequest) SetPayload(payload *model.ErrorResponse) { + o.Payload = payload +} + +// WriteResponse to the client +func (o *GetJobsIDStatusSubscribeBadRequest) WriteResponse(rw http.ResponseWriter, producer runtime.Producer) { + + rw.WriteHeader(400) + if o.Payload != nil { + payload := o.Payload + if err := producer.Produce(rw, payload); err != nil { + panic(err) // let the recovery middleware deal with this + } + } +} + +// GetJobsIDStatusSubscribeNotFoundCode is the HTTP code returned for type GetJobsIDStatusSubscribeNotFound +const GetJobsIDStatusSubscribeNotFoundCode int = 404 + +/* +GetJobsIDStatusSubscribeNotFound Not Found + +swagger:response getJobsIdStatusSubscribeNotFound +*/ +type GetJobsIDStatusSubscribeNotFound struct { + + /* + In: Body + */ + Payload *model.ErrorResponse `json:"body,omitempty"` +} + +// NewGetJobsIDStatusSubscribeNotFound creates GetJobsIDStatusSubscribeNotFound with default headers values +func NewGetJobsIDStatusSubscribeNotFound() *GetJobsIDStatusSubscribeNotFound { + + return &GetJobsIDStatusSubscribeNotFound{} +} + +// WithPayload adds the payload to the get jobs Id status subscribe not found response +func (o *GetJobsIDStatusSubscribeNotFound) WithPayload(payload *model.ErrorResponse) *GetJobsIDStatusSubscribeNotFound { + o.Payload = payload + return o +} + +// SetPayload sets the payload to the get jobs Id status subscribe not found response +func (o *GetJobsIDStatusSubscribeNotFound) SetPayload(payload *model.ErrorResponse) { + o.Payload = payload +} + +// WriteResponse to the client +func (o *GetJobsIDStatusSubscribeNotFound) WriteResponse(rw http.ResponseWriter, producer runtime.Producer) { + + rw.WriteHeader(404) + if o.Payload != nil { + payload := o.Payload + if err := producer.Produce(rw, payload); err != nil { + panic(err) // let the recovery middleware deal with this + } + } +} + +/* +GetJobsIDStatusSubscribeDefault Other error with any status code and response body format. + +swagger:response getJobsIdStatusSubscribeDefault +*/ +type GetJobsIDStatusSubscribeDefault struct { + _statusCode int +} + +// NewGetJobsIDStatusSubscribeDefault creates GetJobsIDStatusSubscribeDefault with default headers values +func NewGetJobsIDStatusSubscribeDefault(code int) *GetJobsIDStatusSubscribeDefault { + if code <= 0 { + code = 500 + } + + return &GetJobsIDStatusSubscribeDefault{ + _statusCode: code, + } +} + +// WithStatusCode adds the status to the get jobs ID status subscribe default response +func (o *GetJobsIDStatusSubscribeDefault) WithStatusCode(code int) *GetJobsIDStatusSubscribeDefault { + o._statusCode = code + return o +} + +// SetStatusCode sets the status to the get jobs ID status subscribe default response +func (o *GetJobsIDStatusSubscribeDefault) SetStatusCode(code int) { + o._statusCode = code +} + +// WriteResponse to the client +func (o *GetJobsIDStatusSubscribeDefault) WriteResponse(rw http.ResponseWriter, producer runtime.Producer) { + + rw.Header().Del(runtime.HeaderContentType) //Remove Content-Type on empty responses + + rw.WriteHeader(o._statusCode) +} diff --git a/generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe_urlbuilder.go b/generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe_urlbuilder.go new file mode 100644 index 00000000..432f5a00 --- /dev/null +++ b/generated/northbound/restapi/operations/northbound/get_jobs_id_status_subscribe_urlbuilder.go @@ -0,0 +1,104 @@ +// Code generated by go-swagger; DO NOT EDIT. + +// SPDX-FileCopyrightText: 2023 Siemens AG +// +// SPDX-License-Identifier: Apache-2.0 +// + +package northbound + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the generate command + +import ( + "errors" + "net/url" + golangswaggerpaths "path" + "strings" +) + +// GetJobsIDStatusSubscribeURL generates an URL for the get jobs ID status subscribe operation +type GetJobsIDStatusSubscribeURL struct { + ID string + + _basePath string + // avoid unkeyed usage + _ struct{} +} + +// WithBasePath sets the base path for this url builder, only required when it's different from the +// base path specified in the swagger spec. +// When the value of the base path is an empty string +func (o *GetJobsIDStatusSubscribeURL) WithBasePath(bp string) *GetJobsIDStatusSubscribeURL { + o.SetBasePath(bp) + return o +} + +// SetBasePath sets the base path for this url builder, only required when it's different from the +// base path specified in the swagger spec. +// When the value of the base path is an empty string +func (o *GetJobsIDStatusSubscribeURL) SetBasePath(bp string) { + o._basePath = bp +} + +// Build a url path and query string +func (o *GetJobsIDStatusSubscribeURL) Build() (*url.URL, error) { + var _result url.URL + + var _path = "/jobs/{id}/status/subscribe" + + id := o.ID + if id != "" { + _path = strings.Replace(_path, "{id}", id, -1) + } else { + return nil, errors.New("id is required on GetJobsIDStatusSubscribeURL") + } + + _basePath := o._basePath + if _basePath == "" { + _basePath = "/api/wfx/v1" + } + _result.Path = golangswaggerpaths.Join(_basePath, _path) + + return &_result, nil +} + +// Must is a helper function to panic when the url builder returns an error +func (o *GetJobsIDStatusSubscribeURL) Must(u *url.URL, err error) *url.URL { + if err != nil { + panic(err) + } + if u == nil { + panic("url can't be nil") + } + return u +} + +// String returns the string representation of the path with query string +func (o *GetJobsIDStatusSubscribeURL) String() string { + return o.Must(o.Build()).String() +} + +// BuildFull builds a full url with scheme, host, path and query string +func (o *GetJobsIDStatusSubscribeURL) BuildFull(scheme, host string) (*url.URL, error) { + if scheme == "" { + return nil, errors.New("scheme is required for a full url on GetJobsIDStatusSubscribeURL") + } + if host == "" { + return nil, errors.New("host is required for a full url on GetJobsIDStatusSubscribeURL") + } + + base, err := o.Build() + if err != nil { + return nil, err + } + + base.Scheme = scheme + base.Host = host + return base, nil +} + +// StringFull returns the string representation of a complete url +func (o *GetJobsIDStatusSubscribeURL) StringFull(scheme, host string) string { + return o.Must(o.BuildFull(scheme, host)).String() +} diff --git a/generated/northbound/restapi/operations/workflow_executor_api.go b/generated/northbound/restapi/operations/workflow_executor_api.go index d3b5e11a..a6055537 100644 --- a/generated/northbound/restapi/operations/workflow_executor_api.go +++ b/generated/northbound/restapi/operations/workflow_executor_api.go @@ -12,6 +12,7 @@ package operations import ( "fmt" + "io" "net/http" "strings" @@ -48,6 +49,9 @@ func NewWorkflowExecutorAPI(spec *loads.Document) *WorkflowExecutorAPI { JSONConsumer: runtime.JSONConsumer(), JSONProducer: runtime.JSONProducer(), + TextEventStreamProducer: runtime.ProducerFunc(func(w io.Writer, data interface{}) error { + return errors.NotImplemented("textEventStream producer has not yet been implemented") + }), NorthboundDeleteJobsIDHandler: northbound.DeleteJobsIDHandlerFunc(func(params northbound.DeleteJobsIDParams) middleware.Responder { return middleware.NotImplemented("operation northbound.DeleteJobsID has not yet been implemented") @@ -70,6 +74,9 @@ func NewWorkflowExecutorAPI(spec *loads.Document) *WorkflowExecutorAPI { NorthboundGetJobsIDStatusHandler: northbound.GetJobsIDStatusHandlerFunc(func(params northbound.GetJobsIDStatusParams) middleware.Responder { return middleware.NotImplemented("operation northbound.GetJobsIDStatus has not yet been implemented") }), + NorthboundGetJobsIDStatusSubscribeHandler: northbound.GetJobsIDStatusSubscribeHandlerFunc(func(params northbound.GetJobsIDStatusSubscribeParams) middleware.Responder { + return middleware.NotImplemented("operation northbound.GetJobsIDStatusSubscribe has not yet been implemented") + }), NorthboundGetJobsIDTagsHandler: northbound.GetJobsIDTagsHandlerFunc(func(params northbound.GetJobsIDTagsParams) middleware.Responder { return middleware.NotImplemented("operation northbound.GetJobsIDTags has not yet been implemented") }), @@ -129,6 +136,9 @@ type WorkflowExecutorAPI struct { // JSONProducer registers a producer for the following mime types: // - application/json JSONProducer runtime.Producer + // TextEventStreamProducer registers a producer for the following mime types: + // - text/event-stream + TextEventStreamProducer runtime.Producer // NorthboundDeleteJobsIDHandler sets the operation handler for the delete jobs ID operation NorthboundDeleteJobsIDHandler northbound.DeleteJobsIDHandler @@ -144,6 +154,8 @@ type WorkflowExecutorAPI struct { NorthboundGetJobsIDDefinitionHandler northbound.GetJobsIDDefinitionHandler // NorthboundGetJobsIDStatusHandler sets the operation handler for the get jobs ID status operation NorthboundGetJobsIDStatusHandler northbound.GetJobsIDStatusHandler + // NorthboundGetJobsIDStatusSubscribeHandler sets the operation handler for the get jobs ID status subscribe operation + NorthboundGetJobsIDStatusSubscribeHandler northbound.GetJobsIDStatusSubscribeHandler // NorthboundGetJobsIDTagsHandler sets the operation handler for the get jobs ID tags operation NorthboundGetJobsIDTagsHandler northbound.GetJobsIDTagsHandler // NorthboundGetWorkflowsHandler sets the operation handler for the get workflows operation @@ -236,6 +248,9 @@ func (o *WorkflowExecutorAPI) Validate() error { if o.JSONProducer == nil { unregistered = append(unregistered, "JSONProducer") } + if o.TextEventStreamProducer == nil { + unregistered = append(unregistered, "TextEventStreamProducer") + } if o.NorthboundDeleteJobsIDHandler == nil { unregistered = append(unregistered, "northbound.DeleteJobsIDHandler") @@ -258,6 +273,9 @@ func (o *WorkflowExecutorAPI) Validate() error { if o.NorthboundGetJobsIDStatusHandler == nil { unregistered = append(unregistered, "northbound.GetJobsIDStatusHandler") } + if o.NorthboundGetJobsIDStatusSubscribeHandler == nil { + unregistered = append(unregistered, "northbound.GetJobsIDStatusSubscribeHandler") + } if o.NorthboundGetJobsIDTagsHandler == nil { unregistered = append(unregistered, "northbound.GetJobsIDTagsHandler") } @@ -330,6 +348,8 @@ func (o *WorkflowExecutorAPI) ProducersFor(mediaTypes []string) map[string]runti switch mt { case "application/json": result["application/json"] = o.JSONProducer + case "text/event-stream": + result["text/event-stream"] = o.TextEventStreamProducer } if p, ok := o.customProducers[mt]; ok { @@ -401,6 +421,10 @@ func (o *WorkflowExecutorAPI) initHandlerCache() { if o.handlers["GET"] == nil { o.handlers["GET"] = make(map[string]http.Handler) } + o.handlers["GET"]["/jobs/{id}/status/subscribe"] = northbound.NewGetJobsIDStatusSubscribe(o.context, o.NorthboundGetJobsIDStatusSubscribeHandler) + if o.handlers["GET"] == nil { + o.handlers["GET"] = make(map[string]http.Handler) + } o.handlers["GET"]["/jobs/{id}/tags"] = northbound.NewGetJobsIDTags(o.context, o.NorthboundGetJobsIDTagsHandler) if o.handlers["GET"] == nil { o.handlers["GET"] = make(map[string]http.Handler) diff --git a/generated/southbound/restapi/configure_workflow_executor.go b/generated/southbound/restapi/configure_workflow_executor.go index e86c8f38..13570198 100644 --- a/generated/southbound/restapi/configure_workflow_executor.go +++ b/generated/southbound/restapi/configure_workflow_executor.go @@ -30,6 +30,8 @@ func ConfigureAPI(api *operations.WorkflowExecutorAPI) http.Handler { api.JSONProducer = producer.JSONProducer() + api.TextEventStreamProducer = producer.TextEventStreamProducer() + api.PreServerShutdown = func() {} return setupGlobalMiddleware(api.Serve(setupMiddlewares)) diff --git a/generated/southbound/restapi/doc.go b/generated/southbound/restapi/doc.go index ac633a29..5876cb2b 100644 --- a/generated/southbound/restapi/doc.go +++ b/generated/southbound/restapi/doc.go @@ -18,6 +18,7 @@ // // Produces: // - application/json +// - text/event-stream // // swagger:meta package restapi diff --git a/generated/southbound/restapi/embedded_spec.go b/generated/southbound/restapi/embedded_spec.go index d8378c55..10b7a1a9 100644 --- a/generated/southbound/restapi/embedded_spec.go +++ b/generated/southbound/restapi/embedded_spec.go @@ -536,6 +536,68 @@ func init() { } } }, + "/jobs/{id}/status/subscribe": { + "get": { + "description": "Obtain instant notifications when there is a change in the job status. This endpoint utilizes server-sent events (SSE), where responses are \"chunked\" with double newline breaks. For example, a single event might look like this:\n data: {\"clientId\":\"example_client\",\"state\":\"INSTALLING\"}\\n\\n\n\nNote: The first event is always the current job status, i.e. equivalent to calling GET on /jobs/{id}/status.\n", + "produces": [ + "application/json", + "text/event-stream" + ], + "tags": [ + "jobs", + "northbound", + "southbound" + ], + "summary": "Subscribe to job status updates", + "parameters": [ + { + "$ref": "#/parameters/jobId" + } + ], + "responses": { + "200": { + "description": "A stream of server-sent events" + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/ErrorResponse" + }, + "examples": { + "Error responses occurring at this operation for invalid requests": { + "errors": [ + { + "code": "wfx.jobTerminalState", + "logref": "916f0a913a3e4a52a96bd271e029c201", + "message": "The request was invalid because the job is in a terminal state" + } + ] + } + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/ErrorResponse" + }, + "examples": { + "Error responses occurring at this operation while updating a non-existent job": { + "errors": [ + { + "code": "wfx.jobNotFound", + "logref": "11cc67762090e15b79a1387eca65ba65", + "message": "Job ID was not found" + } + ] + } + } + }, + "default": { + "description": "Other error with any status code and response body format." + } + } + } + }, "/jobs/{id}/tags": { "get": { "description": "Get the tags of a job", @@ -1530,6 +1592,11 @@ func init() { "logref": "11cc67762090e15b79a1387eca65ba65", "message": "Job ID was not found" }, + "jobTerminalStateError": { + "code": "wfx.jobTerminalState", + "logref": "916f0a913a3e4a52a96bd271e029c201", + "message": "The request was invalid because the job is in a terminal state" + }, "workflowInvalidError": { "code": "wfx.workflowInvalid", "logref": "18f57adc70dd79c7fb4f1246be8a6e04", @@ -2129,6 +2196,72 @@ func init() { } } }, + "/jobs/{id}/status/subscribe": { + "get": { + "description": "Obtain instant notifications when there is a change in the job status. This endpoint utilizes server-sent events (SSE), where responses are \"chunked\" with double newline breaks. For example, a single event might look like this:\n data: {\"clientId\":\"example_client\",\"state\":\"INSTALLING\"}\\n\\n\n\nNote: The first event is always the current job status, i.e. equivalent to calling GET on /jobs/{id}/status.\n", + "produces": [ + "application/json", + "text/event-stream" + ], + "tags": [ + "jobs", + "northbound", + "southbound" + ], + "summary": "Subscribe to job status updates", + "parameters": [ + { + "type": "string", + "description": "Job ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "A stream of server-sent events" + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/ErrorResponse" + }, + "examples": { + "Error responses occurring at this operation for invalid requests": { + "errors": [ + { + "code": "wfx.jobTerminalState", + "logref": "916f0a913a3e4a52a96bd271e029c201", + "message": "The request was invalid because the job is in a terminal state" + } + ] + } + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/ErrorResponse" + }, + "examples": { + "Error responses occurring at this operation while updating a non-existent job": { + "errors": [ + { + "code": "wfx.jobNotFound", + "logref": "11cc67762090e15b79a1387eca65ba65", + "message": "Job ID was not found" + } + ] + } + } + }, + "default": { + "description": "Other error with any status code and response body format." + } + } + } + }, "/jobs/{id}/tags": { "get": { "description": "Get the tags of a job", @@ -3200,6 +3333,11 @@ func init() { "logref": "11cc67762090e15b79a1387eca65ba65", "message": "Job ID was not found" }, + "jobTerminalStateError": { + "code": "wfx.jobTerminalState", + "logref": "916f0a913a3e4a52a96bd271e029c201", + "message": "The request was invalid because the job is in a terminal state" + }, "workflowInvalidError": { "code": "wfx.workflowInvalid", "logref": "18f57adc70dd79c7fb4f1246be8a6e04", diff --git a/generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe.go b/generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe.go new file mode 100644 index 00000000..202dd381 --- /dev/null +++ b/generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe.go @@ -0,0 +1,67 @@ +// Code generated by go-swagger; DO NOT EDIT. + +// SPDX-FileCopyrightText: 2023 Siemens AG +// +// SPDX-License-Identifier: Apache-2.0 +// + +package southbound + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the generate command + +import ( + "net/http" + + "github.com/go-openapi/runtime/middleware" +) + +// GetJobsIDStatusSubscribeHandlerFunc turns a function with the right signature into a get jobs ID status subscribe handler +type GetJobsIDStatusSubscribeHandlerFunc func(GetJobsIDStatusSubscribeParams) middleware.Responder + +// Handle executing the request and returning a response +func (fn GetJobsIDStatusSubscribeHandlerFunc) Handle(params GetJobsIDStatusSubscribeParams) middleware.Responder { + return fn(params) +} + +// GetJobsIDStatusSubscribeHandler interface for that can handle valid get jobs ID status subscribe params +type GetJobsIDStatusSubscribeHandler interface { + Handle(GetJobsIDStatusSubscribeParams) middleware.Responder +} + +// NewGetJobsIDStatusSubscribe creates a new http.Handler for the get jobs ID status subscribe operation +func NewGetJobsIDStatusSubscribe(ctx *middleware.Context, handler GetJobsIDStatusSubscribeHandler) *GetJobsIDStatusSubscribe { + return &GetJobsIDStatusSubscribe{Context: ctx, Handler: handler} +} + +/* + GetJobsIDStatusSubscribe swagger:route GET /jobs/{id}/status/subscribe southbound getJobsIdStatusSubscribe + +# Subscribe to job status updates + +Obtain instant notifications when there is a change in the job status. This endpoint utilizes server-sent events (SSE), where responses are "chunked" with double newline breaks. For example, a single event might look like this: + + data: {"clientId":"example_client","state":"INSTALLING"}\n\n + +Note: The first event is always the current job status, i.e. equivalent to calling GET on /jobs/{id}/status. +*/ +type GetJobsIDStatusSubscribe struct { + Context *middleware.Context + Handler GetJobsIDStatusSubscribeHandler +} + +func (o *GetJobsIDStatusSubscribe) ServeHTTP(rw http.ResponseWriter, r *http.Request) { + route, rCtx, _ := o.Context.RouteInfo(r) + if rCtx != nil { + *r = *rCtx + } + var Params = NewGetJobsIDStatusSubscribeParams() + if err := o.Context.BindValidRequest(r, route, &Params); err != nil { // bind params + o.Context.Respond(rw, r, route.Produces, route, err) + return + } + + res := o.Handler.Handle(Params) // actually handle the request + o.Context.Respond(rw, r, route.Produces, route, res) + +} diff --git a/generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe_parameters.go b/generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe_parameters.go new file mode 100644 index 00000000..06f02d22 --- /dev/null +++ b/generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe_parameters.go @@ -0,0 +1,76 @@ +// Code generated by go-swagger; DO NOT EDIT. + +// SPDX-FileCopyrightText: 2023 Siemens AG +// +// SPDX-License-Identifier: Apache-2.0 +// + +package southbound + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "net/http" + + "github.com/go-openapi/errors" + "github.com/go-openapi/runtime/middleware" + "github.com/go-openapi/strfmt" +) + +// NewGetJobsIDStatusSubscribeParams creates a new GetJobsIDStatusSubscribeParams object +// +// There are no default values defined in the spec. +func NewGetJobsIDStatusSubscribeParams() GetJobsIDStatusSubscribeParams { + + return GetJobsIDStatusSubscribeParams{} +} + +// GetJobsIDStatusSubscribeParams contains all the bound params for the get jobs ID status subscribe operation +// typically these are obtained from a http.Request +// +// swagger:parameters GetJobsIDStatusSubscribe +type GetJobsIDStatusSubscribeParams struct { + + // HTTP Request Object + HTTPRequest *http.Request `json:"-"` + + /*Job ID + Required: true + In: path + */ + ID string +} + +// BindRequest both binds and validates a request, it assumes that complex things implement a Validatable(strfmt.Registry) error interface +// for simple values it will use straight method calls. +// +// To ensure default values, the struct must have been initialized with NewGetJobsIDStatusSubscribeParams() beforehand. +func (o *GetJobsIDStatusSubscribeParams) BindRequest(r *http.Request, route *middleware.MatchedRoute) error { + var res []error + + o.HTTPRequest = r + + rID, rhkID, _ := route.Params.GetOK("id") + if err := o.bindID(rID, rhkID, route.Formats); err != nil { + res = append(res, err) + } + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +// bindID binds and validates parameter ID from path. +func (o *GetJobsIDStatusSubscribeParams) bindID(rawData []string, hasKey bool, formats strfmt.Registry) error { + var raw string + if len(rawData) > 0 { + raw = rawData[len(rawData)-1] + } + + // Required: true + // Parameter is provided by construction from the route + o.ID = raw + + return nil +} diff --git a/generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe_responses.go b/generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe_responses.go new file mode 100644 index 00000000..c02f6300 --- /dev/null +++ b/generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe_responses.go @@ -0,0 +1,172 @@ +// Code generated by go-swagger; DO NOT EDIT. + +// SPDX-FileCopyrightText: 2023 Siemens AG +// +// SPDX-License-Identifier: Apache-2.0 +// + +package southbound + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "net/http" + + "github.com/go-openapi/runtime" + "github.com/siemens/wfx/generated/model" +) + +// GetJobsIDStatusSubscribeOKCode is the HTTP code returned for type GetJobsIDStatusSubscribeOK +const GetJobsIDStatusSubscribeOKCode int = 200 + +/* +GetJobsIDStatusSubscribeOK A stream of server-sent events + +swagger:response getJobsIdStatusSubscribeOK +*/ +type GetJobsIDStatusSubscribeOK struct { +} + +// NewGetJobsIDStatusSubscribeOK creates GetJobsIDStatusSubscribeOK with default headers values +func NewGetJobsIDStatusSubscribeOK() *GetJobsIDStatusSubscribeOK { + + return &GetJobsIDStatusSubscribeOK{} +} + +// WriteResponse to the client +func (o *GetJobsIDStatusSubscribeOK) WriteResponse(rw http.ResponseWriter, producer runtime.Producer) { + + rw.Header().Del(runtime.HeaderContentType) //Remove Content-Type on empty responses + + rw.WriteHeader(200) +} + +// GetJobsIDStatusSubscribeBadRequestCode is the HTTP code returned for type GetJobsIDStatusSubscribeBadRequest +const GetJobsIDStatusSubscribeBadRequestCode int = 400 + +/* +GetJobsIDStatusSubscribeBadRequest Bad Request + +swagger:response getJobsIdStatusSubscribeBadRequest +*/ +type GetJobsIDStatusSubscribeBadRequest struct { + + /* + In: Body + */ + Payload *model.ErrorResponse `json:"body,omitempty"` +} + +// NewGetJobsIDStatusSubscribeBadRequest creates GetJobsIDStatusSubscribeBadRequest with default headers values +func NewGetJobsIDStatusSubscribeBadRequest() *GetJobsIDStatusSubscribeBadRequest { + + return &GetJobsIDStatusSubscribeBadRequest{} +} + +// WithPayload adds the payload to the get jobs Id status subscribe bad request response +func (o *GetJobsIDStatusSubscribeBadRequest) WithPayload(payload *model.ErrorResponse) *GetJobsIDStatusSubscribeBadRequest { + o.Payload = payload + return o +} + +// SetPayload sets the payload to the get jobs Id status subscribe bad request response +func (o *GetJobsIDStatusSubscribeBadRequest) SetPayload(payload *model.ErrorResponse) { + o.Payload = payload +} + +// WriteResponse to the client +func (o *GetJobsIDStatusSubscribeBadRequest) WriteResponse(rw http.ResponseWriter, producer runtime.Producer) { + + rw.WriteHeader(400) + if o.Payload != nil { + payload := o.Payload + if err := producer.Produce(rw, payload); err != nil { + panic(err) // let the recovery middleware deal with this + } + } +} + +// GetJobsIDStatusSubscribeNotFoundCode is the HTTP code returned for type GetJobsIDStatusSubscribeNotFound +const GetJobsIDStatusSubscribeNotFoundCode int = 404 + +/* +GetJobsIDStatusSubscribeNotFound Not Found + +swagger:response getJobsIdStatusSubscribeNotFound +*/ +type GetJobsIDStatusSubscribeNotFound struct { + + /* + In: Body + */ + Payload *model.ErrorResponse `json:"body,omitempty"` +} + +// NewGetJobsIDStatusSubscribeNotFound creates GetJobsIDStatusSubscribeNotFound with default headers values +func NewGetJobsIDStatusSubscribeNotFound() *GetJobsIDStatusSubscribeNotFound { + + return &GetJobsIDStatusSubscribeNotFound{} +} + +// WithPayload adds the payload to the get jobs Id status subscribe not found response +func (o *GetJobsIDStatusSubscribeNotFound) WithPayload(payload *model.ErrorResponse) *GetJobsIDStatusSubscribeNotFound { + o.Payload = payload + return o +} + +// SetPayload sets the payload to the get jobs Id status subscribe not found response +func (o *GetJobsIDStatusSubscribeNotFound) SetPayload(payload *model.ErrorResponse) { + o.Payload = payload +} + +// WriteResponse to the client +func (o *GetJobsIDStatusSubscribeNotFound) WriteResponse(rw http.ResponseWriter, producer runtime.Producer) { + + rw.WriteHeader(404) + if o.Payload != nil { + payload := o.Payload + if err := producer.Produce(rw, payload); err != nil { + panic(err) // let the recovery middleware deal with this + } + } +} + +/* +GetJobsIDStatusSubscribeDefault Other error with any status code and response body format. + +swagger:response getJobsIdStatusSubscribeDefault +*/ +type GetJobsIDStatusSubscribeDefault struct { + _statusCode int +} + +// NewGetJobsIDStatusSubscribeDefault creates GetJobsIDStatusSubscribeDefault with default headers values +func NewGetJobsIDStatusSubscribeDefault(code int) *GetJobsIDStatusSubscribeDefault { + if code <= 0 { + code = 500 + } + + return &GetJobsIDStatusSubscribeDefault{ + _statusCode: code, + } +} + +// WithStatusCode adds the status to the get jobs ID status subscribe default response +func (o *GetJobsIDStatusSubscribeDefault) WithStatusCode(code int) *GetJobsIDStatusSubscribeDefault { + o._statusCode = code + return o +} + +// SetStatusCode sets the status to the get jobs ID status subscribe default response +func (o *GetJobsIDStatusSubscribeDefault) SetStatusCode(code int) { + o._statusCode = code +} + +// WriteResponse to the client +func (o *GetJobsIDStatusSubscribeDefault) WriteResponse(rw http.ResponseWriter, producer runtime.Producer) { + + rw.Header().Del(runtime.HeaderContentType) //Remove Content-Type on empty responses + + rw.WriteHeader(o._statusCode) +} diff --git a/generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe_urlbuilder.go b/generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe_urlbuilder.go new file mode 100644 index 00000000..abac95fc --- /dev/null +++ b/generated/southbound/restapi/operations/southbound/get_jobs_id_status_subscribe_urlbuilder.go @@ -0,0 +1,104 @@ +// Code generated by go-swagger; DO NOT EDIT. + +// SPDX-FileCopyrightText: 2023 Siemens AG +// +// SPDX-License-Identifier: Apache-2.0 +// + +package southbound + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the generate command + +import ( + "errors" + "net/url" + golangswaggerpaths "path" + "strings" +) + +// GetJobsIDStatusSubscribeURL generates an URL for the get jobs ID status subscribe operation +type GetJobsIDStatusSubscribeURL struct { + ID string + + _basePath string + // avoid unkeyed usage + _ struct{} +} + +// WithBasePath sets the base path for this url builder, only required when it's different from the +// base path specified in the swagger spec. +// When the value of the base path is an empty string +func (o *GetJobsIDStatusSubscribeURL) WithBasePath(bp string) *GetJobsIDStatusSubscribeURL { + o.SetBasePath(bp) + return o +} + +// SetBasePath sets the base path for this url builder, only required when it's different from the +// base path specified in the swagger spec. +// When the value of the base path is an empty string +func (o *GetJobsIDStatusSubscribeURL) SetBasePath(bp string) { + o._basePath = bp +} + +// Build a url path and query string +func (o *GetJobsIDStatusSubscribeURL) Build() (*url.URL, error) { + var _result url.URL + + var _path = "/jobs/{id}/status/subscribe" + + id := o.ID + if id != "" { + _path = strings.Replace(_path, "{id}", id, -1) + } else { + return nil, errors.New("id is required on GetJobsIDStatusSubscribeURL") + } + + _basePath := o._basePath + if _basePath == "" { + _basePath = "/api/wfx/v1" + } + _result.Path = golangswaggerpaths.Join(_basePath, _path) + + return &_result, nil +} + +// Must is a helper function to panic when the url builder returns an error +func (o *GetJobsIDStatusSubscribeURL) Must(u *url.URL, err error) *url.URL { + if err != nil { + panic(err) + } + if u == nil { + panic("url can't be nil") + } + return u +} + +// String returns the string representation of the path with query string +func (o *GetJobsIDStatusSubscribeURL) String() string { + return o.Must(o.Build()).String() +} + +// BuildFull builds a full url with scheme, host, path and query string +func (o *GetJobsIDStatusSubscribeURL) BuildFull(scheme, host string) (*url.URL, error) { + if scheme == "" { + return nil, errors.New("scheme is required for a full url on GetJobsIDStatusSubscribeURL") + } + if host == "" { + return nil, errors.New("host is required for a full url on GetJobsIDStatusSubscribeURL") + } + + base, err := o.Build() + if err != nil { + return nil, err + } + + base.Scheme = scheme + base.Host = host + return base, nil +} + +// StringFull returns the string representation of a complete url +func (o *GetJobsIDStatusSubscribeURL) StringFull(scheme, host string) string { + return o.Must(o.BuildFull(scheme, host)).String() +} diff --git a/generated/southbound/restapi/operations/workflow_executor_api.go b/generated/southbound/restapi/operations/workflow_executor_api.go index ddc0bc3a..ceabeae6 100644 --- a/generated/southbound/restapi/operations/workflow_executor_api.go +++ b/generated/southbound/restapi/operations/workflow_executor_api.go @@ -12,6 +12,7 @@ package operations import ( "fmt" + "io" "net/http" "strings" @@ -48,6 +49,9 @@ func NewWorkflowExecutorAPI(spec *loads.Document) *WorkflowExecutorAPI { JSONConsumer: runtime.JSONConsumer(), JSONProducer: runtime.JSONProducer(), + TextEventStreamProducer: runtime.ProducerFunc(func(w io.Writer, data interface{}) error { + return errors.NotImplemented("textEventStream producer has not yet been implemented") + }), SouthboundGetJobsHandler: southbound.GetJobsHandlerFunc(func(params southbound.GetJobsParams) middleware.Responder { return middleware.NotImplemented("operation southbound.GetJobs has not yet been implemented") @@ -61,6 +65,9 @@ func NewWorkflowExecutorAPI(spec *loads.Document) *WorkflowExecutorAPI { SouthboundGetJobsIDStatusHandler: southbound.GetJobsIDStatusHandlerFunc(func(params southbound.GetJobsIDStatusParams) middleware.Responder { return middleware.NotImplemented("operation southbound.GetJobsIDStatus has not yet been implemented") }), + SouthboundGetJobsIDStatusSubscribeHandler: southbound.GetJobsIDStatusSubscribeHandlerFunc(func(params southbound.GetJobsIDStatusSubscribeParams) middleware.Responder { + return middleware.NotImplemented("operation southbound.GetJobsIDStatusSubscribe has not yet been implemented") + }), SouthboundGetJobsIDTagsHandler: southbound.GetJobsIDTagsHandlerFunc(func(params southbound.GetJobsIDTagsParams) middleware.Responder { return middleware.NotImplemented("operation southbound.GetJobsIDTags has not yet been implemented") }), @@ -111,6 +118,9 @@ type WorkflowExecutorAPI struct { // JSONProducer registers a producer for the following mime types: // - application/json JSONProducer runtime.Producer + // TextEventStreamProducer registers a producer for the following mime types: + // - text/event-stream + TextEventStreamProducer runtime.Producer // SouthboundGetJobsHandler sets the operation handler for the get jobs operation SouthboundGetJobsHandler southbound.GetJobsHandler @@ -120,6 +130,8 @@ type WorkflowExecutorAPI struct { SouthboundGetJobsIDDefinitionHandler southbound.GetJobsIDDefinitionHandler // SouthboundGetJobsIDStatusHandler sets the operation handler for the get jobs ID status operation SouthboundGetJobsIDStatusHandler southbound.GetJobsIDStatusHandler + // SouthboundGetJobsIDStatusSubscribeHandler sets the operation handler for the get jobs ID status subscribe operation + SouthboundGetJobsIDStatusSubscribeHandler southbound.GetJobsIDStatusSubscribeHandler // SouthboundGetJobsIDTagsHandler sets the operation handler for the get jobs ID tags operation SouthboundGetJobsIDTagsHandler southbound.GetJobsIDTagsHandler // SouthboundGetWorkflowsHandler sets the operation handler for the get workflows operation @@ -206,6 +218,9 @@ func (o *WorkflowExecutorAPI) Validate() error { if o.JSONProducer == nil { unregistered = append(unregistered, "JSONProducer") } + if o.TextEventStreamProducer == nil { + unregistered = append(unregistered, "TextEventStreamProducer") + } if o.SouthboundGetJobsHandler == nil { unregistered = append(unregistered, "southbound.GetJobsHandler") @@ -219,6 +234,9 @@ func (o *WorkflowExecutorAPI) Validate() error { if o.SouthboundGetJobsIDStatusHandler == nil { unregistered = append(unregistered, "southbound.GetJobsIDStatusHandler") } + if o.SouthboundGetJobsIDStatusSubscribeHandler == nil { + unregistered = append(unregistered, "southbound.GetJobsIDStatusSubscribeHandler") + } if o.SouthboundGetJobsIDTagsHandler == nil { unregistered = append(unregistered, "southbound.GetJobsIDTagsHandler") } @@ -282,6 +300,8 @@ func (o *WorkflowExecutorAPI) ProducersFor(mediaTypes []string) map[string]runti switch mt { case "application/json": result["application/json"] = o.JSONProducer + case "text/event-stream": + result["text/event-stream"] = o.TextEventStreamProducer } if p, ok := o.customProducers[mt]; ok { @@ -341,6 +361,10 @@ func (o *WorkflowExecutorAPI) initHandlerCache() { if o.handlers["GET"] == nil { o.handlers["GET"] = make(map[string]http.Handler) } + o.handlers["GET"]["/jobs/{id}/status/subscribe"] = southbound.NewGetJobsIDStatusSubscribe(o.context, o.SouthboundGetJobsIDStatusSubscribeHandler) + if o.handlers["GET"] == nil { + o.handlers["GET"] = make(map[string]http.Handler) + } o.handlers["GET"]["/jobs/{id}/tags"] = southbound.NewGetJobsIDTags(o.context, o.SouthboundGetJobsIDTagsHandler) if o.handlers["GET"] == nil { o.handlers["GET"] = make(map[string]http.Handler) diff --git a/go.mod b/go.mod index de92bbe9..06c5e4e5 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/mattn/go-sqlite3 v1.14.17 github.com/muesli/mango-cobra v1.2.0 github.com/muesli/roff v0.1.0 + github.com/olebedev/emitter v0.0.0-20230411050614-349169dec2ba github.com/rs/cors v1.9.0 github.com/rs/zerolog v1.30.0 github.com/spf13/cobra v1.7.0 @@ -37,6 +38,7 @@ require ( github.com/steinfletcher/apitest v1.5.15 github.com/steinfletcher/apitest-jsonpath v1.7.2 github.com/stretchr/testify v1.8.4 + github.com/tmaxmax/go-sse v0.6.0 github.com/tsenart/vegeta/v12 v12.11.0 github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 go.uber.org/goleak v1.2.1 @@ -88,6 +90,7 @@ require ( ) require ( + github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-openapi/analysis v0.21.4 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -103,6 +106,7 @@ require ( github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect go.mongodb.org/mongo-driver v1.11.6 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect + golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect golang.org/x/net v0.12.0 // indirect golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.10.0 // indirect diff --git a/go.sum b/go.sum index 659696ae..c8173531 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,8 @@ github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef/go.mod h1:W github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so= github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e h1:mWOqoK5jV13ChKf/aF3plwQ96laasTJgZi4f1aSOu+M= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cnf/structhash v0.0.0-20201127153200-e1b16c1ebc08 h1:ox2F0PSMlrAAiAdknSRMDrAr8mfxPCfSZolH+/qQnyQ= github.com/cnf/structhash v0.0.0-20201127153200-e1b16c1ebc08/go.mod h1:pCxVEbcm3AMg7ejXyorUXi6HQCzOIBf7zEDVPtw0/U4= github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= @@ -227,6 +229,8 @@ github.com/muesli/roff v0.1.0/go.mod h1:pjAHQM9hdUUwm/krAfrLGgJkXJ+YuhtsfZ42kieB github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/olebedev/emitter v0.0.0-20230411050614-349169dec2ba h1:/Q5vvLs180BFH7u+Nakdrr1B9O9RAxVaIurFQy0c8QQ= +github.com/olebedev/emitter v0.0.0-20230411050614-349169dec2ba/go.mod h1:eT2/Pcsim3XBjbvldGiJBvvgiqZkAFyiOJJsDKXs/ts= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= @@ -287,6 +291,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tmaxmax/go-sse v0.6.0 h1:FHt1n2ljccxnn+hWcflzbQqTyGgYIkyO0p/ap4w6IG0= +github.com/tmaxmax/go-sse v0.6.0/go.mod h1:WQsByT1/dnQCLgQw3639eARXhNhg+4EFdm2fLnj8e0c= github.com/tsenart/go-tsz v0.0.0-20180814235614-0bd30b3df1c3 h1:pcQGQzTwCg//7FgVywqge1sW9Yf8VMsMdG58MI5kd8s= github.com/tsenart/go-tsz v0.0.0-20180814235614-0bd30b3df1c3/go.mod h1:SWZznP1z5Ki7hDT2ioqiFKEse8K9tU2OUvaRI0NeGQo= github.com/tsenart/vegeta/v12 v12.11.0 h1:f6V4xW6KMc5BLAWPzEu9l6Vclif44AW4IxPC7TSZwDM= @@ -328,6 +334,7 @@ golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw= +golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= diff --git a/internal/handler/job/create.go b/internal/handler/job/create.go index 55b9b298..63f60ecc 100644 --- a/internal/handler/job/create.go +++ b/internal/handler/job/create.go @@ -26,7 +26,6 @@ import ( func CreateJob(ctx context.Context, storage persistence.Storage, request *model.JobRequest) (*model.Job, error) { log := logging.LoggerFromCtx(ctx) contextLogger := log.With().Str("clientId", request.ClientID).Str("name", request.Workflow).Logger() - contextLogger.Debug().Msg("Creating new job") wf, err := storage.GetWorkflow(ctx, request.Workflow) if err != nil { diff --git a/internal/handler/job/get.go b/internal/handler/job/get.go index 666f3c44..3620f616 100644 --- a/internal/handler/job/get.go +++ b/internal/handler/job/get.go @@ -18,16 +18,11 @@ import ( ) func GetJob(ctx context.Context, storage persistence.Storage, id string, history bool) (*model.Job, error) { - log := logging.LoggerFromCtx(ctx) - contextLogger := log.With(). - Str("id", id). - Bool("history", history). - Logger() - contextLogger.Debug().Msg("Fetching job") - fetchParams := persistence.FetchParams{History: history} job, err := storage.GetJob(ctx, id, fetchParams) if err != nil { + log := logging.LoggerFromCtx(ctx) + log.Error().Str("id", id).Bool("history", history).Err(err).Msg("Failed to fetch job") return nil, fault.Wrap(err) } return job, nil diff --git a/internal/handler/job/status/subscribe.go b/internal/handler/job/status/subscribe.go new file mode 100644 index 00000000..68721cec --- /dev/null +++ b/internal/handler/job/status/subscribe.go @@ -0,0 +1,70 @@ +package status + +/* + * SPDX-FileCopyrightText: 2023 Siemens AG + * + * SPDX-License-Identifier: Apache-2.0 + * + * Author: Michael Adler + */ + +import ( + "context" + "errors" + + "github.com/Southclaws/fault" + "github.com/Southclaws/fault/ftag" + "github.com/olebedev/emitter" + "github.com/rs/zerolog/log" + "github.com/siemens/wfx/internal/handler/job" + "github.com/siemens/wfx/internal/workflow" + "github.com/siemens/wfx/middleware/logging" + "github.com/siemens/wfx/persistence" +) + +// AddSubscriber sets up a new subscription for the specified jobID, allowing the retrieval of job status updates. +// It returns a channel that will receive events related to the specified jobID. +func AddSubscriber(ctx context.Context, storage persistence.Storage, jobID string) (<-chan emitter.Event, error) { + contextLogger := logging.LoggerFromCtx(ctx).With().Str("jobID", jobID).Logger() + contextLogger.Debug().Msg("Adding subscriber for job updates") + + // job must exist and be in a non-terminal state + existingJob, err := job.GetJob(ctx, storage, jobID, false) + if err != nil { + return nil, fault.Wrap(err) + } + if workflow.IsTerminal(existingJob.Workflow, existingJob.Status.State) { + contextLogger.Error().Str("jobID", jobID).Msg("Attempted to subscribe to a job which is in a terminal state") + return nil, fault.Wrap( + errors.New("attempted to subscribe to a job which is in a terminal state"), + ftag.With(ftag.InvalidArgument)) + } + // job is in correct state, subscription is allowed + + // set up subscription and only *then* send the initial status + ch := e.On(jobID) + contextLogger.Debug().Msg("Sending current job status") + e.Emit(jobID, *existingJob.Status) + + contextLogger.Info().Msg("Added subscriber for job updates") + return ch, nil +} + +// ShutdownSubscribers disconnects all subscribers. +func ShutdownSubscribers() { + // note: e.Off("*") does not match topics containing the character '/' + for _, topic := range e.Topics() { + log.Debug().Str("topic", topic).Msg("Closing subscribers") + e.Off(topic) + } + log.Info().Msg("Unsubscribed all subscribers") +} + +// SubscriberCount counts the total number of subscribers across all topics. +func SubscriberCount() int { + count := 0 + for _, topic := range e.Topics() { + count += len(e.Listeners(topic)) + } + return count +} diff --git a/internal/handler/job/status/subscribe_test.go b/internal/handler/job/status/subscribe_test.go new file mode 100644 index 00000000..e972743a --- /dev/null +++ b/internal/handler/job/status/subscribe_test.go @@ -0,0 +1,103 @@ +package status + +/* + * SPDX-FileCopyrightText: 2023 Siemens AG + * + * SPDX-License-Identifier: Apache-2.0 + * + * Author: Michael Adler + */ + +import ( + "context" + "testing" + + "github.com/Southclaws/fault/ftag" + "github.com/siemens/wfx/generated/model" + "github.com/siemens/wfx/internal/workflow" + "github.com/siemens/wfx/workflow/dau" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAddSubscriberAndShutdown(t *testing.T) { + db := newInMemoryDB(t) + + wf, err := db.CreateWorkflow(context.Background(), dau.PhasedWorkflow()) + require.NoError(t, err) + + job, err := db.CreateJob(context.Background(), &model.Job{ + ClientID: "klaus", + Workflow: wf, + Status: &model.JobStatus{State: "CREATED"}, + }) + require.NoError(t, err) + require.Equal(t, "CREATED", job.Status.State) + require.False(t, workflow.IsTerminal(wf, job.Status.State)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, err = AddSubscriber(ctx, db, job.ID) + require.NoError(t, err) + assert.Equal(t, 1, SubscriberCount()) + + // test shutdown + ShutdownSubscribers() + assert.Equal(t, 0, SubscriberCount()) +} + +func TestAddSubscriber_NotFound(t *testing.T) { + db := newInMemoryDB(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, err := AddSubscriber(ctx, db, "42") + require.NotNil(t, err) + assert.Equal(t, ftag.NotFound, ftag.Get(err)) +} + +func TestAddSubscriber_TerminalState(t *testing.T) { + db := newInMemoryDB(t) + wf, err := db.CreateWorkflow(context.Background(), dau.DirectWorkflow()) + require.NoError(t, err) + + job, err := db.CreateJob(context.Background(), &model.Job{ + ClientID: "klaus", + Workflow: wf, + Status: &model.JobStatus{State: "ACTIVATED"}, + }) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, err = AddSubscriber(ctx, db, job.ID) + assert.ErrorContains(t, err, "attempted to subscribe to a job which is in a terminal state") + assert.Equal(t, ftag.InvalidArgument, ftag.Get(err)) +} + +func TestCountSubscribers(t *testing.T) { + db := newInMemoryDB(t) + + wf, err := db.CreateWorkflow(context.Background(), dau.PhasedWorkflow()) + require.NoError(t, err) + + job, err := db.CreateJob(context.Background(), &model.Job{ + ClientID: "klaus", + Workflow: wf, + Status: &model.JobStatus{State: "CREATED"}, + }) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, err = AddSubscriber(ctx, db, job.ID) + require.NoError(t, err) + assert.Equal(t, 1, SubscriberCount()) + _, err = AddSubscriber(ctx, db, job.ID) + require.NoError(t, err) + assert.Equal(t, 2, SubscriberCount()) +} diff --git a/internal/handler/job/status/update.go b/internal/handler/job/status/update.go index f4a24258..6ec3f7da 100644 --- a/internal/handler/job/status/update.go +++ b/internal/handler/job/status/update.go @@ -14,12 +14,15 @@ import ( "github.com/Southclaws/fault" "github.com/Southclaws/fault/ftag" + "github.com/olebedev/emitter" "github.com/siemens/wfx/generated/model" "github.com/siemens/wfx/internal/workflow" "github.com/siemens/wfx/middleware/logging" "github.com/siemens/wfx/persistence" ) +var e *emitter.Emitter = emitter.New(32) + func Update(ctx context.Context, storage persistence.Storage, jobID string, newStatus *model.JobStatus, actor model.EligibleEnum) (*model.JobStatus, error) { log := logging.LoggerFromCtx(ctx) contextLogger := log.With(). @@ -84,11 +87,25 @@ func Update(ctx context.Context, storage persistence.Storage, jobID string, newS log.Error().Err(err).Msg("Failed to persist job update") return nil, fault.Wrap(err) } + // NOTE: at this point, the job status has been updated successfully; + // there's always the possibility that we are unable to send the response + // to the clients or update subscribers if from != to { contextLogger.Info().Msg("Updated job status") } else { contextLogger.Debug().Msg("Updated job status") } + + // async notify subscribers about the update event + contextLogger.Debug().Msg("Notifying subscribers") + go func() { + <-e.Emit(jobID, *result.Status) + if workflow.IsTerminal(job.Workflow, result.Status.State) { + contextLogger.Debug().Str("state", result.Status.State). + Msg("Job has reached a terminal state. Disconnecting subscribers.") + e.Off(jobID) + } + }() return result.Status, nil } diff --git a/internal/handler/job/status/update_test.go b/internal/handler/job/status/update_test.go index c7bf2176..7eccb436 100644 --- a/internal/handler/job/status/update_test.go +++ b/internal/handler/job/status/update_test.go @@ -11,8 +11,10 @@ package status import ( "context" "fmt" + "sync" "testing" + "github.com/olebedev/emitter" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -143,6 +145,61 @@ func TestUpdateJobStatusNotAllowed(t *testing.T) { assert.Nil(t, status) } +func TestUpdateJob_NotifySubscribers(t *testing.T) { + db := newInMemoryDB(t) + wf := createDirectWorkflow(t, db) + job, err2 := db.CreateJob(context.Background(), &model.Job{ + ClientID: "abc", + Workflow: wf, + Status: &model.JobStatus{ClientID: "abc", State: "ACTIVATING"}, + }) + require.NoError(t, err2) + assert.Equal(t, "ACTIVATING", job.Status.State) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch, err := AddSubscriber(ctx, db, job.ID) + require.NoError(t, err) + + var g sync.WaitGroup + g.Add(1) + allEvents := make([]emitter.Event, 0) + // start event collector + go func() { + defer g.Done() + for event := range ch { + allEvents = append(allEvents, event) + } + }() + + _, err = Update(context.Background(), db, job.ID, &model.JobStatus{ + ClientID: "klaus", + State: "ACTIVATED", + Progress: 100, + }, model.EligibleEnumCLIENT) + require.NoError(t, err) + + // we've reached a terminal state, so our channel should be closed now + g.Wait() + + { + // first event is current job status + event := allEvents[0] + status := event.Args[0].(model.JobStatus) + assert.Equal(t, job.Status.State, status.State) + } + + { + // second event is our update + event := allEvents[1] + status := event.Args[0].(model.JobStatus) + assert.Equal(t, "ACTIVATED", status.State) + } + + assert.Equal(t, 2, len(allEvents)) +} + func createDirectWorkflow(t *testing.T, db persistence.Storage) *model.Workflow { wf, err := db.CreateWorkflow(context.Background(), dau.DirectWorkflow()) require.NoError(t, err) diff --git a/internal/producer/text_event_stream.go b/internal/producer/text_event_stream.go new file mode 100644 index 00000000..6323fac7 --- /dev/null +++ b/internal/producer/text_event_stream.go @@ -0,0 +1,37 @@ +package producer + +/* + * SPDX-FileCopyrightText: 2023 Siemens AG + * + * SPDX-License-Identifier: Apache-2.0 + * + * Author: Michael Adler + */ + +import ( + "encoding/json" + "io" + + "github.com/Southclaws/fault" + "github.com/go-openapi/runtime" +) + +func TextEventStreamProducer() runtime.Producer { + return runtime.ProducerFunc(func(rw io.Writer, data any) error { + if _, err := rw.Write([]byte("data: ")); err != nil { + return fault.Wrap(err) + } + b, err := json.Marshal(data) + if err != nil { + return fault.Wrap(err) + } + if _, err := rw.Write(b); err != nil { + return fault.Wrap(err) + } + // text/event-stream responses are "chunked" with double newline breaks + if _, err := rw.Write([]byte("\n\n")); err != nil { + return fault.Wrap(err) + } + return nil + }) +} diff --git a/internal/producer/text_event_stream_test.go b/internal/producer/text_event_stream_test.go new file mode 100644 index 00000000..f7680b8e --- /dev/null +++ b/internal/producer/text_event_stream_test.go @@ -0,0 +1,34 @@ +package producer + +/* + * SPDX-FileCopyrightText: 2023 Siemens AG + * + * SPDX-License-Identifier: Apache-2.0 + * + * Author: Michael Adler + */ + +import ( + "bytes" + "testing" + + "github.com/siemens/wfx/generated/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTextEventStreamProducer(t *testing.T) { + prod := TextEventStreamProducer() + event := model.JobStatus{ + ClientID: "klaus", + Message: "hello world", + State: "INSTALLING", + } + + buf := new(bytes.Buffer) + err := prod.Produce(buf, event) + require.NoError(t, err) + assert.Equal(t, `data: {"clientId":"klaus","message":"hello world","state":"INSTALLING"} + +`, buf.String()) +} diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go index c6ab958b..ebe33278 100644 --- a/internal/workflow/workflow.go +++ b/internal/workflow/workflow.go @@ -10,6 +10,7 @@ package workflow import "github.com/siemens/wfx/generated/model" +// FindStateGroup tries to find the group of a state. If not found, it returns the empty string. func FindStateGroup(workflow *model.Workflow, state string) string { for _, group := range workflow.Groups { for _, s := range group.States { @@ -42,3 +43,13 @@ func FollowImmediateTransitions(workflow *model.Workflow, from string) string { current = to } } + +// IsTerminal checks if the given state is terminal, i.e. there are no outgoing transitions from it. +func IsTerminal(workflow *model.Workflow, state string) bool { + for _, transition := range workflow.Transitions { + if transition.From == state { + return false + } + } + return true +} diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go index fb6c645c..902737be 100644 --- a/internal/workflow/workflow_test.go +++ b/internal/workflow/workflow_test.go @@ -18,8 +18,14 @@ import ( func TestFindStateGroup(t *testing.T) { workflow := dau.PhasedWorkflow() - group := FindStateGroup(workflow, "DOWNLOAD") - assert.Equal(t, "OPEN", group) + { + group := FindStateGroup(workflow, "DOWNLOAD") + assert.Equal(t, "OPEN", group) + } + { + group := FindStateGroup(workflow, "FOO") + assert.Equal(t, "", group) + } } func TestFollowTransitions(t *testing.T) { @@ -39,3 +45,11 @@ func TestFollowTransitions(t *testing.T) { actual := FollowImmediateTransitions(&model.Workflow{Transitions: transitions}, "a") assert.Equal(t, d, actual, "should warp from a to d") } + +func TestIsTerminal(t *testing.T) { + wf := dau.DirectWorkflow() + assert.True(t, IsTerminal(wf, "ACTIVATED")) + assert.True(t, IsTerminal(wf, "TERMINATED")) + assert.False(t, IsTerminal(wf, "INSTALL")) + assert.False(t, IsTerminal(wf, "INSTALLING")) +} diff --git a/middleware/logging/writer.go b/middleware/logging/writer.go index b0ae1c82..8e95dbc6 100644 --- a/middleware/logging/writer.go +++ b/middleware/logging/writer.go @@ -43,3 +43,9 @@ func (w *responseWriter) Write(b []byte) (int, error) { n, err := w.bodyWriter.Write(b) return n, fault.Wrap(err) } + +// Flush implements the http.Flusher interface. This is needed to implement server-sent events. +func (w *responseWriter) Flush() { + flusher := w.httpWriter.(http.Flusher) + flusher.Flush() +} diff --git a/middleware/logging/writer_test.go b/middleware/logging/writer_test.go index f08f6fb1..92cb9b0e 100644 --- a/middleware/logging/writer_test.go +++ b/middleware/logging/writer_test.go @@ -38,3 +38,11 @@ func TestWriter(t *testing.T) { assert.Equal(t, "hello world", string(body)) } + +func TestWriterImplementsFlusher(t *testing.T) { + recorder := httptest.NewRecorder() + var w http.ResponseWriter = newMyResponseWriter(recorder) + flusher, ok := w.(http.Flusher) + assert.True(t, ok) + flusher.Flush() +} diff --git a/middleware/responder/sse/main_test.go b/middleware/responder/sse/main_test.go new file mode 100644 index 00000000..2fabaa3f --- /dev/null +++ b/middleware/responder/sse/main_test.go @@ -0,0 +1,19 @@ +package sse + +/* + * SPDX-FileCopyrightText: 2023 Siemens AG + * + * SPDX-License-Identifier: Apache-2.0 + * + * Author: Michael Adler + */ + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/middleware/responder/sse/responder.go b/middleware/responder/sse/responder.go new file mode 100644 index 00000000..199fa77b --- /dev/null +++ b/middleware/responder/sse/responder.go @@ -0,0 +1,67 @@ +package sse + +/* + * SPDX-FileCopyrightText: 2023 Siemens AG + * + * SPDX-License-Identifier: Apache-2.0 + * + * Author: Michael Adler + */ + +import ( + "context" + "encoding/json" + "net/http" + + "github.com/go-openapi/runtime" + "github.com/go-openapi/runtime/middleware" + "github.com/olebedev/emitter" + "github.com/siemens/wfx/middleware/logging" +) + +// Responder sends a stream of events to the client. +func Responder(ctx context.Context, events <-chan emitter.Event) middleware.ResponderFunc { + return func(w http.ResponseWriter, p runtime.Producer) { + log := logging.LoggerFromCtx(ctx) + + flusher, _ := w.(http.Flusher) + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + flusher.Flush() + + running := true + for running { + log.Debug().Msg("Waiting for next event") + select { + case ev, ok := <-events: + if !ok { + running = false + } else { + b, err := json.Marshal(ev.Args[0]) + if err != nil { + log.Error().Err(err).Msg("Failed to marshal status event") + continue + } + log.Debug().RawJSON("event", b).Msg("Received status event. Notifying client.") + + _, _ = w.Write([]byte("data: ")) + _, _ = w.Write(b) + // text/event-stream responses are "chunked" with double newline breaks + _, _ = w.Write([]byte("\n\n")) + + flusher.Flush() + log.Debug().Msg("Waiting for next status event") + } + case <-ctx.Done(): + // this typically happens when the client closes the connection + log.Debug().Msg("Context is done") + running = false + } + } + log.Debug().Msg("SSE Responder finished") + } +} diff --git a/middleware/responder/sse/responder_test.go b/middleware/responder/sse/responder_test.go new file mode 100644 index 00000000..e2406a8c --- /dev/null +++ b/middleware/responder/sse/responder_test.go @@ -0,0 +1,54 @@ +package sse + +/* + * SPDX-FileCopyrightText: 2023 Siemens AG + * + * SPDX-License-Identifier: Apache-2.0 + * + * Author: Michael Adler + */ + +import ( + "context" + "net/http/httptest" + "sync" + "testing" + + "github.com/olebedev/emitter" + "github.com/siemens/wfx/generated/model" + "github.com/siemens/wfx/internal/producer" + "github.com/stretchr/testify/assert" +) + +func TestSSEResponder(t *testing.T) { + events := make(chan emitter.Event) + + rw := httptest.NewRecorder() + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + responder := Responder(context.Background(), events) + responder.WriteResponse(rw, producer.JSONProducer()) + }() + + jobStatus := model.JobStatus{ + ClientID: "klaus", + Message: "hello world", + State: "INSTALLING", + } + events <- emitter.Event{ + Topic: "", + OriginalTopic: "", + Flags: 0, + Args: []any{jobStatus}, + } + close(events) + + wg.Wait() + + assert.Equal(t, `data: {"clientId":"klaus","message":"hello world","state":"INSTALLING"} + +`, rw.Body.String()) +} diff --git a/middleware/responder/util/content_type.go b/middleware/responder/util/content_type.go new file mode 100644 index 00000000..c0d7743d --- /dev/null +++ b/middleware/responder/util/content_type.go @@ -0,0 +1,29 @@ +package util + +/* + * SPDX-FileCopyrightText: 2023 Siemens AG + * + * SPDX-License-Identifier: Apache-2.0 + * + * Author: Michael Adler + */ + +import ( + "net/http" + + "github.com/go-openapi/runtime" + "github.com/go-openapi/runtime/middleware" + "github.com/rs/zerolog/log" + "github.com/siemens/wfx/internal/producer" +) + +// ForceJSONResponse generates a JSON response using the provided payload. +func ForceJSONResponse(statusCode int, payload any) middleware.ResponderFunc { + return func(rw http.ResponseWriter, _ runtime.Producer) { + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(statusCode) + if err := producer.JSONProducer().Produce(rw, payload); err != nil { + log.Error().Err(err).Msg("Failed to generate JSON response") + } + } +} diff --git a/middleware/responder/util/content_type_test.go b/middleware/responder/util/content_type_test.go new file mode 100644 index 00000000..948ec06d --- /dev/null +++ b/middleware/responder/util/content_type_test.go @@ -0,0 +1,33 @@ +package util + +/* + * SPDX-FileCopyrightText: 2023 Siemens AG + * + * SPDX-License-Identifier: Apache-2.0 + * + * Author: Michael Adler + */ + +import ( + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/siemens/wfx/internal/producer" + "github.com/stretchr/testify/assert" +) + +func TestForceContentType(t *testing.T) { + resp := map[string]string{"hello": "world"} + f := ForceJSONResponse(http.StatusNotFound, resp) + rec := httptest.NewRecorder() + f.WriteResponse(rec, producer.JSONProducer()) + + result := rec.Result() + assert.Equal(t, "application/json", result.Header.Get("Content-Type")) + assert.Equal(t, http.StatusNotFound, result.StatusCode) + b, _ := io.ReadAll(result.Body) + body := string(b) + assert.JSONEq(t, `{"hello":"world"}`, body) +} diff --git a/middleware/responder/util/main_test.go b/middleware/responder/util/main_test.go new file mode 100644 index 00000000..92b2c098 --- /dev/null +++ b/middleware/responder/util/main_test.go @@ -0,0 +1,19 @@ +package util + +/* + * SPDX-FileCopyrightText: 2023 Siemens AG + * + * SPDX-License-Identifier: Apache-2.0 + * + * Author: Michael Adler + */ + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/spec/wfx.swagger.yml b/spec/wfx.swagger.yml index a3768945..90c7680d 100644 --- a/spec/wfx.swagger.yml +++ b/spec/wfx.swagger.yml @@ -422,6 +422,11 @@ x-paths-templates: logref: 11cc67762090e15b79a1387eca65ba65 message: Job ID was not found + jobTerminalStateError: &jobTerminalStateError + code: wfx.jobTerminalState + logref: 916f0a913a3e4a52a96bd271e029c201 + message: The request was invalid because the job is in a terminal state + workflowNotFoundError: &workflowNotFoundError code: wfx.workflowNotFound logref: c452719774086b6e803bb8f6ecea9899 @@ -792,6 +797,47 @@ paths: errors: - <<: *jobNotFoundError + /jobs/{id}/status/subscribe: + get: + summary: Subscribe to job status updates + description: > + Obtain instant notifications when there is a change in the job status. + This endpoint utilizes server-sent events (SSE), where responses are "chunked" with double newline breaks. + For example, a single event might look like this: + data: {"clientId":"example_client","state":"INSTALLING"}\n\n + + Note: The first event is always the current job status, i.e. equivalent to calling GET on /jobs/{id}/status. + tags: + - jobs + - northbound + - southbound + produces: + - application/json + - text/event-stream + parameters: + - $ref: "#/parameters/jobId" + responses: + "default": + description: Other error with any status code and response body format. + "200": + description: A stream of server-sent events + "400": + description: Bad Request + schema: + $ref: "#/definitions/ErrorResponse" + examples: + Error responses occurring at this operation for invalid requests: + errors: + - <<: *jobTerminalStateError + "404": + description: Not Found + schema: + $ref: "#/definitions/ErrorResponse" + examples: + Error responses occurring at this operation while updating a non-existent job: + errors: + - <<: *jobNotFoundError + /jobs/{id}/definition: get: summary: Get job definition diff --git a/test/04-operations.bats b/test/04-operations.bats index b14d0130..eecb5c70 100755 --- a/test/04-operations.bats +++ b/test/04-operations.bats @@ -81,3 +81,27 @@ teardown() { done return 1 } + +@test "Subscribe job status" { + wfx & + wait_wfx_running 2 + wfxctl workflow create --filter=.transitions ../share/demo/kanban/wfx.workflow.kanban.yml + ID=$(echo '{ "title": "Expose Job API" }' | + wfxctl job create --workflow wfx.workflow.kanban \ + --client-id Dana \ + --filter='.id' --raw - 2>/dev/null) + ( + sleep 1 + for state in PROGRESS VALIDATE DONE; do + wfxctl job update-status \ + --actor=client \ + --id "$ID" \ + --state "$state" 1>/dev/null 2>&1 + done + ) & + run sh -c "wfxctl job subscribe-status --id $ID | jq -r .state" + assert_output "NEW +PROGRESS +VALIDATE +DONE" +}