Skip to content

Commit

Permalink
feat: instant job event notifications
Browse files Browse the repository at this point in the history
This commit introduces a more efficient mechanism for notifications
related to job events, such as creation, updating, and deletion. Unlike
the previous architecture, where clients had to constantly poll the
server for updates, the new implementation uses server-sent events (SSE)
to allow clients to receive updates from the server, improving
efficiency and reducing network overhead.

Key Features:

- Introduced a new SSE-enabled endpoint, `/jobs/events`, which accepts
  filter parameters to customize the stream of events received.
- Clients can subscribe to `/jobs/events` to receive updates on job
  events that meet specified filter criteria.
- Provided a client reference implementation through `wfxctl`.

Signed-off-by: Michael Adler <[email protected]>
  • Loading branch information
michaeladler committed Sep 1, 2023
1 parent 6839c4d commit 5ce13b2
Show file tree
Hide file tree
Showing 63 changed files with 3,355 additions and 29 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Job event notifications via server-sent events (see #11)

### Fixed

- Send HTTP status code 404 when attempting to access the file server while it is disabled
Expand Down
116 changes: 116 additions & 0 deletions api/job_events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package api

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"context"
"encoding/json"
"io"
"net/http"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/siemens/wfx/generated/model"
"github.com/siemens/wfx/internal/handler/job"
"github.com/siemens/wfx/internal/handler/job/events"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/workflow/dau"
"github.com/steinfletcher/apitest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestJobEventsSubscribe checks, for every API listed in allAPIs, that a
// client subscribed to GET /api/wfx/v1/jobs/events with an ids filter receives
// a server-sent event once the status of the matching job is updated.
//
// Each sub-test runs three concurrent actors:
//   - the test goroutine, which issues the GET request and inspects the body,
//   - a goroutine that subscribes to the same job and shuts down all
//     subscribers after it has seen the event,
//   - a goroutine that waits until both subscribers are registered and then
//     updates the job status.
func TestJobEventsSubscribe(t *testing.T) {
	log.Logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.Stamp})

	db := newInMemoryDB(t)
	wf := dau.DirectWorkflow()
	_, err := workflow.CreateWorkflow(context.Background(), db, wf)
	require.NoError(t, err)

	north, south := createNorthAndSouth(t, db)

	handlers := []http.Handler{north, south}
	for i, name := range allAPIs {
		handler := handlers[i]
		t.Run(name, func(t *testing.T) {
			// err is local to the sub-test; the goroutines below declare their
			// own so that no error variable is shared between goroutines.
			created, err := job.CreateJob(context.Background(), db,
				&model.JobRequest{ClientID: "foo", Workflow: wf.Name})
			require.NoError(t, err)
			jobID := created.ID
			require.NotEmpty(t, jobID)

			var wg sync.WaitGroup
			wg.Add(1)
			go func() {
				defer wg.Done()

				ch, err := events.AddSubscriber(context.Background(), events.FilterParams{IDs: []string{jobID}})
				// assert instead of require: FailNow must only be called from
				// the goroutine running the test function.
				if !assert.NoError(t, err) {
					return
				}
				// wait for event created by our status.Update
				<-ch
				// now our GET request should have received the response as well,
				// add some extra time to be safe
				time.Sleep(100 * time.Millisecond)
				events.ShutdownSubscribers()
			}()

			wg.Add(1)
			go func() {
				defer wg.Done()
				// wait for subscriber which is created by our GET request below and our test goroutine
				for events.SubscriberCount() != 2 {
					time.Sleep(50 * time.Millisecond)
				}
				// update job
				_, err := status.Update(context.Background(), db, jobID, &model.JobStatus{State: "INSTALLING"}, model.EligibleEnumCLIENT)
				assert.NoError(t, err)
			}()

			result := apitest.New().
				Handler(handler).
				Get("/api/wfx/v1/jobs/events").Query("ids", jobID).
				Expect(t).
				Status(http.StatusOK).
				Header("Content-Type", "text/event-stream").
				End()

			data, err := io.ReadAll(result.Response.Body)
			require.NoError(t, err)
			body := string(data)
			require.NotEmpty(t, body)

			lines := strings.Split(body, "\n")
			// require instead of assert: the indexing below would panic on a
			// body with fewer lines than expected.
			require.Len(t, lines, 4)

			// check body starts with data:
			assert.True(t, strings.HasPrefix(lines[0], "data: "))

			// check content is a job and state is INSTALLING
			var ev events.JobEvent
			err = json.Unmarshal([]byte(strings.TrimPrefix(lines[0], "data: ")), &ev)
			require.NoError(t, err)
			assert.Equal(t, "INSTALLING", ev.Job.Status.State)
			assert.Equal(t, wf.Name, ev.Job.Workflow.Name)

			assert.Equal(t, "id: 1", lines[1])

			wg.Wait()
			events.ShutdownSubscribers()
		})
	}
}
23 changes: 23 additions & 0 deletions api/northbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package api

