Skip to content

Commit

Permalink
feat: instant job update notifications using server-sent events
Browse files Browse the repository at this point in the history
This commit introduces a new feature that enables clients to receive
instant notifications for job status updates.

Details:

- A new endpoint `/jobs/{id}/status/subscribe` has been added, which
  implements instant notifications through Server-Sent Events (SSE).
- By subscribing to this endpoint, clients can efficiently keep track of
  job status changes without the need to repeatedly poll the server.
- A client reference implementation has been added to wfxctl.

Signed-off-by: Michael Adler <[email protected]>
  • Loading branch information
michaeladler committed Jun 30, 2023
1 parent fed52c9 commit 18c9db7
Show file tree
Hide file tree
Showing 47 changed files with 2,796 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Instant job update notifications using server-sent events (see #11)

### Fixed

- Send HTTP status code 404 when attempting to access the file server while it is disabled
Expand Down
6 changes: 6 additions & 0 deletions api/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,9 @@ var WorkflowInvalid = model.Error{
Logref: "18f57adc70dd79c7fb4f1246be8a6e04",
Message: "Workflow validation failed",
}

var JobTerminalState = model.Error{
Code: "wfx.jobTerminalState",
Logref: "916f0a913a3e4a52a96bd271e029c201",
Message: "The request was invalid because the job is in a terminal state",
}
113 changes: 113 additions & 0 deletions api/job_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import (
"context"
"fmt"
"net/http"
"sync"
"testing"
"time"

"github.com/siemens/wfx/generated/model"
"github.com/siemens/wfx/internal/handler/job"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/workflow/dau"
"github.com/steinfletcher/apitest"
Expand Down Expand Up @@ -118,3 +121,113 @@ func TestJobStatusUpdate(t *testing.T) {
Assert(jsonpath.Contains(`$.state`, "DOWNLOADING")).
End()
}

func TestJobStatusSubscribe(t *testing.T) {
db := newInMemoryDB(t)
north, south := createNorthAndSouth(t, db)

handlers := []http.Handler{north, south}
for i, name := range allAPIs {

job := persistJob(t, db)
jobPath := fmt.Sprintf("/api/wfx/v1/jobs/%s/status/subscribe", job.ID)

handler := handlers[i]
t.Run(name, func(t *testing.T) {
var wgSubscription sync.WaitGroup
wgSubscription.Add(1)
var wgAssertionDone sync.WaitGroup
wgAssertionDone.Add(1)
go func() {
defer wgAssertionDone.Done()
apitest.New().
Handler(handler).
Get(jobPath).
Expect(t).
Status(http.StatusOK).
Body(`data: {"clientId":"foo","definitionHash":"4f53cda18c2baa0c0354bb5f9a3ecbe5ed12ab4d8e11ba873c2f11161202b945","state":"INSTALL"}
data: {"definitionHash":"4f53cda18c2baa0c0354bb5f9a3ecbe5ed12ab4d8e11ba873c2f11161202b945","state":"INSTALLING"}
`).
End()
}()

go func() {
defer wgSubscription.Done()
// wait for subscription created by the above GET request
for i := 0; i < 100; i++ {
count := status.SubscriberCount()
if count > 0 {
break
}
time.Sleep(time.Millisecond * 10)
}
}()

wgSubscription.Wait()

// add ourselves as subscriber as well
ch, err := status.AddSubscriber(context.Background(), db, job.ID)
require.NoError(t, err)
_, err = status.Update(context.Background(), db, job.ID, &model.JobStatus{State: "INSTALLING"}, model.EligibleEnumCLIENT)
require.NoError(t, err)
<-ch
<-ch
// now it's safe to shutdown
status.ShutdownSubscribers()

wgAssertionDone.Wait()
})
}
}

func TestJobStatusSubscribe_NotFound(t *testing.T) {
db := newInMemoryDB(t)
north, south := createNorthAndSouth(t, db)

handlers := []http.Handler{north, south}
for i, name := range allAPIs {
handler := handlers[i]
t.Run(name, func(t *testing.T) {
apitest.New().
Handler(handler).
Get("/api/wfx/v1/jobs/42/status/subscribe").
Expect(t).
Status(http.StatusNotFound).
End()
})
}
}

func TestJobStatusSubscribe_TerminalState(t *testing.T) {
db := newInMemoryDB(t)

wf := dau.DirectWorkflow()
_, _ = workflow.CreateWorkflow(context.Background(), db, wf)

tmpJob := model.Job{
ClientID: "foo",
Workflow: wf,
Status: &model.JobStatus{State: "ACTIVATED"},
}

job, err := db.CreateJob(context.Background(), &tmpJob)
require.NoError(t, err)

jobPath := fmt.Sprintf("/api/wfx/v1/jobs/%s/status/subscribe", job.ID)

north, south := createNorthAndSouth(t, db)
handlers := []http.Handler{north, south}
for i, name := range allAPIs {
handler := handlers[i]
t.Run(name, func(t *testing.T) {
apitest.New().
Handler(handler).
Get(jobPath).
Expect(t).
Status(http.StatusBadRequest).
End()
})
}
}
13 changes: 13 additions & 0 deletions api/northbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/job/tags"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/internal/producer"
"github.com/siemens/wfx/middleware/logging"
"github.com/siemens/wfx/persistence"
)
Expand Down Expand Up @@ -263,5 +264,17 @@ func NewNorthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor
return northbound.NewDeleteJobsIDTagsOK().WithPayload(tags)
})

