Skip to content

Commit

Permalink
feat: instant job event notifications
Browse files Browse the repository at this point in the history
This commit introduces a more efficient mechanism for notifications
related to job events, such as creation, updating, and deletion. Unlike
the previous architecture where clients had to constantly poll the
server for updates, the new implementation uses server-sent events (SSE)
to allow client to receive updates from the server , improving
efficiency and reducing network overhead.

Key Features:

- Introduced a new SSE-enabled endpoint, `/jobs/events`, which accepts
  filter parameters to customize the stream of events received.
- Clients can subscribe to `/jobs/events` for to receive updates on job
  events that meet specified filter criteria.
- Provided a client reference implementation through `wfxctl`.

Signed-off-by: Michael Adler <[email protected]>
  • Loading branch information
michaeladler committed Aug 25, 2023
1 parent 9dc0031 commit c7c68b7
Show file tree
Hide file tree
Showing 63 changed files with 3,297 additions and 29 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Job event notifications via server-sent events (see #11)

### Fixed

- Send HTTP status code 404 when attempting to access the file server while it is disabled
Expand All @@ -17,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- The response to a job status update always contains the workflow name now (see #11)

### Removed

## [0.1.0] - 2023-02-06
Expand Down
114 changes: 114 additions & 0 deletions api/job_events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package api

/*
* SPDX-FileCopyrightText: 2023 Siemens AG
*
* SPDX-License-Identifier: Apache-2.0
*
* Author: Michael Adler <[email protected]>
*/

import (
"context"
"encoding/json"
"io"
"net/http"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/siemens/wfx/generated/model"
"github.com/siemens/wfx/internal/handler/job"
"github.com/siemens/wfx/internal/handler/job/events"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/workflow/dau"
"github.com/steinfletcher/apitest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestJobEventsSubscribe(t *testing.T) {
log.Logger = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.Stamp})

db := newInMemoryDB(t)
wf := dau.DirectWorkflow()
_, err := workflow.CreateWorkflow(context.Background(), db, wf)
require.NoError(t, err)

north, south := createNorthAndSouth(t, db)

handlers := []http.Handler{north, south}
for i, name := range allAPIs {
handler := handlers[i]
t.Run(name, func(t *testing.T) {
var jobID string
{
job, err := job.CreateJob(context.Background(), db,
&model.JobRequest{ClientID: "foo", Workflow: wf.Name})
require.NoError(t, err)
jobID = job.ID
}
require.NotEmpty(t, jobID)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

ch, _ := events.AddSubscriber(context.Background(), events.FilterParams{IDs: []string{jobID}})
// wait for event created by our status.Update
<-ch
// now our GET request should have received the response as well,
// add some extra time to be safe
time.Sleep(100 * time.Millisecond)
events.ShutdownSubscribers()
}()

wg.Add(1)
go func() {
defer wg.Done()
// wait for subscriber which is created by our GET request below and our test goroutine
for events.SubscriberCount() != 2 {
time.Sleep(50 * time.Millisecond)
}
// update job
_, err = status.Update(context.Background(), db, jobID, &model.JobStatus{State: "INSTALLING"}, model.EligibleEnumCLIENT)
require.NoError(t, err)
}()

result := apitest.New().
Handler(handler).
Get("/api/wfx/v1/jobs/events").Query("ids", jobID).
Expect(t).
Status(http.StatusOK).
Header("Content-Type", "text/event-stream").
End()

data, _ := io.ReadAll(result.Response.Body)
body := string(data)
require.NotEmpty(t, body)

// check body ends with two newlines as required by SSE spec
assert.True(t, strings.HasSuffix(body, "\n\n"))

// check body starts with data:
assert.True(t, strings.HasPrefix(body, "data: "))

// check content is a job and state is INSTALLING
var ev events.JobEvent
err = json.Unmarshal([]byte(strings.TrimPrefix(body, "data: ")), &ev)
require.NoError(t, err)
assert.Equal(t, "INSTALLING", ev.Job.Status.State)
assert.Equal(t, wf.Name, ev.Job.Workflow.Name)

wg.Wait()
events.ShutdownSubscribers()
})
}
}
23 changes: 23 additions & 0 deletions api/northbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package api

import (
"net/http"
"strings"

"github.com/Southclaws/fault"
"github.com/Southclaws/fault/ftag"
Expand All @@ -21,10 +22,12 @@ import (
"github.com/siemens/wfx/generated/northbound/restapi/operations/northbound"
"github.com/siemens/wfx/internal/handler/job"
"github.com/siemens/wfx/internal/handler/job/definition"
"github.com/siemens/wfx/internal/handler/job/events"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/job/tags"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/middleware/logging"
"github.com/siemens/wfx/middleware/responder/sse"
"github.com/siemens/wfx/persistence"
)

Expand Down Expand Up @@ -263,5 +266,25 @@ func NewNorthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor
return northbound.NewDeleteJobsIDTagsOK().WithPayload(tags)
})

