From ad9157e1056a37e7d90aa1240f987e6b44baf749 Mon Sep 17 00:00:00 2001 From: Wessie Date: Fri, 26 Jan 2024 11:02:43 +0000 Subject: [PATCH] templates: rework the loader to be a function this makes the loading threadsafe by nature of it not sharing any state between calls, other parts of the template reloading isn't thread-safe yet. website: start implementing theme-supported SSE API; instead of sending plain-text we send html appropriate for each client. --- go.mod | 9 +-- go.sum | 10 ++++ templates/default/home.tmpl | 87 ++++++++++++++++------------- templates/executor.go | 37 ++++++++++-- templates/loader.go | 107 +++++++++++++++++------------------ util/sse/sse_test.go | 14 ++--- website/api/v1/router.go | 2 + website/api/v1/sse.go | 108 ++++++++++++++++++++++++++++++------ 8 files changed, 247 insertions(+), 127 deletions(-) diff --git a/go.mod b/go.mod index 6024ba14..f2e0b50d 100644 --- a/go.mod +++ b/go.mod @@ -16,14 +16,15 @@ require ( github.com/lrstanley/girc v0.0.0-20230911164840-f47717952bf9 github.com/olivere/elastic/v7 v7.0.32 github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300 - golang.org/x/crypto v0.17.0 + golang.org/x/crypto v0.18.0 google.golang.org/grpc v1.60.1 ) require ( github.com/golang/protobuf v1.5.3 // indirect - golang.org/x/net v0.19.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect + golang.org/x/net v0.20.0 // indirect + golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect ) @@ -36,6 +37,6 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/pkg/errors v0.9.1 // indirect go.uber.org/atomic v1.11.0 // indirect - golang.org/x/tools v0.16.0 // indirect + golang.org/x/tools v0.17.0 // indirect google.golang.org/protobuf v1.32.0 ) diff --git a/go.sum b/go.sum index 29c3d336..db7ac2d9 100644 --- a/go.sum +++ b/go.sum @@ -85,16 +85,26 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/exp v0.0.0-20240119083558-1b970713d09a h1:Q8/wZp0KX97QFTc2ywcOE0YRjZPVIx+MXInMzdvQqcA= +golang.org/x/exp v0.0.0-20240119083558-1b970713d09a/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM= golang.org/x/tools v0.16.0/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= +golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= +golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1:AB/lmRny7e2pLhFEYIbl5qkDAUt2h0ZRO4wGPhZf+ik= google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405/go.mod h1:67X1fPuzjcrkymZzZV1vvkFeTn2Rvc6lYF9MYFGCcwE= diff --git a/templates/default/home.tmpl b/templates/default/home.tmpl index 796deefa..5d1be840 100644 --- a/templates/default/home.tmpl +++ b/templates/default/home.tmpl @@ -21,19 +21,7 @@ -

-

{{.Status.Song.Metadata}}

-

{{if .Status.Song.DatabaseTrack}}{{.Status.Song.Tags}}{{end}}

-

- -
-
-

Listeners: {{.Status.Listeners}}

-
-
-

00:00 / 00:00

-
-
+ {{template "nowplaying" .Status}}
@@ -43,32 +31,8 @@
-
-

Last Played

-
-
    - {{range $song := .LastPlayed}} -
  • - {{Until $song.LastPlayed | PrettyDuration}} - {{$song.Metadata}} -
  • - {{end}} -
-
-
-
-

Queue

-
-
    - {{range $song := .Queue}} -
  • - {{$song.Metadata}} - {{Until $song.ExpectedStartTime | PrettyDuration}} -
  • - {{end}} -
-
-
+ {{template "lastplayed" .LastPlayed}} + {{template "queue" .Queue}}
@@ -99,4 +63,49 @@
Home {{printjson .}} +{{end}} +{{define "nowplaying"}} +

+

{{.Song.Metadata}}

