diff --git a/go.mod b/go.mod
index 6024ba14..f2e0b50d 100644
--- a/go.mod
+++ b/go.mod
@@ -16,14 +16,15 @@ require (
github.com/lrstanley/girc v0.0.0-20230911164840-f47717952bf9
github.com/olivere/elastic/v7 v7.0.32
github.com/tcolgate/mp3 v0.0.0-20170426193717-e79c5a46d300
- golang.org/x/crypto v0.17.0
+ golang.org/x/crypto v0.18.0
google.golang.org/grpc v1.60.1
)
require (
github.com/golang/protobuf v1.5.3 // indirect
- golang.org/x/net v0.19.0 // indirect
- golang.org/x/sys v0.15.0 // indirect
+ golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
+ golang.org/x/net v0.20.0 // indirect
+ golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
)
@@ -36,6 +37,6 @@ require (
github.com/mailru/easyjson v0.7.7 // indirect
github.com/pkg/errors v0.9.1 // indirect
go.uber.org/atomic v1.11.0 // indirect
- golang.org/x/tools v0.16.0 // indirect
+ golang.org/x/tools v0.17.0 // indirect
google.golang.org/protobuf v1.32.0
)
diff --git a/go.sum b/go.sum
index 29c3d336..db7ac2d9 100644
--- a/go.sum
+++ b/go.sum
@@ -85,16 +85,26 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
+golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
+golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
+golang.org/x/exp v0.0.0-20240119083558-1b970713d09a h1:Q8/wZp0KX97QFTc2ywcOE0YRjZPVIx+MXInMzdvQqcA=
+golang.org/x/exp v0.0.0-20240119083558-1b970713d09a/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
+golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
+golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
+golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM=
golang.org/x/tools v0.16.0/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
+golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc=
+golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1:AB/lmRny7e2pLhFEYIbl5qkDAUt2h0ZRO4wGPhZf+ik=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405/go.mod h1:67X1fPuzjcrkymZzZV1vvkFeTn2Rvc6lYF9MYFGCcwE=
diff --git a/templates/default/home.tmpl b/templates/default/home.tmpl
index 796deefa..5d1be840 100644
--- a/templates/default/home.tmpl
+++ b/templates/default/home.tmpl
@@ -21,19 +21,7 @@
-
-
{{.Status.Song.Metadata}}
- {{if .Status.Song.DatabaseTrack}}{{.Status.Song.Tags}}{{end}}
-
-
-
-
-
Listeners: {{.Status.Listeners}}
-
-
-
+ {{template "nowplaying" .Status}}
-
-
Last Played
-
-
- {{range $song := .LastPlayed}}
- -
- {{Until $song.LastPlayed | PrettyDuration}}
- {{$song.Metadata}}
-
- {{end}}
-
-
-
-
-
Queue
-
-
- {{range $song := .Queue}}
- -
- {{$song.Metadata}}
- {{Until $song.ExpectedStartTime | PrettyDuration}}
-
- {{end}}
-
-
-
+ {{template "lastplayed" .LastPlayed}}
+ {{template "queue" .Queue}}
Home {{printjson .}}
+{{end}}
+{{define "nowplaying"}}
+
+
{{.Song.Metadata}}
+ {{if .Song.DatabaseTrack}}{{.Song.Tags}}{{end}}
+
+
+
+
+
Listeners: {{.Listeners}}
+
+
+
+{{end}}
+{{define "lastplayed"}}
+
+
Last Played
+
+
+ {{range $song := .}}
+ -
+
+ {{$song.Metadata}}
+
+ {{end}}
+
+
+
+{{end}}
+{{define "queue"}}
+
+
Queue
+
+
+ {{range $song := .}}
+ -
+ {{$song.Metadata}}
+
+
+ {{end}}
+
+
+
{{end}}
\ No newline at end of file
diff --git a/templates/executor.go b/templates/executor.go
index c5d16a1c..28f665c2 100644
--- a/templates/executor.go
+++ b/templates/executor.go
@@ -3,20 +3,21 @@ package templates
import (
"bytes"
"io"
+ "slices"
"github.com/R-a-dio/valkyrie/errors"
"github.com/R-a-dio/valkyrie/util/pool"
)
+var bufferPool = pool.NewResetPool(func() *bytes.Buffer { return new(bytes.Buffer) })
+
type Executor struct {
site *Site
- pool *pool.ResetPool[*bytes.Buffer]
}
func NewExecutor(site *Site) *Executor {
return &Executor{
site: site,
- pool: pool.NewResetPool(func() *bytes.Buffer { return new(bytes.Buffer) }),
}
}
@@ -40,6 +41,8 @@ func (e *Executor) ExecutePartial(theme, page string, output io.Writer, input an
return nil
}
+// ExecuteTemplate selects a theme, page and template and feeds it the input given and writing the template output
+// to the output writer. Output is buffered until template execution is done before writing to output.
func (e *Executor) ExecuteTemplate(theme, page string, template string, output io.Writer, input any) error {
const op errors.Op = "templates/Executor.ExecuteTemplate"
@@ -48,8 +51,8 @@ func (e *Executor) ExecuteTemplate(theme, page string, template string, output i
return errors.E(op, err)
}
- b := e.pool.Get()
- defer e.pool.Put(b)
+ b := bufferPool.Get()
+ defer bufferPool.Put(b)
err = tmpl.ExecuteTemplate(b, template, input)
if err != nil {
@@ -62,3 +65,29 @@ func (e *Executor) ExecuteTemplate(theme, page string, template string, output i
}
return nil
}
+
+// ExecuteTemplateAll executes the template given feeding the input given for all known themes
+func (e *Executor) ExecuteTemplateAll(template string, input any) (map[string][]byte, error) {
+ const op errors.Op = "templates/Executor.ExecuteTemplateAll"
+
+ var out = make(map[string][]byte)
+
+ b := bufferPool.Get()
+ defer bufferPool.Put(b)
+
+ for _, theme := range e.site.ThemeNames() {
+ tmpl, err := e.site.Template(theme, "home")
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ err = tmpl.ExecuteTemplate(b, template, input)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ out[theme] = slices.Clone(b.Bytes())
+ b.Reset()
+ }
+ return out, nil
+}
diff --git a/templates/loader.go b/templates/loader.go
index a0ba1f99..fd68e419 100644
--- a/templates/loader.go
+++ b/templates/loader.go
@@ -12,9 +12,11 @@ import (
"path/filepath"
"slices"
"strings"
+ "sync"
"text/tabwriter"
"github.com/R-a-dio/valkyrie/errors"
+ "golang.org/x/exp/maps"
)
const (
@@ -28,8 +30,9 @@ const (
// Site is an overarching struct containing all the themes of the website.
type Site struct {
- loader Loader
+ fs fs.FS
+ mu sync.RWMutex
themes Themes
cache map[string]*template.Template
}
@@ -37,7 +40,10 @@ type Site struct {
func (s *Site) Reload() error {
const op errors.Op = "templates/Reload"
- themes, err := s.loader.Load()
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ themes, err := LoadThemes(s.fs)
if err != nil {
return errors.E(op, err)
}
@@ -53,6 +59,12 @@ func (s *Site) Executor() *Executor {
return NewExecutor(s)
}
+func (s *Site) ThemeNames() []string {
+ keys := maps.Keys(s.themes)
+ slices.Sort(keys)
+ return keys
+}
+
// Template returns a Template associated with the theme and page name given.
//
// If theme does not exist it uses the default-theme
@@ -148,12 +160,12 @@ func FromFS(fsys fs.FS) (*Site, error) {
var err error
tmpl := Site{
- loader: NewLoader(fsys),
+ fs: fsys,
//bufferPool: NewPool(func() *bytes.Buffer { return new(bytes.Buffer) }),
cache: make(map[string]*template.Template),
}
- tmpl.themes, err = tmpl.loader.Load()
+ tmpl.themes, err = LoadThemes(fsys)
if err != nil {
return nil, errors.E(op, err)
}
@@ -161,30 +173,11 @@ func FromFS(fsys fs.FS) (*Site, error) {
return &tmpl, nil
}
-func NewLoader(fsys fs.FS) Loader {
- return Loader{fs: fsys}
-}
-
-type Loader struct {
- // fs is the filesystem we're loading files from
- fs fs.FS
-
- // baseTemplates contain the files at the base of the directory hierarchy and are included
- // in all bundles
- baseTemplates []string
- // defaultTheme is a mapping of page-name to default-template-bundle for easy backfilling of
- // undefined pages in themes
- defaultTheme map[string]*TemplateBundle
- // defaultPartials contain the files in the DEFAULT_DIR/partials directory and are included
- // in all themes as partials
- defaultPartials []string
-}
-
// TemplateBundle contains all the filenames required to construct a template instance
// for the page
type TemplateBundle struct {
- // loader used to actually load the relative-filenames below
- loader *Loader
+ // fs to load the relative-filenames below
+ fs fs.FS
// the following fields contain all the filenames of the templates we're parsing
// into a html/template.Template. They're listed in load-order, last one wins.
base []string
@@ -192,8 +185,6 @@ type TemplateBundle struct {
partials []string
defaultPage string
page string
-
- cache *template.Template
}
// Files returns all the files in this bundle sorted in load-order
@@ -215,7 +206,7 @@ func (tb *TemplateBundle) Files() []string {
func (tb *TemplateBundle) Template() (*template.Template, error) {
const op errors.Op = "templates/TemplateBundle.Template"
- tmpl, err := createRoot().ParseFS(tb.loader.fs, tb.Files()...)
+ tmpl, err := createRoot().ParseFS(tb.fs, tb.Files()...)
if err != nil {
return nil, errors.E(op, errors.TemplateParseError, err)
}
@@ -247,48 +238,55 @@ func (tb ThemeBundle) Page(name string) (*TemplateBundle, error) {
return tlb, nil
}
-func (l *Loader) Load() (Themes, error) {
- const op errors.Op = "templates/Loader.Load"
+type loadState struct {
+ fs fs.FS
+
+ baseTemplates []string
+ defaultPartials []string
+ defaultBundle map[string]*TemplateBundle
+}
+
+func LoadThemes(fsys fs.FS) (Themes, error) {
+ const op errors.Op = "templates/LoadThemes"
- bt, err := readDirFilterString(l.fs, ".", isTemplate)
+ var state loadState
+ var err error
+
+ state.baseTemplates, err = readDirFilterString(fsys, ".", isTemplate)
if err != nil {
return nil, errors.E(op, err)
}
- l.baseTemplates = bt
// find our default directory
- defaultBundle, err := l.loadSubDir(DEFAULT_DIR)
+ state.defaultBundle, err = state.loadSubDir(DEFAULT_DIR)
if err != nil {
return nil, errors.E(op, err)
}
// sanity check that we have atleast 1 bundle
- if len(defaultBundle) == 0 {
+ if len(state.defaultBundle) == 0 {
return nil, errors.E(op, "default bundle empty")
}
- // grab the partials from the first bundle
- for _, v := range defaultBundle {
- l.defaultPartials = v.partials
+ // grab the partials from any template bundle
+ for _, v := range state.defaultBundle {
+ state.defaultPartials = v.partials
break
}
- // plant the defaults in the loader so the other themes can use them
- l.defaultTheme = defaultBundle
-
// read the rest of the directories
- subdirs, err := readDirFilterString(l.fs, ".", func(e fs.DirEntry) bool { return e.IsDir() })
+ subdirs, err := readDirFilterString(fsys, ".", func(e fs.DirEntry) bool { return e.IsDir() })
if err != nil {
return nil, errors.E(op, err)
}
themes := Themes{
- DEFAULT_DIR: ThemeBundle{DEFAULT_DIR, defaultBundle},
+ DEFAULT_DIR: ThemeBundle{DEFAULT_DIR, state.defaultBundle},
}
for _, subdir := range subdirs {
if subdir == DEFAULT_DIR { // skip the default dir since we already loaded it earlier
continue
}
- bundles, err := l.loadSubDir(subdir)
+ bundles, err := state.loadSubDir(subdir)
if err != nil {
return nil, errors.E(op, err)
}
@@ -312,19 +310,19 @@ func noExt(s string) string {
// it looks for `*.tmpl` files in this subdirectory and in a `partials/` subdirectory
// if one exists. Returns a map of `filename:bundle` where the bundle is a TemplateBundle
// that contains all the filenames required to construct the page named after the filename.
-func (l *Loader) loadSubDir(dir string) (map[string]*TemplateBundle, error) {
- const op errors.Op = "templates/Loader.loadSubDir"
+func (ls loadState) loadSubDir(dir string) (map[string]*TemplateBundle, error) {
+ const op errors.Op = "templates/loadState.loadSubDir"
var bundle = TemplateBundle{
- loader: l,
- base: l.baseTemplates,
- defaultPartials: l.defaultPartials,
+ fs: ls.fs,
+ base: ls.baseTemplates,
+ defaultPartials: ls.defaultPartials,
}
// read the partials subdirectory
partialDir := path.Join(dir, PARTIAL_DIR)
- entries, err := readDirFilter(l.fs, partialDir, isTemplate)
+ entries, err := readDirFilter(ls.fs, partialDir, isTemplate)
if err != nil && !errors.IsE(err, fs.ErrNotExist) {
return nil, errors.E(op, err)
}
@@ -337,7 +335,7 @@ func (l *Loader) loadSubDir(dir string) (map[string]*TemplateBundle, error) {
bundle.partials = partials
// read the actual directory
- entries, err = readDirFilter(l.fs, dir, isTemplate)
+ entries, err = readDirFilter(ls.fs, dir, isTemplate)
if err != nil {
return nil, errors.E(op, err)
}
@@ -347,7 +345,7 @@ func (l *Loader) loadSubDir(dir string) (map[string]*TemplateBundle, error) {
name := entry.Name()
// create a bundle for each page in this directory
bundle := bundle
- defaultPage := l.defaultTheme[noExt(name)]
+ defaultPage := ls.defaultBundle[noExt(name)]
if defaultPage != nil {
bundle.defaultPage = defaultPage.page
}
@@ -357,16 +355,15 @@ func (l *Loader) loadSubDir(dir string) (map[string]*TemplateBundle, error) {
}
// if there are no defaults to handle, we're done
- if l.defaultTheme == nil {
+ if ls.defaultBundle == nil {
return bundles, nil
}
// otherwise check for missing pages, these are pages defined
// in the default theme but not in this current theme. Copy over
// the default pages if they're missing.
- for name, page := range l.defaultTheme {
- _, ok := bundles[name]
- if ok {
+ for name, page := range ls.defaultBundle {
+ if _, ok := bundles[name]; ok {
continue
}
bundles[name] = page
diff --git a/util/sse/sse_test.go b/util/sse/sse_test.go
index f5265dc7..bab92511 100644
--- a/util/sse/sse_test.go
+++ b/util/sse/sse_test.go
@@ -1,4 +1,4 @@
-package v1
+package sse
import (
"bytes"
@@ -15,15 +15,15 @@ type eventCase struct {
var eventEncodeCases = []eventCase{
{
"simple message",
- Event{Name: EventQueue, Data: []byte("queue information")},
+ Event{Name: "queue", Data: []byte("queue information")},
"event: queue\ndata: queue information\n\n",
}, {
"simple newlines",
- Event{Name: EventThread, Data: []byte("some data\nwith\nnewlines")},
+ Event{Name: "thread", Data: []byte("some data\nwith\nnewlines")},
"event: thread\ndata: some data\ndata: with\ndata: newlines\n\n",
}, {
"double newline",
- Event{Name: EventMetadata, Data: []byte("some data\n\nwith newlines")},
+ Event{Name: "metadata", Data: []byte("some data\n\nwith newlines")},
"event: metadata\ndata: some data\ndata\ndata: with newlines\n\n",
}, {
"encode id",
@@ -35,7 +35,7 @@ var eventEncodeCases = []eventCase{
"retry: 10000\n\n",
}, {
"encode everything",
- Event{ID: []byte("100"), Name: EventMetadata, Retry: time.Second * 50, Data: []byte("some data\nand a newline")},
+ Event{ID: []byte("100"), Name: "metadata", Retry: time.Second * 50, Data: []byte("some data\nand a newline")},
"id: 100\nevent: metadata\nretry: 50000\ndata: some data\ndata: and a newline\n\n",
},
}
@@ -54,14 +54,14 @@ func TestEventEncode(t *testing.T) {
}
func BenchmarkEventEncodeSimple(b *testing.B) {
- e := Event{Name: EventMetadata, Data: []byte("some data\nand a newline")}
+ e := Event{Name: "metadata", Data: []byte("some data\nand a newline")}
for i := 0; i < b.N; i++ {
e.Encode()
}
}
func BenchmarkEventEncodeEverything(b *testing.B) {
- e := Event{ID: []byte("100"), Name: EventMetadata, Retry: time.Second * 50, Data: []byte("some data\nand a newline")}
+ e := Event{ID: []byte("100"), Name: "metadata", Retry: time.Second * 50, Data: []byte("some data\nand a newline")}
for i := 0; i < b.N; i++ {
e.Encode()
}
diff --git a/website/api/v1/router.go b/website/api/v1/router.go
index e1a4ca47..4d3c9f58 100644
--- a/website/api/v1/router.go
+++ b/website/api/v1/router.go
@@ -16,6 +16,7 @@ func NewAPI(ctx context.Context, cfg config.Config) (*API, error) {
Context: ctx,
Config: cfg,
sse: NewStream(),
+ manager: cfg.Conf().Manager.Client(),
}
go func() {
@@ -58,6 +59,7 @@ type API struct {
Context context.Context
Config config.Config
sse *Stream
+ manager radio.ManagerService
}
func (a *API) Router() chi.Router {
diff --git a/website/api/v1/sse.go b/website/api/v1/sse.go
index 78c58511..e9e29c6e 100644
--- a/website/api/v1/sse.go
+++ b/website/api/v1/sse.go
@@ -1,12 +1,56 @@
package v1
import (
+ "context"
"log"
"maps"
"net/http"
+ "strconv"
"sync"
+ "time"
+
+ radio "github.com/R-a-dio/valkyrie"
+ "github.com/R-a-dio/valkyrie/templates"
+ "github.com/R-a-dio/valkyrie/util/sse"
+ "github.com/R-a-dio/valkyrie/website/middleware"
)
+func prepareStream[T any](ctx context.Context, fn func(context.Context) (T, error)) T {
+ for {
+ s, err := fn(ctx)
+ if err == nil {
+ return s
+ }
+ time.Sleep(time.Second * 3)
+ }
+}
+
+func (a *API) runSSE(ctx context.Context) error {
+ // prepare our eventstreams from the manager
+ go a.runSongUpdates(ctx)
+
+ return nil
+}
+
+func (a *API) runSongUpdates(ctx context.Context) error {
+ song_stream := prepareStream(ctx, a.manager.CurrentSong)
+
+ for {
+ us, err := song_stream.Next()
+ if err != nil {
+ break
+ }
+
+ if us == nil {
+ continue
+ }
+
+ a.sse.SendEvent(EventMetadata, []byte(us.Metadata))
+ }
+
+ return nil
+}
+
const (
SUBSCRIBE = "subscribe"
SEND = "send"
@@ -24,23 +68,26 @@ const (
EventThread = "thread"
)
-type EventName string
+type EventName = string
type Stream struct {
+ // manager goroutine channel
reqs chan request
- subs []chan message
// mu guards last
mu *sync.RWMutex
last map[EventName]message
+
// shutdown indicator
shutdownCh chan struct{}
+
+ // templates for the site, used in theme support
+ site *templates.Site
}
func NewStream() *Stream {
s := &Stream{
reqs: make(chan request),
- subs: make([]chan message, 0, 128),
mu: new(sync.RWMutex),
last: make(map[EventName]message),
shutdownCh: make(chan struct{}),
@@ -54,6 +101,8 @@ func NewStream() *Stream {
func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
controller := http.NewResponseController(w)
+ themeIdx := s.themeIndex(middleware.GetTheme(r.Context()))
+
log.Println("sse: subscribing")
ch := s.sub()
defer func() {
@@ -62,6 +111,10 @@ func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
+ // send a sync timestamp
+ now := strconv.FormatInt(time.Now().UnixMilli(), 10)
+ w.Write(sse.Event{Name: "time", Data: []byte(now)}.Encode())
+
// send events that have already happened, one for each event so that
// we're certain the page is current
log.Println("sse: cloning initial")
@@ -70,16 +123,17 @@ func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.mu.RUnlock()
for _, m := range init {
- log.Println("sending initial event:", string(m))
- if _, err := w.Write(m); err != nil {
+ log.Println("sending initial event:", string(m.data[themeIdx]))
+ if _, err := w.Write(m.data[themeIdx]); err != nil {
return
}
}
controller.Flush()
+ // start the actual new-event loop
log.Println("sse: starting loop")
for m := range ch {
- if _, err := w.Write(m); err != nil {
+ if _, err := w.Write(m.data[themeIdx]); err != nil {
return
}
controller.Flush()
@@ -88,7 +142,7 @@ func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// SendEvent sends an SSE event with the data given.
func (s *Stream) SendEvent(event EventName, data []byte) {
- m := newMessage(event, data)
+ m := s.NewMessage(event, data)
select {
case s.reqs <- request{cmd: SEND, m: m, e: event}:
@@ -97,16 +151,18 @@ func (s *Stream) SendEvent(event EventName, data []byte) {
}
func (s *Stream) run() {
+ subs := make([]chan message, 0, 128)
+
for req := range s.reqs {
switch req.cmd {
case SUBSCRIBE:
- s.subs = append(s.subs, req.ch)
+ subs = append(subs, req.ch)
case LEAVE:
- for i, ch := range s.subs {
+ for i, ch := range subs {
if ch == req.ch {
- last := len(s.subs) - 1
- s.subs[i] = s.subs[last]
- s.subs = s.subs[:last]
+ last := len(subs) - 1
+ subs[i] = subs[last]
+ subs = subs[:last]
close(ch)
break
}
@@ -116,7 +172,7 @@ func (s *Stream) run() {
s.last[req.e] = req.m
s.mu.Unlock()
- for _, ch := range s.subs {
+ for _, ch := range subs {
select {
case ch <- req.m:
default:
@@ -124,7 +180,7 @@ func (s *Stream) run() {
}
case SHUTDOWN:
close(s.shutdownCh)
- for _, ch := range s.subs {
+ for _, ch := range subs {
close(ch)
}
return
@@ -157,6 +213,20 @@ func (s *Stream) Shutdown() {
}
}
+func (s *Stream) themeIndex(theme string) int {
+ return 0
+}
+
+func (s *Stream) NewMessage(event EventName, data any) message {
+ switch data.(type) {
+ case radio.SongUpdate:
+ return message{}
+ }
+
+ return message{}
+}
+
+// request send over the management channel
type request struct {
cmd string // required
ch chan message // SUB/LEAVE only
@@ -164,8 +234,10 @@ type request struct {
e EventName // SEND only
}
-func newMessage(event EventName, data []byte) message {
- return message(sse.Event{Name: event, Data: data}.Encode())
+// message encapsulates an SSE event
+type message struct {
+ // event name used in Stream.last
+ event EventName
+ // data is a slice of sse-encoded-event; one for each theme
+ data [][]byte
}
-
-type message []byte