serverAPI.NorthboundGetJobsIDStatusSubscribeHandler = northbound.GetJobsIDStatusSubscribeHandlerFunc(
func(params northbound.GetJobsIDStatusSubscribeParams) middleware.Responder {
eventChan, err := status.AddSubscriber(params.HTTPRequest.Context(), storage, params.ID)
if ftag.Get(err) == ftag.NotFound {
return northbound.NewGetJobsIDStatusSubscribeNotFound().WithPayload(&model.ErrorResponse{Errors: []*model.Error{&JobNotFound}})
}
if ftag.Get(err) == ftag.InvalidArgument {
return northbound.NewGetJobsIDStatusSubscribeBadRequest().WithPayload(&model.ErrorResponse{Errors: []*model.Error{&JobTerminalState}})
}
return producer.SSEResponder(eventChan)
})

return serverAPI, nil
}
13 changes: 13 additions & 0 deletions api/southbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/job/tags"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/internal/producer"
"github.com/siemens/wfx/middleware/logging"
"github.com/siemens/wfx/persistence"
)
Expand Down Expand Up @@ -176,5 +177,17 @@ func NewSouthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor
return southbound.NewGetJobsIDTagsOK().WithPayload(tags)
})

serverAPI.SouthboundGetJobsIDStatusSubscribeHandler = southbound.GetJobsIDStatusSubscribeHandlerFunc(
func(params southbound.GetJobsIDStatusSubscribeParams) middleware.Responder {
eventChan, err := status.AddSubscriber(params.HTTPRequest.Context(), storage, params.ID)
if ftag.Get(err) == ftag.NotFound {
return southbound.NewGetJobsIDStatusSubscribeNotFound().WithPayload(&model.ErrorResponse{Errors: []*model.Error{&JobNotFound}})
}
if ftag.Get(err) == ftag.InvalidArgument {
return southbound.NewGetJobsIDStatusSubscribeBadRequest().WithPayload(&model.ErrorResponse{Errors: []*model.Error{&JobTerminalState}})
}
return producer.SSEResponder(eventChan)
})

return serverAPI, nil
}
7 changes: 5 additions & 2 deletions api/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,11 @@ func createNorthAndSouth(t *testing.T, db persistence.Storage) (http.Handler, ht
}

func persistJob(t *testing.T, db persistence.Storage) *model.Job {
wf, err := workflow.CreateWorkflow(context.Background(), db, dau.DirectWorkflow())
require.NoError(t, err)
wf := dau.DirectWorkflow()
if found, _ := workflow.GetWorkflow(context.Background(), db, wf.Name); found == nil {
_, err := workflow.CreateWorkflow(context.Background(), db, wf)
require.NoError(t, err)
}

jobReq := model.JobRequest{
ClientID: "foo",
Expand Down
6 changes: 5 additions & 1 deletion cmd/wfx/cmd/root/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/siemens/wfx/cmd/wfx/metadata"
"github.com/siemens/wfx/internal/config"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/persistence"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -254,7 +255,10 @@ Examples of tasks are installation of firmware or other types of commands issued
}
}

// Create a context with a timeout to allow outstanding requests to complete
// shut down (disconnect) subscribers otherwise we cannot stop the web server due to open connections
status.ShutdownSubscribers()

