diff --git a/templates/base.tmpl b/templates/base.tmpl index e547f5b6..86fe143d 100644 --- a/templates/base.tmpl +++ b/templates/base.tmpl @@ -1,5 +1,5 @@ {{define "full-page"}} - + {{ template "head" . }} {{ template "scripts" . }} diff --git a/templates/default/home.tmpl b/templates/default/home.tmpl index 5d1be840..dd51d02a 100644 --- a/templates/default/home.tmpl +++ b/templates/default/home.tmpl @@ -21,7 +21,17 @@ - {{template "nowplaying" .Status}} +
+ {{template "nowplaying" .Status}} +
+
+
+

Listeners: {{.Status.Listeners}}

+
+
+

00:00 / 00:00

+
+
@@ -31,8 +41,12 @@
- {{template "lastplayed" .LastPlayed}} - {{template "queue" .Queue}} +
+ {{template "lastplayed" .LastPlayed}} +
+
+ {{template "queue" .Queue}} +
@@ -66,46 +80,34 @@ Home {{printjson .}} {{end}} {{define "nowplaying"}}

-

{{.Song.Metadata}}

+

{{.Song.Metadata}}

{{if .Song.DatabaseTrack}}{{.Song.Tags}}{{end}}

-
-
-

Listeners: {{.Listeners}}

-
-
-

00:00 / 00:00

-
-
{{end}} {{define "lastplayed"}} -
-

Last Played

-
-
    - {{range $song := .}} -
  • - - {{$song.Metadata}} -
  • - {{end}} -
-
-
+

Last Played

+
+
    + {{range $song := .}} +
  • + + {{$song.Metadata}} +
  • + {{end}} +
+
{{end}} {{define "queue"}} -
-

Queue

-
-
    - {{range $song := .}} -
  • - {{$song.Metadata}} - -
  • - {{end}} -
-
-
+

Queue

+
+
    + {{range $song := .}} +
  • + {{$song.Metadata}} + +
  • + {{end}} +