serverAPI.NorthboundGetJobsEventsHandler = northbound.GetJobsEventsHandlerFunc(
func(params northbound.GetJobsEventsParams) middleware.Responder {
ctx := params.HTTPRequest.Context()
var filter events.FilterParams
if params.Ids != nil {
filter.IDs = strings.Split(*params.Ids, ",")
}
if params.ClientIds != nil {
filter.ClientIDs = strings.Split(*params.ClientIds, ",")
}

Check warning on line 278 in api/northbound.go

View check run for this annotation

Codecov / codecov/patch

api/northbound.go#L277-L278

Added lines #L277 - L278 were not covered by tests
if params.Workflows != nil {
filter.Workflows = strings.Split(*params.Workflows, ",")
}

Check warning on line 281 in api/northbound.go

View check run for this annotation

Codecov / codecov/patch

api/northbound.go#L280-L281

Added lines #L280 - L281 were not covered by tests
eventChan, err := events.AddSubscriber(ctx, filter)
if err != nil {
return northbound.NewGetJobsEventsDefault(http.StatusInternalServerError)
}

Check warning on line 285 in api/northbound.go

View check run for this annotation

Codecov / codecov/patch

api/northbound.go#L284-L285

Added lines #L284 - L285 were not covered by tests
return sse.Responder(ctx, eventChan)
})

return serverAPI, nil
}
23 changes: 23 additions & 0 deletions api/southbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package api

import (
"net/http"
"strings"

"github.com/Southclaws/fault"
"github.com/Southclaws/fault/ftag"
Expand All @@ -21,10 +22,12 @@ import (
"github.com/siemens/wfx/generated/southbound/restapi/operations/southbound"
"github.com/siemens/wfx/internal/handler/job"
"github.com/siemens/wfx/internal/handler/job/definition"
"github.com/siemens/wfx/internal/handler/job/events"
"github.com/siemens/wfx/internal/handler/job/status"
"github.com/siemens/wfx/internal/handler/job/tags"
"github.com/siemens/wfx/internal/handler/workflow"
"github.com/siemens/wfx/middleware/logging"
"github.com/siemens/wfx/middleware/responder/sse"
"github.com/siemens/wfx/persistence"
)

Expand Down Expand Up @@ -176,5 +179,25 @@ func NewSouthboundAPI(storage persistence.Storage) (*operations.WorkflowExecutor
return southbound.NewGetJobsIDTagsOK().WithPayload(tags)
})

serverAPI.SouthboundGetJobsEventsHandler = southbound.GetJobsEventsHandlerFunc(
func(params southbound.GetJobsEventsParams) middleware.Responder {
ctx := params.HTTPRequest.Context()
var filter events.FilterParams
if params.Ids != nil {
filter.IDs = strings.Split(*params.Ids, ",")
}
if params.ClientIds != nil {
filter.ClientIDs = strings.Split(*params.ClientIds, ",")
}

Check warning on line 191 in api/southbound.go

View check run for this annotation

Codecov / codecov/patch

api/southbound.go#L190-L191

Added lines #L190 - L191 were not covered by tests
if params.Workflows != nil {
filter.Workflows = strings.Split(*params.Workflows, ",")
}

Check warning on line 194 in api/southbound.go

View check run for this annotation

Codecov / codecov/patch

api/southbound.go#L193-L194

Added lines #L193 - L194 were not covered by tests
eventChan, err := events.AddSubscriber(ctx, filter)
if err != nil {
return southbound.NewGetJobsEventsDefault(http.StatusInternalServerError)
}

Check warning on line 198 in api/southbound.go

View check run for this annotation

Codecov / codecov/patch

api/southbound.go#L197-L198

Added lines #L197 - L198 were not covered by tests
return sse.Responder(ctx, eventChan)
})

return serverAPI, nil
}
7 changes: 5 additions & 2 deletions api/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,11 @@ func createNorthAndSouth(t *testing.T, db persistence.Storage) (http.Handler, ht
}

func persistJob(t *testing.T, db persistence.Storage) *model.Job {
wf, err := workflow.CreateWorkflow(context.Background(), db, dau.DirectWorkflow())
require.NoError(t, err)
wf := dau.DirectWorkflow()
if found, _ := workflow.GetWorkflow(context.Background(), db, wf.Name); found == nil {
_, err := workflow.CreateWorkflow(context.Background(), db, wf)
require.NoError(t, err)
}

jobReq := model.JobRequest{
ClientID: "foo",
Expand Down
6 changes: 5 additions & 1 deletion cmd/wfx/cmd/root/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/siemens/wfx/cmd/wfx/metadata"
"github.com/siemens/wfx/internal/config"
"github.com/siemens/wfx/internal/handler/job/events"
"github.com/siemens/wfx/persistence"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -255,7 +256,10 @@ Examples of tasks are installation of firmware or other types of commands issued
}
}

// Create a context with a timeout to allow outstanding requests to complete
// shut down (disconnect) subscribers otherwise we cannot stop the web server due to open connections
events.ShutdownSubscribers()

// create a context with a timeout to allow outstanding requests to complete
var timeout time.Duration
k.Read(func(k *koanf.Koanf) {
timeout = k.Duration(gracefulTimeoutFlag)
Expand Down
Loading

0 comments on commit c7c68b7

Please sign in to comment.