Skip to content

Commit

Permalink
website: moved Status to the shared input for all pages
Browse files Browse the repository at this point in the history
website/api/v1: moved SSE to the StreamValue callbacks completely

website: moved the statusValue up a level so that all layers can use
the same one. A load-bearing StreamValue.
  • Loading branch information
Wessie committed Feb 26, 2024
1 parent b4d3093 commit 743db8f
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 75 deletions.
4 changes: 1 addition & 3 deletions website/api/php/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import (
)

func NewAPI(ctx context.Context, cfg config.Config, storage radio.StorageService,
streamer radio.StreamerService, manager radio.ManagerService) (*API, error) {

statusValue := util.StreamValue(ctx, manager.CurrentStatus)
streamer radio.StreamerService, statusValue *util.Value[radio.Status]) (*API, error) {

status, err := newV0Status(ctx, storage, streamer, statusValue)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion website/api/v1/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func NewAPI(ctx context.Context, cfg config.Config, templates templates.Executor
song: song,
}

go api.runSSE(ctx)
// start up status updates
api.runStatusUpdates(ctx)

return api, nil
}
Expand Down
57 changes: 9 additions & 48 deletions website/api/v1/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,48 +11,15 @@ import (
"time"

radio "github.com/R-a-dio/valkyrie"
"github.com/R-a-dio/valkyrie/errors"
"github.com/R-a-dio/valkyrie/templates"
"github.com/R-a-dio/valkyrie/util"
"github.com/R-a-dio/valkyrie/util/sse"
"github.com/rs/zerolog"
"github.com/rs/zerolog/hlog"
)

func prepareStream[T any](ctx context.Context, fn func(context.Context) (T, error)) (T, error) {
for {
s, err := fn(ctx)
if err == nil {
return s, nil
}
zerolog.Ctx(ctx).Error().Err(err).Msg("failed to prepare stream")

select {
case <-ctx.Done():
return s, ctx.Err()
case <-time.After(time.Second * 3):
}
}
}

func (a *API) runSSE(ctx context.Context) {
for {
err := a.runStatusUpdates(ctx)
if errors.IsE(err, context.Canceled) {
return
}
}
}