+

{{if .Song.DatabaseTrack}}{{.Song.Tags}}{{end}}

+

+ +
+
+

Listeners: {{.Listeners}}

+
+
+

00:00 / 00:00

+
+
+{{end}} +{{define "lastplayed"}} +
+

Last Played

+
+
    + {{range $song := .}} +
  • + + {{$song.Metadata}} +
  • + {{end}} +
+
+
+{{end}} +{{define "queue"}} +
+

Queue

+
+
    + {{range $song := .}} +
  • + {{$song.Metadata}} + +
  • + {{end}} +
+
+
{{end}} \ No newline at end of file diff --git a/templates/executor.go b/templates/executor.go index c5d16a1c..28f665c2 100644 --- a/templates/executor.go +++ b/templates/executor.go @@ -3,20 +3,21 @@ package templates import ( "bytes" "io" + "slices" "github.com/R-a-dio/valkyrie/errors" "github.com/R-a-dio/valkyrie/util/pool" ) +var bufferPool = pool.NewResetPool(func() *bytes.Buffer { return new(bytes.Buffer) }) + type Executor struct { site *Site - pool *pool.ResetPool[*bytes.Buffer] } func NewExecutor(site *Site) *Executor { return &Executor{ site: site, - pool: pool.NewResetPool(func() *bytes.Buffer { return new(bytes.Buffer) }), } } @@ -40,6 +41,8 @@ func (e *Executor) ExecutePartial(theme, page string, output io.Writer, input an return nil } +// ExecuteTemplate selects a theme, page and template and feeds it the input given and writing the template output +// to the output writer. Output is buffered until template execution is done before writing to output. func (e *Executor) ExecuteTemplate(theme, page string, template string, output io.Writer, input any) error { const op errors.Op = "templates/Executor.ExecuteTemplate" @@ -48,8 +51,8 @@ func (e *Executor) ExecuteTemplate(theme, page string, template string, output i return errors.E(op, err) } - b := e.pool.Get() - defer e.pool.Put(b) + b := bufferPool.Get() + defer bufferPool.Put(b) err = tmpl.ExecuteTemplate(b, template, input) if err != nil { @@ -62,3 +65,29 @@ func (e *Executor) ExecuteTemplate(theme, page string, template string, output i } return nil } + +// ExecuteTemplateAll executes the template given feeding the input given for all known themes +func (e *Executor) ExecuteTemplateAll(template string, input any) (map[string][]byte, error) { + const op errors.Op = "templates/Executor.ExecuteTemplateAll" + + var out = make(map[string][]byte) + + b := bufferPool.Get() + defer bufferPool.Put(b) + + for _, theme := range e.site.ThemeNames() { + tmpl, err := e.site.Template(theme, "home") + if err != nil { + return nil, errors.E(op, err) + } + + err = tmpl.ExecuteTemplate(b, template, input) + if err != nil { + return nil, errors.E(op, err) + } + + out[theme] = slices.Clone(b.Bytes()) + b.Reset() + } + return out, nil +} diff --git a/templates/loader.go b/templates/loader.go index a0ba1f99..fd68e419 100644 --- a/templates/loader.go +++ b/templates/loader.go @@ -12,9 +12,11 @@ import ( "path/filepath" "slices" "strings" + "sync" "text/tabwriter" "github.com/R-a-dio/valkyrie/errors" + "golang.org/x/exp/maps" ) const ( @@ -28,8 +30,9 @@ const ( // Site is an overarching struct containing all the themes of the website. type Site struct { - loader Loader + fs fs.FS + mu sync.RWMutex themes Themes cache map[string]*template.Template } @@ -37,7 +40,10 @@ type Site struct { func (s *Site) Reload() error { const op errors.Op = "templates/Reload" - themes, err := s.loader.Load() + s.mu.Lock() + defer s.mu.Unlock() + + themes, err := LoadThemes(s.fs) if err != nil { return errors.E(op, err) } @@ -53,6 +59,12 @@ func (s *Site) Executor() *Executor { return NewExecutor(s) } +func (s *Site) ThemeNames() []string { + keys := maps.Keys(s.themes) + slices.Sort(keys) + return keys +} + // Template returns a Template associated with the theme and page name given. // // If theme does not exist it uses the default-theme @@ -148,12 +160,12 @@ func FromFS(fsys fs.FS) (*Site, error) { var err error tmpl := Site{ - loader: NewLoader(fsys), + fs: fsys, //bufferPool: NewPool(func() *bytes.Buffer { return new(bytes.Buffer) }), cache: make(map[string]*template.Template), } - tmpl.themes, err = tmpl.loader.Load() + tmpl.themes, err = LoadThemes(fsys) if err != nil { return nil, errors.E(op, err) } @@ -161,30 +173,11 @@ func FromFS(fsys fs.FS) (*Site, error) { return &tmpl, nil } -func NewLoader(fsys fs.FS) Loader { - return Loader{fs: fsys} -} - -type Loader struct { - // fs is the filesystem we're loading files from - fs fs.FS - - // baseTemplates contain the files at the base of the directory hierarchy and are included - // in all bundles - baseTemplates []string - // defaultTheme is a mapping of page-name to default-template-bundle for easy backfilling of - // undefined pages in themes - defaultTheme map[string]*TemplateBundle - // defaultPartials contain the files in the DEFAULT_DIR/partials directory and are included - // in all themes as partials - defaultPartials []string -} - // TemplateBundle contains all the filenames required to construct a template instance // for the page type TemplateBundle struct { - // loader used to actually load the relative-filenames below - loader *Loader + // fs to load the relative-filenames below + fs fs.FS // the following fields contain all the filenames of the templates we're parsing // into a html/template.Template. They're listed in load-order, last one wins. base []string @@ -192,8 +185,6 @@ type TemplateBundle struct { partials []string defaultPage string page string - - cache *template.Template } // Files returns all the files in this bundle sorted in load-order @@ -215,7 +206,7 @@ func (tb *TemplateBundle) Files() []string { func (tb *TemplateBundle) Template() (*template.Template, error) { const op errors.Op = "templates/TemplateBundle.Template" - tmpl, err := createRoot().ParseFS(tb.loader.fs, tb.Files()...) + tmpl, err := createRoot().ParseFS(tb.fs, tb.Files()...) if err != nil { return nil, errors.E(op, errors.TemplateParseError, err) } @@ -247,48 +238,55 @@ func (tb ThemeBundle) Page(name string) (*TemplateBundle, error) { return tlb, nil } -func (l *Loader) Load() (Themes, error) { - const op errors.Op = "templates/Loader.Load" +type loadState struct { + fs fs.FS + + baseTemplates []string + defaultPartials []string + defaultBundle map[string]*TemplateBundle +} + +func LoadThemes(fsys fs.FS) (Themes, error) { + const op errors.Op = "templates/LoadThemes" - bt, err := readDirFilterString(l.fs, ".", isTemplate) + var state loadState + var err error + + state.baseTemplates, err = readDirFilterString(fsys, ".", isTemplate) if err != nil { return nil, errors.E(op, err) } - l.baseTemplates = bt // find our default directory - defaultBundle, err := l.loadSubDir(DEFAULT_DIR) + state.defaultBundle, err = state.loadSubDir(DEFAULT_DIR) if err != nil { return nil, errors.E(op, err) } // sanity check that we have atleast 1 bundle - if len(defaultBundle) == 0 { + if len(state.defaultBundle) == 0 { return nil, errors.E(op, "default bundle empty") } - // grab the partials from the first bundle - for _, v := range defaultBundle { - l.defaultPartials = v.partials + // grab the partials from any template bundle + for _, v := range state.defaultBundle { + state.defaultPartials = v.partials break } - // plant the defaults in the loader so the other themes can use them - l.defaultTheme = defaultBundle - // read the rest of the directories - subdirs, err := readDirFilterString(l.fs, ".", func(e fs.DirEntry) bool { return e.IsDir() }) + subdirs, err := readDirFilterString(fsys, ".", func(e fs.DirEntry) bool { return e.IsDir() }) if err != nil { return nil, errors.E(op, err) } themes := Themes{ - DEFAULT_DIR: ThemeBundle{DEFAULT_DIR, defaultBundle}, + DEFAULT_DIR: ThemeBundle{DEFAULT_DIR, state.defaultBundle}, } for _, subdir := range subdirs { if subdir == DEFAULT_DIR { // skip the default dir since we already loaded it earlier continue } - bundles, err := l.loadSubDir(subdir) + bundles, err := state.loadSubDir(subdir) if err != nil { return nil, errors.E(op, err) } @@ -312,19 +310,19 @@ func noExt(s string) string { // it looks for `*.tmpl` files in this subdirectory and in a `partials/` subdirectory // if one exists. Returns a map of `filename:bundle` where the bundle is a TemplateBundle // that contains all the filenames required to construct the page named after the filename. -func (l *Loader) loadSubDir(dir string) (map[string]*TemplateBundle, error) { - const op errors.Op = "templates/Loader.loadSubDir" +func (ls loadState) loadSubDir(dir string) (map[string]*TemplateBundle, error) { + const op errors.Op = "templates/loadState.loadSubDir" var bundle = TemplateBundle{ - loader: l, - base: l.baseTemplates, - defaultPartials: l.defaultPartials, + fs: ls.fs, + base: ls.baseTemplates, + defaultPartials: ls.defaultPartials, } // read the partials subdirectory partialDir := path.Join(dir, PARTIAL_DIR) - entries, err := readDirFilter(l.fs, partialDir, isTemplate) + entries, err := readDirFilter(ls.fs, partialDir, isTemplate) if err != nil && !errors.IsE(err, fs.ErrNotExist) { return nil, errors.E(op, err) } @@ -337,7 +335,7 @@ func (l *Loader) loadSubDir(dir string) (map[string]*TemplateBundle, error) { bundle.partials = partials // read the actual directory - entries, err = readDirFilter(l.fs, dir, isTemplate) + entries, err = readDirFilter(ls.fs, dir, isTemplate) if err != nil { return nil, errors.E(op, err) } @@ -347,7 +345,7 @@ func (l *Loader) loadSubDir(dir string) (map[string]*TemplateBundle, error) { name := entry.Name() // create a bundle for each page in this directory bundle := bundle - defaultPage := l.defaultTheme[noExt(name)] + defaultPage := ls.defaultBundle[noExt(name)] if defaultPage != nil { bundle.defaultPage = defaultPage.page } @@ -357,16 +355,15 @@ func (l *Loader) loadSubDir(dir string) (map[string]*TemplateBundle, error) { } // if there are no defaults to handle, we're done - if l.defaultTheme == nil { + if ls.defaultBundle == nil { return bundles, nil } // otherwise check for missing pages, these are pages defined // in the default theme but not in this current theme. Copy over // the default pages if they're missing. - for name, page := range l.defaultTheme { - _, ok := bundles[name] - if ok { + for name, page := range ls.defaultBundle { + if _, ok := bundles[name]; ok { continue } bundles[name] = page diff --git a/util/sse/sse_test.go b/util/sse/sse_test.go index f5265dc7..bab92511 100644 --- a/util/sse/sse_test.go +++ b/util/sse/sse_test.go @@ -1,4 +1,4 @@ -package v1 +package sse import ( "bytes" @@ -15,15 +15,15 @@ type eventCase struct { var eventEncodeCases = []eventCase{ { "simple message", - Event{Name: EventQueue, Data: []byte("queue information")}, + Event{Name: "queue", Data: []byte("queue information")}, "event: queue\ndata: queue information\n\n", }, { "simple newlines", - Event{Name: EventThread, Data: []byte("some data\nwith\nnewlines")}, + Event{Name: "thread", Data: []byte("some data\nwith\nnewlines")}, "event: thread\ndata: some data\ndata: with\ndata: newlines\n\n", }, { "double newline", - Event{Name: EventMetadata, Data: []byte("some data\n\nwith newlines")}, + Event{Name: "metadata", Data: []byte("some data\n\nwith newlines")}, "event: metadata\ndata: some data\ndata\ndata: with newlines\n\n", }, { "encode id", @@ -35,7 +35,7 @@ var eventEncodeCases = []eventCase{ "retry: 10000\n\n", }, { "encode everything", - Event{ID: []byte("100"), Name: EventMetadata, Retry: time.Second * 50, Data: []byte("some data\nand a newline")}, + Event{ID: []byte("100"), Name: "metadata", Retry: time.Second * 50, Data: []byte("some data\nand a newline")}, "id: 100\nevent: metadata\nretry: 50000\ndata: some data\ndata: and a newline\n\n", }, } @@ -54,14 +54,14 @@ func TestEventEncode(t *testing.T) { } func BenchmarkEventEncodeSimple(b *testing.B) { - e := Event{Name: EventMetadata, Data: []byte("some data\nand a newline")} + e := Event{Name: "metadata", Data: []byte("some data\nand a newline")} for i := 0; i < b.N; i++ { e.Encode() } } func BenchmarkEventEncodeEverything(b *testing.B) { - e := Event{ID: []byte("100"), Name: EventMetadata, Retry: time.Second * 50, Data: []byte("some data\nand a newline")} + e := Event{ID: []byte("100"), Name: "metadata", Retry: time.Second * 50, Data: []byte("some data\nand a newline")} for i := 0; i < b.N; i++ { e.Encode() } diff --git a/website/api/v1/router.go b/website/api/v1/router.go index e1a4ca47..4d3c9f58 100644 --- a/website/api/v1/router.go +++ b/website/api/v1/router.go @@ -16,6 +16,7 @@ func NewAPI(ctx context.Context, cfg config.Config) (*API, error) { Context: ctx, Config: cfg, sse: NewStream(), + manager: cfg.Conf().Manager.Client(), } go func() { @@ -58,6 +59,7 @@ type API struct { Context context.Context Config config.Config sse *Stream + manager radio.ManagerService } func (a *API) Router() chi.Router { diff --git a/website/api/v1/sse.go b/website/api/v1/sse.go index 78c58511..e9e29c6e 100644 --- a/website/api/v1/sse.go +++ b/website/api/v1/sse.go @@ -1,12 +1,56 @@ package v1 import ( + "context" "log" "maps" "net/http" + "strconv" "sync" + "time" + + radio "github.com/R-a-dio/valkyrie" + "github.com/R-a-dio/valkyrie/templates" + "github.com/R-a-dio/valkyrie/util/sse" + "github.com/R-a-dio/valkyrie/website/middleware" ) +func prepareStream[T any](ctx context.Context, fn func(context.Context) (T, error)) T { + for { + s, err := fn(ctx) + if err == nil { + return s + } + time.Sleep(time.Second * 3) + } +} + +func (a *API) runSSE(ctx context.Context) error { + // prepare our eventstreams from the manager + go a.runSongUpdates(ctx) + + return nil +} + +func (a *API) runSongUpdates(ctx context.Context) error { + song_stream := prepareStream(ctx, a.manager.CurrentSong) + + for { + us, err := song_stream.Next() + if err != nil { + break + } + + if us == nil { + continue + } + + a.sse.SendEvent(EventMetadata, []byte(us.Metadata)) + } + + return nil +} + const ( SUBSCRIBE = "subscribe" SEND = "send" @@ -24,23 +68,26 @@ const ( EventThread = "thread" ) -type EventName string +type EventName = string type Stream struct { + // manager goroutine channel reqs chan request - subs []chan message // mu guards last mu *sync.RWMutex last map[EventName]message + // shutdown indicator shutdownCh chan struct{} + + // templates for the site, used in theme support + site *templates.Site } func NewStream() *Stream { s := &Stream{ reqs: make(chan request), - subs: make([]chan message, 0, 128), mu: new(sync.RWMutex), last: make(map[EventName]message), shutdownCh: make(chan struct{}), @@ -54,6 +101,8 @@ func NewStream() *Stream { func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) { controller := http.NewResponseController(w) + themeIdx := s.themeIndex(middleware.GetTheme(r.Context())) + log.Println("sse: subscribing") ch := s.sub() defer func() { @@ -62,6 +111,10 @@ func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") + // send a sync timestamp + now := strconv.FormatInt(time.Now().UnixMilli(), 10) + w.Write(sse.Event{Name: "time", Data: []byte(now)}.Encode()) + // send events that have already happened, one for each event so that // we're certain the page is current log.Println("sse: cloning initial") @@ -70,16 +123,17 @@ func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mu.RUnlock() for _, m := range init { - log.Println("sending initial event:", string(m)) - if _, err := w.Write(m); err != nil { + log.Println("sending initial event:", string(m.data[themeIdx])) + if _, err := w.Write(m.data[themeIdx]); err != nil { return } } controller.Flush() + // start the actual new-event loop log.Println("sse: starting loop") for m := range ch { - if _, err := w.Write(m); err != nil { + if _, err := w.Write(m.data[themeIdx]); err != nil { return } controller.Flush() @@ -88,7 +142,7 @@ func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) { // SendEvent sends an SSE event with the data given. func (s *Stream) SendEvent(event EventName, data []byte) { - m := newMessage(event, data) + m := s.NewMessage(event, data) select { case s.reqs <- request{cmd: SEND, m: m, e: event}: @@ -97,16 +151,18 @@ func (s *Stream) SendEvent(event EventName, data []byte) { } func (s *Stream) run() { + subs := make([]chan message, 0, 128) + for req := range s.reqs { switch req.cmd { case SUBSCRIBE: - s.subs = append(s.subs, req.ch) + subs = append(subs, req.ch) case LEAVE: - for i, ch := range s.subs { + for i, ch := range subs { if ch == req.ch { - last := len(s.subs) - 1 - s.subs[i] = s.subs[last] - s.subs = s.subs[:last] + last := len(subs) - 1 + subs[i] = subs[last] + subs = subs[:last] close(ch) break } @@ -116,7 +172,7 @@ func (s *Stream) run() { s.last[req.e] = req.m s.mu.Unlock() - for _, ch := range s.subs { + for _, ch := range subs { select { case ch <- req.m: default: @@ -124,7 +180,7 @@ func (s *Stream) run() { } case SHUTDOWN: close(s.shutdownCh) - for _, ch := range s.subs { + for _, ch := range subs { close(ch) } return @@ -157,6 +213,20 @@ func (s *Stream) Shutdown() { } } +func (s *Stream) themeIndex(theme string) int { + return 0 +} + +func (s *Stream) NewMessage(event EventName, data any) message { + switch data.(type) { + case radio.SongUpdate: + return message{} + } + + return message{} +} + +// request send over the management channel type request struct { cmd string // required ch chan message // SUB/LEAVE only @@ -164,8 +234,10 @@ type request struct { e EventName // SEND only } -func newMessage(event EventName, data []byte) message { - return message(sse.Event{Name: event, Data: data}.Encode()) +// message encapsulates an SSE event +type message struct { + // event name used in Stream.last + event EventName + // data is a slice of sse-encoded-event; one for each theme + data [][]byte } - -type message []byte