import (
"net/http"
"strings"

"github.com/Southclaws/fault"
"github.com/Southclaws/fault/ftag"
Expand All @@ -21,10 +22,12 @@ import (
"github.com/siemens/wfx/generated/northbound/restapi/operations/northbound"
"github.com/siemens/wfx/internal/handler/job"
"github.com/siemens/wfx/internal/handler/job/definition"
"github.com/siemens/wfx/internal/handler/job/events"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/job/tags"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/middleware/logging"
"github.com/siemens/wfx/middleware/responder/sse"
"github.com/siemens/wfx/persistence"
)

Expand Down Expand Up @@ -263,5 +266,25 @@ func NewNorthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor
return northbound.NewDeleteJobsIDTagsOK().WithPayload(tags)
})

serverAPI.NorthboundGetJobsEventsHandler = northbound.GetJobsEventsHandlerFunc(
func(params northbound.GetJobsEventsParams) middleware.Responder {
ctx := params.HTTPRequest.Context()
var filter events.FilterParams
if params.Ids != nil {
filter.IDs = strings.Split(*params.Ids, ",")
}
if params.ClientIds != nil {
filter.ClientIDs = strings.Split(*params.ClientIds, ",")
}

Check warning on line 278 in api/northbound.go

View check run for this annotation

Codecov / codecov/patch

api/northbound.go#L277-L278

Added lines #L277 - L278 were not covered by tests
if params.Workflows != nil {
filter.Workflows = strings.Split(*params.Workflows, ",")
}

Check warning on line 281 in api/northbound.go

View check run for this annotation

Codecov / codecov/patch

api/northbound.go#L280-L281

Added lines #L280 - L281 were not covered by tests
eventChan, err := events.AddSubscriber(ctx, filter)
if err != nil {
return northbound.NewGetJobsEventsDefault(http.StatusInternalServerError)
}

Check warning on line 285 in api/northbound.go

View check run for this annotation

Codecov / codecov/patch

api/northbound.go#L284-L285

Added lines #L284 - L285 were not covered by tests
return sse.Responder(ctx, eventChan)
})

return serverAPI, nil
}
23 changes: 23 additions & 0 deletions api/southbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package api

import (
"net/http"
"strings"

"github.com/Southclaws/fault"
"github.com/Southclaws/fault/ftag"
Expand All @@ -21,10 +22,12 @@ import (
"github.com/siemens/wfx/generated/southbound/restapi/operations/southbound"
"github.com/siemens/wfx/internal/handler/job"
"github.com/siemens/wfx/internal/handler/job/definition"
"github.com/siemens/wfx/internal/handler/job/events"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/job/tags"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/middleware/logging"
"github.com/siemens/wfx/middleware/responder/sse"
"github.com/siemens/wfx/persistence"
)

Expand Down Expand Up @@ -176,5 +179,25 @@ func NewSouthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor
return southbound.NewGetJobsIDTagsOK().WithPayload(tags)
})

