diff --git a/manager/api.go b/manager/api.go index da6e100e..d876278c 100644 --- a/manager/api.go +++ b/manager/api.go @@ -2,13 +2,13 @@ package manager import ( "context" - "strings" "time" radio "github.com/R-a-dio/valkyrie" "github.com/R-a-dio/valkyrie/errors" "github.com/R-a-dio/valkyrie/rpc" "github.com/R-a-dio/valkyrie/util/eventstream" + "github.com/rs/zerolog" "go.opentelemetry.io/otel" "google.golang.org/grpc" ) @@ -52,76 +52,52 @@ func (m *Manager) Status(ctx context.Context) (*radio.Status, error) { // UpdateUser sets information about the current streamer func (m *Manager) UpdateUser(ctx context.Context, u *radio.User) error { const op errors.Op = "manager/Manager.UpdateUser" - ctx, span := otel.Tracer("").Start(ctx, string(op)) + _, span := otel.Tracer("").Start(ctx, string(op)) defer span.End() m.userStream.Send(u) - - // only update status if we get a non-nil user, this leaves the status - // data with the last known DJ so that we can display something - if u != nil { - m.mu.Lock() - m.status.StreamerName = u.DJ.Name - m.status.User = *u - go m.updateStreamStatus(true, m.status) - m.mu.Unlock() - } - if u != nil { m.logger.Info().Str("username", u.Username).Msg("updating stream user") } else { - m.logger.Info().Str("username", "Fallback").Msg("updating stream user") + m.logger.Info().Str("username", "fallback").Msg("updating stream user") } return nil } // UpdateSong sets information about the currently playing song -func (m *Manager) UpdateSong(ctx context.Context, update *radio.SongUpdate) error { +func (m *Manager) UpdateSong(ctx context.Context, su *radio.SongUpdate) error { const op errors.Op = "manager/Manager.UpdateSong" ctx, span := otel.Tracer("").Start(ctx, string(op)) defer span.End() - new := update - info := update.Info - - // trim any whitespace on the edges - new.Metadata = strings.TrimSpace(new.Metadata) + // hydrate the song we got, this will deal with any weird metadata whitespace and + // fills in the hashes if they don't exist + su.Song.Hydrate() // empty metadata, we ignore - if new.Metadata == "" { + if su.Song.Metadata == "" { m.logger.Info().Msg("skipping empty metadata") return nil } - // first we check if this is the same song as the previous one we received to - // avoid double announcement or drifting start/end timings - m.mu.Lock() - if m.status.Song.Metadata == new.Metadata { - m.mu.Unlock() - return nil - } - - // otherwise it's a legit song change - ss, tx, err := m.Storage.SongTx(ctx, nil) - if err != nil { - return errors.E(op, err) - } - defer tx.Rollback() - // we assume that the song we received has very little or no data except for the - // Metadata field. So we try and find more info from that - song, err := ss.FromMetadata(new.Metadata) + // fill in the rest of the song data + ss := m.Storage.Song(ctx) + // songs send to this endpoint only require their metadata to be set, and since + // we can get a hash from that we use that to lookup more information + song, err := ss.FromHash(su.Song.Hash) if err != nil && !errors.Is(errors.SongUnknown, err) { return errors.E(op, err) } // if we don't have this song in the database create a new entry for it if song == nil { - song, err = ss.Create(radio.NewSong(new.Metadata)) + song, err = ss.Create(su.Song) if err != nil { return errors.E(op, err) } } + var info = su.Info // calculate start and end time only if they're zero if info.Start.IsZero() { // we assume the song just started if it wasn't set @@ -131,73 +107,158 @@ func (m *Manager) UpdateSong(ctx context.Context, update *radio.SongUpdate) erro // set end to start if we got passed a zero time info.End = info.Start } - if song.Length > 0 && info.End.Equal(info.Start) { - // add the song length if we have one - info.End = info.End.Add(song.Length) - } - - // store copies of the information we need later - prevStatus := m.status - songListenerDiff := m.songStartListenerCount - // now update the fields we should update - m.status.Song = *song - m.status.SongInfo = info - m.songStartListenerCount = m.status.Listeners - defer func(status radio.Status) { - go m.updateStreamStatus(true, status) - }(m.status) + // check if end is equal to start, either from what we just did above or from + // the updater having given us both equal + if info.End.Equal(info.Start) { + // if we got a song length from either the updater or the storage we add + // it to the end time as an estimated end + if su.Song.Length > 0 { // updater song + info.End = info.End.Add(su.Song.Length) + } + if song.Length > 0 { // storage song + info.End = info.End.Add(song.Length) + } + } - // calculate the listener diff between start of song and end of song - songListenerDiff -= m.status.Listeners + // now we've filled in any missing information on both 'song' and 'info' so we + // can now check if we are even interested in this thing + if song.EqualTo(m.songStream.Latest().Song) { + // same song as latest, so skip the update + return nil + } m.logger.Info().Str("metadata", song.Metadata).Dur("song_length", song.Length).Msg("updating stream song") - m.mu.Unlock() + m.songStream.Send(&radio.SongUpdate{Song: *song, Info: info}) + return nil +} - // finish updating extra fields for the previous status - err = m.finishSongUpdate(ctx, tx, prevStatus, &songListenerDiff) - if err != nil { - return errors.E(op, err) - } +// UpdateThread sets the current thread information on the front page and chats +func (m *Manager) UpdateThread(ctx context.Context, thread radio.Thread) error { + const op errors.Op = "manager/Manager.UpdateThread" + _, span := otel.Tracer("").Start(ctx, string(op)) + defer span.End() - if err = tx.Commit(); err != nil { - return errors.E(op, errors.TransactionCommit, err, prevStatus) - } + m.threadStream.Send(thread) + return nil +} - // send an event out - m.songStream.Send(&radio.SongUpdate{Song: *song, Info: info}) +// UpdateListeners sets the listener count +func (m *Manager) UpdateListeners(ctx context.Context, listeners radio.Listeners) error { + const op errors.Op = "manager/Manager.UpdateListeners" + _, span := otel.Tracer("").Start(ctx, string(op)) + defer span.End() + + m.listenerStream.Send(listeners) return nil } -func (m *Manager) finishSongUpdate(ctx context.Context, tx radio.StorageTx, status radio.Status, ldiff *radio.Listeners) error { - const op errors.Op = "manager/Manager.finishSongUpdate" +// runStatusUpdates is in charge of keeping m.status up-to-date from the other +// data streams. +func (m *Manager) runStatusUpdates(ctx context.Context) { + userCh := m.userStream.Sub() + defer m.userStream.Leave(userCh) + var user *radio.User + + threadCh := m.threadStream.Sub() + defer m.threadStream.Leave(threadCh) + var thread string + + songCh := m.songStream.Sub() + defer m.songStream.Leave(songCh) + var su *radio.SongUpdate + + listenerCh := m.listenerStream.Sub() + defer m.listenerStream.Leave(listenerCh) + var listenerCount radio.Listeners + var songStartListenerCount radio.Listeners + + for { + var sendStatus = true + var songUpdate = false + + select { + case <-ctx.Done(): + return + case user = <-userCh: + case thread = <-threadCh: + case su = <-songCh: + songUpdate = true + case listenerCount = <-listenerCh: + sendStatus = false + } - if status.Song.ID == 0 { - // no song to update - return nil - } - if tx == nil { - // no transaction was passed to us, we require one - panic("no tx given to finishSongUpdate") + m.mu.Lock() + // if we're about to update the song, we need to do some bookkeeping on + // the previous song + if err := m.finishSong(ctx, m.status, songStartListenerCount); err != nil { + zerolog.Ctx(ctx).Error().Err(err).Msg("failed finishSong") + } + + // update user + if user != nil { + m.status.StreamerName = user.DJ.Name + m.status.User = *user + } + // update song + if su != nil { + m.status.Song = su.Song + m.status.SongInfo = su.Info + } + // update thread + m.status.Thread = thread + // update listener count + m.status.Listeners = listenerCount + + if sendStatus { + // send a copy to the status stream + m.statusStream.Send(m.status) + } + // and a copy to the persistent storage + m.updateStreamStatus(m.status) + m.mu.Unlock() + + // and finally, if we just did a song update we want to record how many + // listeners we have at the start of it + if songUpdate { + songStartListenerCount = listenerCount + } } +} + +func (m *Manager) finishSong(ctx context.Context, status radio.Status, startListenerCount radio.Listeners) error { + const op errors.Op = "manager/Manager.finishSong" - // check if we want to skip inserting a listener diff; this is mostly here + // calculate the listener difference + diff := startListenerCount - status.Listeners + var ldiff = &diff + // check if we want to skip the listener diff; this is mostly here // to avoid fallback jumps to register as 0s - if ldiff != nil && (status.Listeners < 10 || status.Listeners+*ldiff < 10) { + if status.Listeners < 10 || status.Listeners+diff < 10 { ldiff = nil } - ss, _, err := m.Storage.SongTx(ctx, tx) + // start a transaction for if any of the storage calls fail + ss, tx, err := m.Storage.SongTx(ctx, nil) if err != nil { return errors.E(op, err) } + defer tx.Rollback() - // insert an entry that this song was played + // add a play to the song err = ss.AddPlay(status.Song, status.User, ldiff) if err != nil { return errors.E(op, err) } + // update the song length if it didn't have one yet + if status.Song.Length == 0 { + err = ss.UpdateLength(status.Song, time.Since(status.SongInfo.Start)) + if err != nil { + return errors.E(op, err, status) + } + } + // if we have the song in the database, also update that if status.Song.HasTrack() { ts, _, err := m.Storage.TrackTx(ctx, tx) @@ -211,43 +272,10 @@ func (m *Manager) finishSongUpdate(ctx context.Context, tx radio.StorageTx, stat } } - // and the song length if it was still unknown - if status.Song.Length == 0 { - err = ss.UpdateLength(status.Song, time.Since(status.SongInfo.Start)) - if err != nil { - return errors.E(op, err, status) - } + // commit the transaction + if err = tx.Commit(); err != nil { + return errors.E(op, err) } return nil } - -// UpdateThread sets the current thread information on the front page and chats -func (m *Manager) UpdateThread(ctx context.Context, thread radio.Thread) error { - const op errors.Op = "manager/Manager.UpdateThread" - ctx, span := otel.Tracer("").Start(ctx, string(op)) - defer span.End() - - m.threadStream.Send(thread) - - m.mu.Lock() - m.status.Thread = thread - go m.updateStreamStatus(true, m.status) - m.mu.Unlock() - return nil -} - -// UpdateListeners sets the listener count -func (m *Manager) UpdateListeners(ctx context.Context, listeners radio.Listeners) error { - const op errors.Op = "manager/Manager.UpdateListeners" - ctx, span := otel.Tracer("").Start(ctx, string(op)) - defer span.End() - - m.listenerStream.Send(listeners) - - m.mu.Lock() - m.status.Listeners = listeners - go m.updateStreamStatus(false, m.status) - m.mu.Unlock() - return nil -} diff --git a/manager/main.go b/manager/main.go index ff186686..e42e16c6 100644 --- a/manager/main.go +++ b/manager/main.go @@ -4,11 +4,13 @@ import ( "context" "net" "sync" + "syscall" "time" radio "github.com/R-a-dio/valkyrie" "github.com/R-a-dio/valkyrie/config" "github.com/R-a-dio/valkyrie/storage" + "github.com/R-a-dio/valkyrie/util" "github.com/R-a-dio/valkyrie/util/eventstream" "github.com/rs/zerolog" ) @@ -26,6 +28,7 @@ func Execute(ctx context.Context, cfg config.Config) error { if err != nil { return err } + defer srv.Stop() ln, err := net.Listen("tcp", cfg.Conf().Manager.RPCAddr.String()) if err != nil { @@ -40,7 +43,8 @@ func Execute(ctx context.Context, cfg config.Config) error { // wait for our context to be canceled or Serve to error out select { case <-ctx.Done(): - srv.Stop() + return nil + case <-util.Signal(syscall.SIGUSR2): return nil case err = <-errCh: return err @@ -76,7 +80,7 @@ func NewManager(ctx context.Context, cfg config.Config) (*Manager, error) { m.songStream = eventstream.NewEventStream(&radio.SongUpdate{Song: old.Song, Info: old.SongInfo}) m.listenerStream = eventstream.NewEventStream(radio.Listeners(old.Listeners)) m.statusStream = eventstream.NewEventStream(*old) - + go m.runStatusUpdates(ctx) return &m, nil } @@ -90,8 +94,6 @@ type Manager struct { // mu protects the fields below and their contents mu sync.Mutex status radio.Status - // listener count at the start of a song - songStartListenerCount radio.Listeners // streaming support userStream *eventstream.EventStream[*radio.User] @@ -103,28 +105,10 @@ type Manager struct { // updateStreamStatus is a legacy layer to keep supporting streamstatus table usage // in the website. -func (m *Manager) updateStreamStatus(send bool, status radio.Status) { +func (m *Manager) updateStreamStatus(status radio.Status) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() - // do some minor adjustments so that we can safely pass the status object - // straight to the Exec - if !status.Song.HasTrack() { - status.Song.DatabaseTrack = &radio.DatabaseTrack{} - } - // streamstatus can be empty and we set a start time of now if it's zero - if status.SongInfo.Start.IsZero() { - status.SongInfo.Start = time.Now() - } - // streamstatus expects an end equal to start if it's unknown - if status.SongInfo.End.IsZero() { - status.SongInfo.End = status.SongInfo.Start - } - - if send { - m.statusStream.Send(status) - } - err := m.Storage.Status(ctx).Store(status) if err != nil { m.logger.Error().Err(err).Msg("update stream status") diff --git a/radio.go b/radio.go index c50200d2..0f170148 100644 --- a/radio.go +++ b/radio.go @@ -80,6 +80,7 @@ type Status struct { func (s *Status) IsZero() bool { ok := s.User.ID == 0 && + s.User.DJ.ID == 0 && s.Song.ID == 0 && s.SongInfo == (SongInfo{}) && s.StreamerName == "" && diff --git a/storage/mariadb/status.go b/storage/mariadb/status.go index da21df8b..f47fe3ea 100644 --- a/storage/mariadb/status.go +++ b/storage/mariadb/status.go @@ -2,6 +2,7 @@ package mariadb import ( "database/sql" + "time" radio "github.com/R-a-dio/valkyrie" "github.com/R-a-dio/valkyrie/errors" @@ -18,6 +19,27 @@ func (ss StatusStorage) Store(status radio.Status) error { handle, deferFn := ss.handle.span(op) defer deferFn() + if status.IsZero() { + return errors.E(op, errors.InvalidArgument) + } + + // the named query below will try to access some fields that are beyond + // a pointer type, so make sure those fields actually exist before we + // pass it to the database driver + if !status.Song.HasTrack() { + status.Song.DatabaseTrack = &radio.DatabaseTrack{} + } + + // we also have the info Start/End times that could be zero, if they are + // they would be outside of the supported range of mariadb, so just mock + // them to be the current time + if status.SongInfo.Start.IsZero() { + status.SongInfo.Start = time.Now() + } + if status.SongInfo.End.IsZero() { + status.SongInfo.End = status.SongInfo.Start + } + var query = ` INSERT INTO streamstatus diff --git a/storage/test/status.go b/storage/test/status.go index 20905010..1b8e0bdf 100644 --- a/storage/test/status.go +++ b/storage/test/status.go @@ -5,6 +5,7 @@ import ( "time" radio "github.com/R-a-dio/valkyrie" + "github.com/R-a-dio/valkyrie/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -48,3 +49,11 @@ func (suite *Suite) TestStatusStore(t *testing.T) { assert.Equal(t, in.Thread, out.Thread) }) } + +func (suite *Suite) TestStatusStoreEmpty(t *testing.T) { + ss := suite.Storage(t).Status(suite.ctx) + + // just an empty status + err := ss.Store(radio.Status{}) + Require(t, errors.InvalidArgument, err) +} diff --git a/util/eventstream/eventstream.go b/util/eventstream/eventstream.go index 85c2e472..b12839d8 100644 --- a/util/eventstream/eventstream.go +++ b/util/eventstream/eventstream.go @@ -54,6 +54,10 @@ type EventStream[M any] struct { FallbehindFn func(chan M, M) } +func (es *EventStream[M]) Latest() M { + return *es.last.Load() +} + func (es *EventStream[M]) run() { ticker := time.NewTicker(TIMEOUT) defer ticker.Stop() diff --git a/website/api/v1/router.go b/website/api/v1/router.go index 33f7d2a7..99210de3 100644 --- a/website/api/v1/router.go +++ b/website/api/v1/router.go @@ -64,3 +64,8 @@ func (a *API) Route(r chi.Router) { r.Get("/song", a.GetSong) r.Post("/request", a.PostRequest) } + +func (a *API) Shutdown() error { + a.sse.Shutdown() + return nil +} diff --git a/website/main.go b/website/main.go index 646015d7..d6480a2d 100644 --- a/website/main.go +++ b/website/main.go @@ -162,6 +162,7 @@ func Execute(ctx context.Context, cfg config.Config) error { if err != nil { return errors.E(op, err) } + defer v1.Shutdown() r.Route("/v1", v1.Route) // admin routes