// create a context with a timeout to allow outstanding requests to complete
var timeout time.Duration
k.Read(func(k *koanf.Koanf) {
timeout = k.Duration(gracefulTimeoutFlag)
Expand Down
2 changes: 2 additions & 0 deletions cmd/wfxctl/cmd/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/getstatus"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/gettags"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/query"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/subscribestatus"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/updatedefinition"
"github.com/siemens/wfx/cmd/wfxctl/cmd/job/updatestatus"
"github.com/spf13/cobra"
Expand All @@ -43,4 +44,5 @@ func init() {
Command.AddCommand(addtags.Command)
Command.AddCommand(deltags.Command)
Command.AddCommand(gettags.Command)
Command.AddCommand(subscribestatus.Command)
}
92 changes: 92 additions & 0 deletions cmd/wfxctl/cmd/job/subscribestatus/subscribe_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package subscribestatus

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"os"
"time"

"github.com/Southclaws/fault"
"github.com/go-openapi/runtime"
"github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/tmaxmax/go-sse"

"github.com/siemens/wfx/cmd/wfxctl/errutil"
"github.com/siemens/wfx/cmd/wfxctl/flags"
generatedClient "github.com/siemens/wfx/generated/client"
"github.com/siemens/wfx/generated/client/jobs"
)

const (
idFlag = "id"
)

func init() {
f := Command.Flags()
f.String(idFlag, "", "job id")
}

type SSETransport struct {
baseCmd *flags.BaseCmd
}

// Submit implements the runtime.ClientTransport interface.
func (t SSETransport) Submit(op *runtime.ClientOperation) (interface{}, error) {
cfg := t.baseCmd.CreateTransportConfig()
rt := client.New(cfg.Host, generatedClient.DefaultBasePath, cfg.Schemes)
req := errutil.Must(rt.CreateHttpRequest(op))

httpClient := errutil.Must(t.baseCmd.CreateHTTPClient())
httpClient.Timeout = 0

client := sse.Client{
HTTPClient: httpClient,
DefaultReconnectionTime: time.Second * 5,
ResponseValidator: sse.DefaultValidator,
}

conn := client.NewConnection(req)
unsubscribe := conn.SubscribeMessages(func(event sse.Event) {
_, _ = os.Stdout.Write(event.Data)
os.Stdout.Write([]byte("\n"))
})
defer unsubscribe()

err := conn.Connect()
if err != nil {
return nil, fault.Wrap(err)
}

return jobs.NewGetJobsIDStatusSubscribeOK(), nil
}

var Command = &cobra.Command{
Use: "subscribe-status",
Short: "Subscribe to job update events",
Example: `
wfxctl job subscribe-status --id=1
`,
TraverseChildren: true,
Run: func(cmd *cobra.Command, args []string) {
params := jobs.NewGetJobsIDStatusSubscribeParams().
WithID(flags.Koanf.String(idFlag))
if params.ID == "" {
log.Fatal().Msg("Job ID missing")
}

baseCmd := flags.NewBaseCmd()
executor := generatedClient.New(SSETransport{baseCmd: &baseCmd}, strfmt.Default)
if _, err := executor.Jobs.GetJobsIDStatusSubscribe(params); err != nil {
log.Fatal().Err(err).Msg("Failed to subscribe to job status")
}
},
}
48 changes: 48 additions & 0 deletions cmd/wfxctl/cmd/job/subscribestatus/subscribe_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package subscribestatus

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"

"github.com/siemens/wfx/cmd/wfxctl/flags"
"github.com/stretchr/testify/assert"
)

func TestSubscribeJobStatus(t *testing.T) {
const expectedPath = "/api/wfx/v1/jobs/1/status/subscribe"
var actualPath string

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
actualPath = r.URL.Path

w.Header().Add("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`data: "hello world"
`))
}))
defer ts.Close()

u, _ := url.Parse(ts.URL)
_ = flags.Koanf.Set(flags.ClientHostFlag, u.Hostname())
port, _ := strconv.Atoi(u.Port())
_ = flags.Koanf.Set(flags.ClientPortFlag, port)

_ = flags.Koanf.Set(idFlag, "1")

err := Command.Execute()
assert.NoError(t, err)

assert.Equal(t, expectedPath, actualPath)
}
Loading

0 comments on commit 18c9db7

Please sign in to comment.