serverAPI.SouthboundGetJobsEventsHandler = southbound.GetJobsEventsHandlerFunc(
func(params southbound.GetJobsEventsParams) middleware.Responder {
ctx := params.HTTPRequest.Context()
var filter events.FilterParams
if params.Ids != nil {
filter.IDs = strings.Split(*params.Ids, ",")
}
if params.ClientIds != nil {
filter.ClientIDs = strings.Split(*params.ClientIds, ",")
}

Check warning on line 191 in api/southbound.go

View check run for this annotation

Codecov / codecov/patch

api/southbound.go#L190-L191

Added lines #L190 - L191 were not covered by tests
if params.Workflows != nil {
filter.Workflows = strings.Split(*params.Workflows, ",")
}

Check warning on line 194 in api/southbound.go

View check run for this annotation

Codecov / codecov/patch

api/southbound.go#L193-L194

Added lines #L193 - L194 were not covered by tests
eventChan, err := events.AddSubscriber(ctx, filter)
if err != nil {
return southbound.NewGetJobsEventsDefault(http.StatusInternalServerError)
}

Check warning on line 198 in api/southbound.go

View check run for this annotation

Codecov / codecov/patch

api/southbound.go#L197-L198

Added lines #L197 - L198 were not covered by tests
return sse.Responder(ctx, eventChan)
})

return serverAPI, nil
}
7 changes: 5 additions & 2 deletions api/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,11 @@ func createNorthAndSouth(t *testing.T, db persistence.Storage) (http.Handler, ht
}

func persistJob(t *testing.T, db persistence.Storage) *model.Job {
wf, err := workflow.CreateWorkflow(context.Background(), db, dau.DirectWorkflow())
require.NoError(t, err)
wf := dau.DirectWorkflow()
if found, _ := workflow.GetWorkflow(context.Background(), db, wf.Name); found == nil {
_, err := workflow.CreateWorkflow(context.Background(), db, wf)
require.NoError(t, err)
}

jobReq := model.JobRequest{
ClientID: "foo",
Expand Down
6 changes: 5 additions & 1 deletion cmd/wfx/cmd/root/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/siemens/wfx/cmd/wfx/metadata"
"github.com/siemens/wfx/internal/config"
"github.com/siemens/wfx/internal/handler/job/events"
"github.com/siemens/wfx/persistence"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -255,7 +256,10 @@ Examples of tasks are installation of firmware or other types of commands issued
}
}

// Create a context with a timeout to allow outstanding requests to complete
// shut down (disconnect) subscribers otherwise we cannot stop the web server due to open connections
events.ShutdownSubscribers()