+
{{end}} \ No newline at end of file diff --git a/templates/default/partials/head.tmpl b/templates/default/partials/head.tmpl index f429834c..7da436a2 100644 --- a/templates/default/partials/head.tmpl +++ b/templates/default/partials/head.tmpl @@ -6,6 +6,7 @@ + {{template "styles"}} {{end}} {{define "title"}}R/a/dio{{end}} diff --git a/templates/loader.go b/templates/loader.go index fd68e419..62194266 100644 --- a/templates/loader.go +++ b/templates/loader.go @@ -252,6 +252,7 @@ func LoadThemes(fsys fs.FS) (Themes, error) { var state loadState var err error + state.fs = fsys state.baseTemplates, err = readDirFilterString(fsys, ".", isTemplate) if err != nil { return nil, errors.E(op, err) diff --git a/util/sse/sse.go b/util/sse/sse.go index 3abdee85..ee429f63 100644 --- a/util/sse/sse.go +++ b/util/sse/sse.go @@ -2,6 +2,7 @@ package sse import ( "bytes" + "slices" "strconv" "time" @@ -49,5 +50,5 @@ func (e Event) Encode() []byte { } b.WriteString("\n") - return b.Bytes() + return slices.Clone(b.Bytes()) } diff --git a/website/api/v1/router.go b/website/api/v1/router.go index 4d3c9f58..902bab70 100644 --- a/website/api/v1/router.go +++ b/website/api/v1/router.go @@ -2,64 +2,41 @@ package v1 import ( "context" - "log" - "time" radio "github.com/R-a-dio/valkyrie" "github.com/R-a-dio/valkyrie/config" - "github.com/R-a-dio/valkyrie/util/eventstream" + "github.com/R-a-dio/valkyrie/storage" + "github.com/R-a-dio/valkyrie/templates" "github.com/go-chi/chi/v5" ) -func NewAPI(ctx context.Context, cfg config.Config) (*API, error) { - api := &API{ - Context: ctx, - Config: cfg, - sse: NewStream(), - manager: cfg.Conf().Manager.Client(), +func NewAPI(ctx context.Context, cfg config.Config, templates *templates.Executor) (*API, error) { + song, err := storage.Open(cfg) + if err != nil { + return nil, err } - go func() { - defer api.sse.Shutdown() - - m := cfg.Conf().Manager.Client() - - var s eventstream.Stream[*radio.SongUpdate] - var err error - for { - s, err = m.CurrentSong(ctx) - if err == nil { - break - } - - log.Println("v1/api:setup:", err) - time.Sleep(time.Second * 3) - } - - for { - us, err := s.Next() - if err != nil { - log.Println("v1/api:loop:", err) - break - } - if us == nil { - log.Println("v1/api:loop: nil value") - continue - } + api := &API{ + Context: ctx, + Config: cfg, + sse: NewStream(templates), + manager: cfg.Conf().Manager.Client(), + streamer: cfg.Conf().Streamer.Client(), + song: song, + } - log.Println("v1/api:sending:", us.Metadata) - api.sse.SendEvent(EventMetadata, []byte(us.Metadata)) - } - }() + go api.runSSE(ctx) return api, nil } type API struct { - Context context.Context - Config config.Config - sse *Stream - manager radio.ManagerService + Context context.Context + Config config.Config + sse *Stream + manager radio.ManagerService + streamer radio.StreamerService + song radio.SongStorageService } func (a *API) Router() chi.Router { diff --git a/website/api/v1/sse.go b/website/api/v1/sse.go index e9e29c6e..f4c6d3d1 100644 --- a/website/api/v1/sse.go +++ b/website/api/v1/sse.go @@ -1,6 +1,7 @@ package v1 import ( + "bytes" "context" "log" "maps" @@ -26,9 +27,16 @@ func prepareStream[T any](ctx context.Context, fn func(context.Context) (T, erro } func (a *API) runSSE(ctx context.Context) error { + var wg sync.WaitGroup + + wg.Add(1) // prepare our eventstreams from the manager - go a.runSongUpdates(ctx) + go func() { + defer wg.Done() + a.runSongUpdates(ctx) + }() + wg.Wait() return nil } @@ -38,19 +46,45 @@ func (a *API) runSongUpdates(ctx context.Context) error { for { us, err := song_stream.Next() if err != nil { + log.Println("v1/api:song:", err) break } if us == nil { + log.Println("v1/api:song: nil value") continue } - a.sse.SendEvent(EventMetadata, []byte(us.Metadata)) + log.Println("v1/api:song:sending:", us) + a.sse.SendNowPlaying(us) + // TODO: add a timeout scenario + go a.sendQueue(ctx) + go a.sendLastPlayed(ctx) } return nil } +func (a *API) sendQueue(ctx context.Context) { + q, err := a.streamer.Queue(ctx) + if err != nil { + log.Println("v1/api:queue:", err) + return + } + + a.sse.SendQueue(q) +} + +func (a *API) sendLastPlayed(ctx context.Context) { + lp, err := a.song.Song(ctx).LastPlayed(0, 5) + if err != nil { + log.Println("v1/api:lastplayed:", err) + return + } + + a.sse.SendLastPlayed(lp) +} + const ( SUBSCRIBE = "subscribe" SEND = "send" @@ -82,15 +116,16 @@ type Stream struct { shutdownCh chan struct{} // templates for the site, used in theme support - site *templates.Site + templates *templates.Executor } -func NewStream() *Stream { +func NewStream(exec *templates.Executor) *Stream { s := &Stream{ reqs: make(chan request), mu: new(sync.RWMutex), last: make(map[EventName]message), shutdownCh: make(chan struct{}), + templates: exec, } go s.run() return s @@ -101,7 +136,7 @@ func NewStream() *Stream { func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) { controller := http.NewResponseController(w) - themeIdx := s.themeIndex(middleware.GetTheme(r.Context())) + theme := middleware.GetTheme(r.Context()) log.Println("sse: subscribing") ch := s.sub() @@ -123,8 +158,8 @@ func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mu.RUnlock() for _, m := range init { - log.Println("sending initial event:", string(m.data[themeIdx])) - if _, err := w.Write(m.data[themeIdx]); err != nil { + log.Println("sending initial event:", string(m[theme])) + if _, err := w.Write(m[theme]); err != nil { return } } @@ -133,7 +168,7 @@ func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) { // start the actual new-event loop log.Println("sse: starting loop") for m := range ch { - if _, err := w.Write(m.data[themeIdx]); err != nil { + if _, err := w.Write(m[theme]); err != nil { return } controller.Flush() @@ -141,9 +176,7 @@ func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // SendEvent sends an SSE event with the data given. -func (s *Stream) SendEvent(event EventName, data []byte) { - m := s.NewMessage(event, data) - +func (s *Stream) SendEvent(event EventName, m message) { select { case s.reqs <- request{cmd: SEND, m: m, e: event}: case <-s.shutdownCh: @@ -213,17 +246,31 @@ func (s *Stream) Shutdown() { } } -func (s *Stream) themeIndex(theme string) int { - return 0 -} +func (s *Stream) NewMessage(event EventName, template string, data any) message { + m, err := s.templates.ExecuteTemplateAll(template, data) + if err != nil { + log.Println("failed creating message", err) + return nil + } -func (s *Stream) NewMessage(event EventName, data any) message { - switch data.(type) { - case radio.SongUpdate: - return message{} + // encode template results to server-side-event format + for k, v := range m { + v = bytes.TrimSpace(v) + m[k] = sse.Event{Name: event, Data: v}.Encode() } + return m +} - return message{} +func (s *Stream) SendNowPlaying(data *radio.SongUpdate) { + s.SendEvent(EventMetadata, s.NewMessage(EventMetadata, "nowplaying", data)) +} + +func (s *Stream) SendLastPlayed(data []radio.Song) { + s.SendEvent(EventLastPlayed, s.NewMessage(EventLastPlayed, "lastplayed", data)) +} + +func (s *Stream) SendQueue(data []radio.QueueEntry) { + s.SendEvent(EventQueue, s.NewMessage(EventQueue, "queue", data)) } // request send over the management channel @@ -234,10 +281,4 @@ type request struct { e EventName // SEND only } -// message encapsulates an SSE event -type message struct { - // event name used in Stream.last - event EventName - // data is a slice of sse-encoded-event; one for each theme - data [][]byte -} +type message map[string][]byte diff --git a/website/main.go b/website/main.go index c370646e..9bbfdcbd 100644 --- a/website/main.go +++ b/website/main.go @@ -56,6 +56,8 @@ func Execute(ctx context.Context, cfg config.Config) error { // user handling authentication := vmiddleware.NewAuthentication(storage, executor, sessionManager) r.Use(authentication.UserMiddleware) + // theme state management + r.Use(vmiddleware.ThemeCtx(storage)) // legacy urls that once pointed to our stream, redirect them to the new url r.Get("/main.mp3", RedirectLegacyStream) @@ -80,7 +82,7 @@ func Execute(ctx context.Context, cfg config.Config) error { r.Route(`/request/{TrackID:[0-9]+}`, v0.RequestRoute) log.Println("starting v1 api") - v1, err := v1.NewAPI(ctx, cfg) + v1, err := v1.NewAPI(ctx, cfg, executor) if err != nil { return errors.E(op, err) }