diff --git a/website/api/php/api.go b/website/api/php/api.go index 6dbf63f3..2dcd3381 100644 --- a/website/api/php/api.go +++ b/website/api/php/api.go @@ -26,9 +26,7 @@ import ( ) func NewAPI(ctx context.Context, cfg config.Config, storage radio.StorageService, - streamer radio.StreamerService, manager radio.ManagerService) (*API, error) { - - statusValue := util.StreamValue(ctx, manager.CurrentStatus) + streamer radio.StreamerService, statusValue *util.Value[radio.Status]) (*API, error) { status, err := newV0Status(ctx, storage, streamer, statusValue) if err != nil { diff --git a/website/api/v1/router.go b/website/api/v1/router.go index 605d1a89..de0a5ddb 100644 --- a/website/api/v1/router.go +++ b/website/api/v1/router.go @@ -32,7 +32,8 @@ func NewAPI(ctx context.Context, cfg config.Config, templates templates.Executor song: song, } - go api.runSSE(ctx) + // start up status updates + api.runStatusUpdates(ctx) return api, nil } diff --git a/website/api/v1/sse.go b/website/api/v1/sse.go index 30eee9b7..154cbfda 100644 --- a/website/api/v1/sse.go +++ b/website/api/v1/sse.go @@ -11,7 +11,6 @@ import ( "time" radio "github.com/R-a-dio/valkyrie" - "github.com/R-a-dio/valkyrie/errors" "github.com/R-a-dio/valkyrie/templates" "github.com/R-a-dio/valkyrie/util" "github.com/R-a-dio/valkyrie/util/sse" @@ -19,40 +18,8 @@ import ( "github.com/rs/zerolog/hlog" ) -func prepareStream[T any](ctx context.Context, fn func(context.Context) (T, error)) (T, error) { - for { - s, err := fn(ctx) - if err == nil { - return s, nil - } - zerolog.Ctx(ctx).Error().Err(err).Msg("failed to prepare stream") - - select { - case <-ctx.Done(): - return s, ctx.Err() - case <-time.After(time.Second * 3): - } - } -} - -func (a *API) runSSE(ctx context.Context) { - for { - err := a.runStatusUpdates(ctx) - if errors.IsE(err, context.Canceled) { - return - } - } -} - -func (a *API) runStatusUpdates(ctx context.Context) error { - const op errors.Op = "website/api/v1/API.runSongUpdates" - - log := zerolog.Ctx(ctx).With().Str("sse", "song").Logger() - - statusStream, err := prepareStream(ctx, a.manager.CurrentStatus) - if err != nil { - return errors.E(op, err) - } +func (a *API) runStatusUpdates(ctx context.Context) { + log := zerolog.Ctx(ctx).With().Str("sse", "updates").Logger() var previous radio.Status @@ -64,20 +31,15 @@ func (a *API) runStatusUpdates(ctx context.Context) error { a.sse.SendListeners(i) }) - for { - status, err := statusStream.Next() - if err != nil { - log.Error().Err(err).Msg("source failure") - break - } - + _ = util.StreamValue(ctx, a.manager.CurrentStatus, func(ctx context.Context, status radio.Status) { + // if status is zero it probably means it was an initial value or there is no stream + // either way skip the propagation to the sse stream if status.IsZero() { log.Debug().Msg("zero value") - continue + return } - // only send events if the relevant data to said event has changed - // since our previous status + // only pass an update through if the song is different from the previous one if !status.Song.EqualTo(previous.Song) { log.Debug().Str("event", EventMetadata).Any("value", status).Msg("sending") a.sse.SendNowPlaying(status) @@ -85,6 +47,7 @@ func (a *API) runStatusUpdates(ctx context.Context) error { go a.sendLastPlayed(ctx) } + // same goes for the user one, only pass it through if the user actually changed if status.User.ID != previous.User.ID { log.Debug().Str("event", EventStreamer).Any("value", status.User).Msg("sending") a.sse.SendStreamer(status.User) @@ -93,9 +56,7 @@ func (a *API) runStatusUpdates(ctx context.Context) error { } previous = status - } - - return nil + }) } func (a *API) sendQueue(ctx context.Context) { diff --git a/website/main.go b/website/main.go index 8ebf9eea..280aa39d 100644 --- a/website/main.go +++ b/website/main.go @@ -11,6 +11,7 @@ import ( "github.com/R-a-dio/valkyrie/search" "github.com/R-a-dio/valkyrie/storage" "github.com/R-a-dio/valkyrie/templates" + "github.com/R-a-dio/valkyrie/util" "github.com/R-a-dio/valkyrie/util/daypass" "github.com/R-a-dio/valkyrie/website/admin" phpapi "github.com/R-a-dio/valkyrie/website/api/php" @@ -52,6 +53,9 @@ func Execute(ctx context.Context, cfg config.Config) error { // RPC clients streamer := cfg.Conf().Streamer.Client() manager := cfg.Conf().Manager.Client() + + // RPC values + statusValue := util.StreamValue(ctx, manager.CurrentStatus) // templates siteTemplates, err := templates.FromDirectory(cfg.Conf().TemplatePath) if err != nil { @@ -93,7 +97,7 @@ func Execute(ctx context.Context, cfg config.Config) error { authentication := vmiddleware.NewAuthentication(storage, executor, sessionManager) r.Use(authentication.UserMiddleware) // shared input handling, stuff the base template needs - r.Use(vmiddleware.InputMiddleware(cfg)) + r.Use(vmiddleware.InputMiddleware(cfg, statusValue)) // theme state management r.Use(templates.ThemeCtx(storage)) @@ -112,7 +116,7 @@ func Execute(ctx context.Context, cfg config.Config) error { // it's mostly self-contained to the /api/* route, except for /request that // leaked out at some point logger.Info().Str("event", "init").Str("part", "api_v0").Msg("") - v0, err := phpapi.NewAPI(ctx, cfg, storage, streamer, manager) + v0, err := phpapi.NewAPI(ctx, cfg, storage, streamer, statusValue) if err != nil { return errors.E(op, err) } diff --git a/website/middleware/input.go b/website/middleware/input.go index e208c59f..e5dcbcfa 100644 --- a/website/middleware/input.go +++ b/website/middleware/input.go @@ -14,7 +14,7 @@ type inputKey struct{} // InputMiddleware generates an Input for each request and makes it available // through InputFromRequest -func InputMiddleware(cfg config.Config) func(http.Handler) http.Handler { +func InputMiddleware(cfg config.Config, status *util.Value[radio.Status]) func(http.Handler) http.Handler { PublicStreamURL := template.URL(cfg.Conf().Website.PublicStreamURL) return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -26,6 +26,7 @@ func InputMiddleware(cfg config.Config) func(http.Handler) http.Handler { IsUser: user != nil, User: user, StreamURL: PublicStreamURL, + Status: status.Latest(), } ctx = context.WithValue(ctx, inputKey{}, input) @@ -37,7 +38,11 @@ func InputMiddleware(cfg config.Config) func(http.Handler) http.Handler { // InputFromRequest returns the Input associated with the request func InputFromRequest(r *http.Request) Input { - v := r.Context().Value(inputKey{}) + return InputFromContext(r.Context()) +} + +func InputFromContext(ctx context.Context) Input { + v := ctx.Value(inputKey{}) if v == nil { return Input{} } @@ -49,6 +54,7 @@ type Input struct { IsUser bool IsHTMX bool User *radio.User + Status radio.Status StreamURL template.URL } diff --git a/website/public/home.go b/website/public/home.go index bf22b462..8397c0a6 100644 --- a/website/public/home.go +++ b/website/public/home.go @@ -12,7 +12,6 @@ import ( type HomeInput struct { middleware.Input - Status radio.Status Queue []radio.QueueEntry LastPlayed []radio.Song News []radio.NewsPost @@ -42,8 +41,6 @@ func (s State) getHome(w http.ResponseWriter, r *http.Request) error { input := NewHomeInput(r) ctx := r.Context() - input.Status = s.StatusValue.Latest() - queue, err := s.Streamer.Queue(ctx) if err != nil { return errors.E(op, errors.InternalServer, err) diff --git a/website/public/state.go b/website/public/state.go index 11959116..d76fbce3 100644 --- a/website/public/state.go +++ b/website/public/state.go @@ -7,7 +7,6 @@ import ( radio "github.com/R-a-dio/valkyrie" "github.com/R-a-dio/valkyrie/config" "github.com/R-a-dio/valkyrie/templates" - "github.com/R-a-dio/valkyrie/util" "github.com/R-a-dio/valkyrie/util/daypass" "github.com/rs/zerolog/hlog" @@ -25,27 +24,25 @@ func NewState( search radio.SearchService) State { return State{ - Config: cfg, - Daypass: dp, - Templates: exec, - Manager: manager, - Streamer: streamer, - Storage: storage, - Search: search, - StatusValue: util.StreamValue(ctx, manager.CurrentStatus), + Config: cfg, + Daypass: dp, + Templates: exec, + Manager: manager, + Streamer: streamer, + Storage: storage, + Search: search, } } type State struct { config.Config - Daypass *daypass.Daypass - Templates templates.Executor - Manager radio.ManagerService - Streamer radio.StreamerService - Storage radio.StorageService - Search radio.SearchService - StatusValue *util.Value[radio.Status] + Daypass *daypass.Daypass + Templates templates.Executor + Manager radio.ManagerService + Streamer radio.StreamerService + Storage radio.StorageService + Search radio.SearchService } func (s *State) errorHandler(w http.ResponseWriter, r *http.Request, err error) {