// create a context with a timeout to allow outstanding requests to complete
var timeout time.Duration
k.Read(func(k *koanf.Koanf) {
timeout = k.Duration(gracefulTimeoutFlag)
Expand Down
141 changes: 141 additions & 0 deletions cmd/wfxctl/cmd/job/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package events

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"time"

"github.com/Southclaws/fault"
"github.com/go-openapi/runtime"
"github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/tmaxmax/go-sse"

"github.com/siemens/wfx/cmd/wfxctl/errutil"
"github.com/siemens/wfx/cmd/wfxctl/flags"
generatedClient "github.com/siemens/wfx/generated/client"
"github.com/siemens/wfx/generated/client/jobs"
"github.com/siemens/wfx/generated/model"
)

const (
idsFlag = "ids"
clientIdsFlag = "clientIds"
workflowNamesFlag = "workflowNames"
)

var validator = func(out io.Writer) sse.ResponseValidator {
return func(r *http.Response) error {
if r.StatusCode == http.StatusOK {
return nil
}

if r.Body != nil {
defer r.Body.Close()
b, err := io.ReadAll(r.Body)
if err != nil {
return fault.Wrap(err)
}

Check warning on line 51 in cmd/wfxctl/cmd/job/events/events.go

View check run for this annotation

Codecov / codecov/patch

cmd/wfxctl/cmd/job/events/events.go#L50-L51

Added lines #L50 - L51 were not covered by tests

errResp := new(model.ErrorResponse)
if err := json.Unmarshal(b, errResp); err != nil {
return fault.Wrap(err)
}
if len(errResp.Errors) > 0 {
for _, msg := range errResp.Errors {
fmt.Fprintf(out, "ERROR: %s (code=%s, logref=%s)\n", msg.Message, msg.Code, msg.Logref)
}

Check warning on line 60 in cmd/wfxctl/cmd/job/events/events.go

View check run for this annotation

Codecov / codecov/patch

cmd/wfxctl/cmd/job/events/events.go#L57-L60

Added lines #L57 - L60 were not covered by tests
}
}
return fmt.Errorf("received HTTP status code: %d", r.StatusCode)
}
}

// init registers the filter flags of the events command. All of them are
// string flags with an empty default that take a comma-separated list.
func init() {
	flagSet := Command.Flags()
	for _, def := range []struct{ name, usage string }{
		{idsFlag, "job ids (comma-separated)"},
		{clientIdsFlag, "client ids (comma-separated)"},
		{workflowNamesFlag, "workflow names (comma-separated)"},
	} {
		flagSet.String(def.name, "", def.usage)
	}
}

type SSETransport struct {
baseCmd *flags.BaseCmd
out io.Writer
}

// Submit implements the runtime.ClientTransport interface.
func (t SSETransport) Submit(op *runtime.ClientOperation) (interface{}, error) {
cfg := t.baseCmd.CreateTransportConfig()
rt := client.New(cfg.Host, generatedClient.DefaultBasePath, cfg.Schemes)
req := errutil.Must(rt.CreateHttpRequest(op))

httpClient := errutil.Must(t.baseCmd.CreateHTTPClient())
httpClient.Timeout = 0

client := sse.Client{
HTTPClient: httpClient,
DefaultReconnectionTime: time.Second * 5,
ResponseValidator: validator(t.out),
}

conn := client.NewConnection(req)
unsubscribe := conn.SubscribeMessages(func(event sse.Event) {
_, _ = os.Stdout.WriteString(event.Data)
os.Stdout.Write([]byte("\n"))
})
defer unsubscribe()

err := conn.Connect()
if err != nil {
return nil, fault.Wrap(err)
}

Check warning on line 104 in cmd/wfxctl/cmd/job/events/events.go

View check run for this annotation

Codecov / codecov/patch

cmd/wfxctl/cmd/job/events/events.go#L103-L104

Added lines #L103 - L104 were not covered by tests

return jobs.NewGetJobsEventsOK(), nil
}

var Command = &cobra.Command{
Use: "events",
Short: "Subscribe to job events",
Example: `
wfxctl job events --ids=1,2,3
`,
TraverseChildren: true,
Run: func(cmd *cobra.Command, args []string) {
params := jobs.NewGetJobsEventsParams()

ids := flags.Koanf.String(idsFlag)
if ids != "" {
params.WithIds(&ids)
}

clientIds := flags.Koanf.String(clientIdsFlag)
if ids != "" {
params.WithClientIds(&clientIds)
}

workflowNames := flags.Koanf.String(workflowNamesFlag)
if ids != "" {
params.WithWorkflows(&workflowNames)
}

baseCmd := flags.NewBaseCmd()
transport := SSETransport{baseCmd: &baseCmd, out: cmd.OutOrStderr()}
executor := generatedClient.New(transport, strfmt.Default)
if _, err := executor.Jobs.GetJobsEvents(params); err != nil {
log.Fatal().Msg("Failed to subscribe to job status")
}

Check warning on line 139 in cmd/wfxctl/cmd/job/events/events.go

View check run for this annotation

Codecov / codecov/patch

cmd/wfxctl/cmd/job/events/events.go#L138-L139

Added lines #L138 - L139 were not covered by tests
},
}
Loading

0 comments on commit 5ce13b2

Please sign in to comment.