func (a *API) runStatusUpdates(ctx context.Context) error {
const op errors.Op = "website/api/v1/API.runSongUpdates"

log := zerolog.Ctx(ctx).With().Str("sse", "song").Logger()

statusStream, err := prepareStream(ctx, a.manager.CurrentStatus)
if err != nil {
return errors.E(op, err)
}
func (a *API) runStatusUpdates(ctx context.Context) {
log := zerolog.Ctx(ctx).With().Str("sse", "updates").Logger()

var previous radio.Status

Expand All @@ -64,27 +31,23 @@ func (a *API) runStatusUpdates(ctx context.Context) error {
a.sse.SendListeners(i)
})

for {
status, err := statusStream.Next()
if err != nil {
log.Error().Err(err).Msg("source failure")
break
}

_ = util.StreamValue(ctx, a.manager.CurrentStatus, func(ctx context.Context, status radio.Status) {
// if status is zero it probably means it was an initial value or there is no stream
// either way skip the propagation to the sse stream
if status.IsZero() {
log.Debug().Msg("zero value")
continue
return
}

// only send events if the relevant data to said event has changed
// since our previous status
// only pass an update through if the song is different from the previous one
if !status.Song.EqualTo(previous.Song) {
log.Debug().Str("event", EventMetadata).Any("value", status).Msg("sending")
a.sse.SendNowPlaying(status)
go a.sendQueue(ctx)
go a.sendLastPlayed(ctx)
}

// same goes for the user one, only pass it through if the user actually changed
if status.User.ID != previous.User.ID {
log.Debug().Str("event", EventStreamer).Any("value", status.User).Msg("sending")
a.sse.SendStreamer(status.User)
Expand All @@ -93,9 +56,7 @@ func (a *API) runStatusUpdates(ctx context.Context) error {
}

previous = status
}

return nil
})
}

func (a *API) sendQueue(ctx context.Context) {
Expand Down
8 changes: 6 additions & 2 deletions website/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/R-a-dio/valkyrie/search"
"github.com/R-a-dio/valkyrie/storage"
"github.com/R-a-dio/valkyrie/templates"
"github.com/R-a-dio/valkyrie/util"
"github.com/R-a-dio/valkyrie/util/daypass"
"github.com/R-a-dio/valkyrie/website/admin"
phpapi "github.com/R-a-dio/valkyrie/website/api/php"
Expand Down Expand Up @@ -52,6 +53,9 @@ func Execute(ctx context.Context, cfg config.Config) error {
// RPC clients
streamer := cfg.Conf().Streamer.Client()
manager := cfg.Conf().Manager.Client()

// RPC values
statusValue := util.StreamValue(ctx, manager.CurrentStatus)
// templates
siteTemplates, err := templates.FromDirectory(cfg.Conf().TemplatePath)
if err != nil {
Expand Down Expand Up @@ -93,7 +97,7 @@ func Execute(ctx context.Context, cfg config.Config) error {
authentication := vmiddleware.NewAuthentication(storage, executor, sessionManager)
r.Use(authentication.UserMiddleware)
// shared input handling, stuff the base template needs
r.Use(vmiddleware.InputMiddleware(cfg))
r.Use(vmiddleware.InputMiddleware(cfg, statusValue))
// theme state management
r.Use(templates.ThemeCtx(storage))

Expand All @@ -112,7 +116,7 @@ func Execute(ctx context.Context, cfg config.Config) error {
// it's mostly self-contained to the /api/* route, except for /request that
// leaked out at some point
logger.Info().Str("event", "init").Str("part", "api_v0").Msg("")
v0, err := phpapi.NewAPI(ctx, cfg, storage, streamer, manager)
v0, err := phpapi.NewAPI(ctx, cfg, storage, streamer, statusValue)
if err != nil {
return errors.E(op, err)
}
Expand Down
10 changes: 8 additions & 2 deletions website/middleware/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type inputKey struct{}

// InputMiddleware generates an Input for each request and makes it available
// through InputFromRequest
func InputMiddleware(cfg config.Config) func(http.Handler) http.Handler {
func InputMiddleware(cfg config.Config, status *util.Value[radio.Status]) func(http.Handler) http.Handler {
PublicStreamURL := template.URL(cfg.Conf().Website.PublicStreamURL)
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -26,6 +26,7 @@ func InputMiddleware(cfg config.Config) func(http.Handler) http.Handler {
IsUser: user != nil,
User: user,
StreamURL: PublicStreamURL,
Status: status.Latest(),
}

ctx = context.WithValue(ctx, inputKey{}, input)
Expand All @@ -37,7 +38,11 @@ func InputMiddleware(cfg config.Config) func(http.Handler) http.Handler {

// InputFromRequest returns the Input associated with the request
func InputFromRequest(r *http.Request) Input {
v := r.Context().Value(inputKey{})
return InputFromContext(r.Context())
}

func InputFromContext(ctx context.Context) Input {
v := ctx.Value(inputKey{})
if v == nil {
return Input{}
}
Expand All @@ -49,6 +54,7 @@ type Input struct {
IsUser bool
IsHTMX bool
User *radio.User
Status radio.Status
StreamURL template.URL
}

Expand Down
3 changes: 0 additions & 3 deletions website/public/home.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
type HomeInput struct {
middleware.Input

Status radio.Status
Queue []radio.QueueEntry
LastPlayed []radio.Song
News []radio.NewsPost
Expand Down Expand Up @@ -42,8 +41,6 @@ func (s State) getHome(w http.ResponseWriter, r *http.Request) error {
input := NewHomeInput(r)
ctx := r.Context()

input.Status = s.StatusValue.Latest()

queue, err := s.Streamer.Queue(ctx)
if err != nil {
return errors.E(op, errors.InternalServer, err)
Expand Down
29 changes: 13 additions & 16 deletions website/public/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
radio "github.com/R-a-dio/valkyrie"
"github.com/R-a-dio/valkyrie/config"
"github.com/R-a-dio/valkyrie/templates"
"github.com/R-a-dio/valkyrie/util"
"github.com/R-a-dio/valkyrie/util/daypass"
"github.com/rs/zerolog/hlog"

Expand All @@ -25,27 +24,25 @@ func NewState(
search radio.SearchService) State {

return State{
Config: cfg,
Daypass: dp,
Templates: exec,
Manager: manager,
Streamer: streamer,
Storage: storage,
Search: search,
StatusValue: util.StreamValue(ctx, manager.CurrentStatus),
Config: cfg,
Daypass: dp,
Templates: exec,
Manager: manager,
Streamer: streamer,
Storage: storage,
Search: search,
}
}

type State struct {
config.Config

Daypass *daypass.Daypass
Templates templates.Executor
Manager radio.ManagerService
Streamer radio.StreamerService
Storage radio.StorageService
Search radio.SearchService
StatusValue *util.Value[radio.Status]
Daypass *daypass.Daypass
Templates templates.Executor
Manager radio.ManagerService
Streamer radio.StreamerService
Storage radio.StorageService
Search radio.SearchService
}

func (s *State) errorHandler(w http.ResponseWriter, r *http.Request, err error) {
Expand Down

0 comments on commit 743db8f

Please sign in to comment.