From 27d578959f3f4b7d0f94ee1c46e243fa28276649 Mon Sep 17 00:00:00 2001 From: Wessie Date: Mon, 13 May 2024 20:34:24 +0100 Subject: [PATCH] manager: refactor manager code to be less bad This moves streamer connecting logic out of the manager and into the streamer (in a later commit). Song updates now hold the mutex for longer such that we get a consistent ordering if two calls are in flight at the same time. --- manager/api.go | 125 +++++++++++++++++++++++------------------------- manager/main.go | 121 +++++++++++----------------------------------- radio.go | 16 ++----- 3 files changed, 93 insertions(+), 169 deletions(-) diff --git a/manager/api.go b/manager/api.go index 9368a1d9..710d7b5e 100644 --- a/manager/api.go +++ b/manager/api.go @@ -43,7 +43,7 @@ func (m *Manager) CurrentStatus(ctx context.Context) (eventstream.Stream[radio.S // Status returns the current status of the radio func (m *Manager) Status(ctx context.Context) (*radio.Status, error) { m.mu.Lock() - status := m.status.Copy() + status := m.status m.mu.Unlock() return &status, nil } @@ -54,17 +54,16 @@ func (m *Manager) UpdateUser(ctx context.Context, u *radio.User) error { ctx, span := otel.Tracer("").Start(ctx, string(op)) defer span.End() - defer m.updateStreamStatus(true) m.userStream.Send(u) - m.mu.Lock() - if u != nil { + m.mu.Lock() m.status.StreamerName = u.DJ.Name m.status.User = *u + go m.updateStreamStatus(true, m.status) + m.mu.Unlock() } - m.mu.Unlock() if u != nil { m.logger.Info().Str("username", u.Username).Msg("updating stream user") } else { @@ -90,32 +89,15 @@ func (m *Manager) UpdateSong(ctx context.Context, update *radio.SongUpdate) erro return nil } - // check if a robot is streaming - // TODO: don't hardcode this - //isRobot := m.status.User.Username == "AFK" - isRobot := true - // check if we're on a fallback stream if info.IsFallback { m.logger.Info().Str("fallback", new.Metadata).Msg("fallback engaged") - // if we have a robot user we want to start the automated streamer, but only if - // there isn't already a timer running - if isRobot { - // TODO: don't hardcode this - timeout := time.Second * 15 - m.tryStartStreamer(timeout) - } m.status.SongInfo.IsFallback = info.IsFallback m.mu.Unlock() return nil } - // if we're not on a fallback we want to stop the timer for the automated streamer - m.stopStartStreamer() - m.mu.Unlock() - - // otherwise continue like it's a new song - defer m.updateStreamStatus(true) + // otherwise it's a legit song change ss, tx, err := m.Storage.SongTx(ctx, nil) if err != nil { return errors.E(op, err) @@ -146,81 +128,92 @@ func (m *Manager) UpdateSong(ctx context.Context, update *radio.SongUpdate) erro // set end to start if we got passed a zero time info.End = info.Start } - if song.Length > 0 { + if song.Length > 0 && info.End.Equal(info.Start) { // add the song length if we have one info.End = info.End.Add(song.Length) } - var prev radio.Status - var prevInfo radio.SongInfo - var listenerCountDiff *radio.Listeners + // store copies of the information we need later + prevStatus := m.status + songListenerDiff := m.songStartListenerCount - // critical section to swap our new song with the previous one - m.mu.Lock() + // now update the fields we should update + m.status.Song = *song + m.status.SongInfo = info + m.songStartListenerCount = m.status.Listeners + go m.updateStreamStatus(true, m.status) - prev, m.status.Song = m.status, *song - prevInfo, m.status.SongInfo = m.status.SongInfo, info + // calculate the listener diff between start of song and end of song + songListenerDiff -= m.status.Listeners - // record listener count and calculate the difference between start/end of song - currentListenerCount := m.status.Listeners - // update and retrieve listener count of start of song - var startListenerCount radio.Listeners - startListenerCount, m.songStartListenerCount = m.songStartListenerCount, currentListenerCount + m.logger.Info().Str("metadata", song.Metadata).Dur("song_length", song.Length).Msg("updating stream song") + // send an event out + m.songStream.Send(&radio.SongUpdate{Song: *song, Info: info}) m.mu.Unlock() - // only calculate a diff if we have more than 10 listeners - if currentListenerCount > 10 && startListenerCount > 10 { - diff := currentListenerCount - startListenerCount - listenerCountDiff = &diff + // finish updating extra fields for the previous status + err = m.finishSongUpdate(ctx, tx, prevStatus, &songListenerDiff) + if err != nil { + return errors.E(op, err) } - m.logger.Info().Str("metadata", song.Metadata).Dur("song_length", song.Length).Msg("updating stream song") + if err = tx.Commit(); err != nil { + return errors.E(op, errors.TransactionCommit, err, prevStatus) + } + return nil +} - // send an event out - m.songStream.Send(&radio.SongUpdate{Song: *song, Info: info}) +func (m *Manager) finishSongUpdate(ctx context.Context, tx radio.StorageTx, status radio.Status, ldiff *radio.Listeners) error { + const op errors.Op = "manager/Manager.finishSongUpdate" - // ============================================= - // finish up database work for the previous song - // - // after this point, any reference to the `song` variable is an error, so we - // make it nil so it will panic if done by mistake - song = nil - if prev.Song.ID == 0 { // protect against a zero'd song + if status.Song.ID == 0 { + // no song to update return nil } + if tx == nil { + // no transaction was passed to us, we require one + panic("no tx given to finishSongUpdate") + } + + // check if we want to skip inserting a listener diff; this is mostly here + // to avoid fallback jumps to register as 0s + if ldiff != nil && (status.Listeners < 10 || status.Listeners+*ldiff < 10) { + ldiff = nil + } - // insert a played entry - err = ss.AddPlay(prev.Song, prev.User, listenerCountDiff) + ss, _, err := m.Storage.SongTx(ctx, tx) if err != nil { return errors.E(op, err) } - // update lastplayed if the streamer is a robot and the song has a track - if prev.Song.HasTrack() && isRobot { + // insert an entry that this song was played + err = ss.AddPlay(status.Song, status.User, ldiff) + if err != nil { + return errors.E(op, err) + } + + // if we have the song in the database, also update that + if status.Song.HasTrack() { ts, _, err := m.Storage.TrackTx(ctx, tx) if err != nil { return errors.E(op, err) } - err = ts.UpdateLastPlayed(prev.Song.TrackID) + err = ts.UpdateLastPlayed(status.Song.TrackID) if err != nil { - return errors.E(op, err, prev) + return errors.E(op, err, status) } } - // update song length only if it didn't already have one - if prev.Song.Length == 0 { - err = ss.UpdateLength(prev.Song, time.Since(prevInfo.Start)) + // and the song length if it was still unknown + if status.Song.Length == 0 { + err = ss.UpdateLength(status.Song, time.Since(status.SongInfo.Start)) if err != nil { - return errors.E(op, err, prev) + return errors.E(op, err, status) } } - err = tx.Commit() - if err != nil { - return errors.E(op, errors.TransactionCommit, err, prev) - } return nil } @@ -230,11 +223,11 @@ func (m *Manager) UpdateThread(ctx context.Context, thread radio.Thread) error { ctx, span := otel.Tracer("").Start(ctx, string(op)) defer span.End() - defer m.updateStreamStatus(true) m.threadStream.Send(thread) m.mu.Lock() m.status.Thread = thread + go m.updateStreamStatus(true, m.status) m.mu.Unlock() return nil } @@ -245,11 +238,11 @@ func (m *Manager) UpdateListeners(ctx context.Context, listeners radio.Listeners ctx, span := otel.Tracer("").Start(ctx, string(op)) defer span.End() - defer m.updateStreamStatus(false) m.listenerStream.Send(listeners) m.mu.Lock() m.status.Listeners = listeners + go m.updateStreamStatus(false, m.status) m.mu.Unlock() return nil } diff --git a/manager/main.go b/manager/main.go index 9a200a2b..55f2066d 100644 --- a/manager/main.go +++ b/manager/main.go @@ -77,7 +77,6 @@ func NewManager(ctx context.Context, cfg config.Config) (*Manager, error) { m.listenerStream = eventstream.NewEventStream(radio.Listeners(old.Listeners)) m.statusStream = eventstream.NewEventStream(*old) - m.client.streamer = cfg.Streamer return &m, nil } @@ -88,14 +87,9 @@ type Manager struct { Storage radio.StorageService - // Other components - client struct { - streamer radio.StreamerService - } // mu protects the fields below and their contents - mu sync.Mutex - status radio.Status - autoStreamerTimer *time.Timer + mu sync.Mutex + status radio.Status // listener count at the start of a song songStartListenerCount radio.Listeners @@ -109,45 +103,37 @@ type Manager struct { // updateStreamStatus is a legacy layer to keep supporting streamstatus table usage // in the website. -func (m *Manager) updateStreamStatus(send bool) { - go func() { - m.mu.Lock() - status := m.status.Copy() - m.mu.Unlock() - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) - defer cancel() - - ss := m.Storage.Status(ctx) - - // do some minor adjustments so that we can safely pass the status object - // straight to the Exec - if !status.Song.HasTrack() { - status.Song.DatabaseTrack = &radio.DatabaseTrack{} - } - // streamstatus can be empty and we set a start time of now if it's zero - if status.SongInfo.Start.IsZero() { - status.SongInfo.Start = time.Now() - } - // streamstatus expects an end equal to start if it's unknown - if status.SongInfo.End.IsZero() { - status.SongInfo.End = status.SongInfo.Start - } +func (m *Manager) updateStreamStatus(send bool, status radio.Status) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + + // do some minor adjustments so that we can safely pass the status object + // straight to the Exec + if !status.Song.HasTrack() { + status.Song.DatabaseTrack = &radio.DatabaseTrack{} + } + // streamstatus can be empty and we set a start time of now if it's zero + if status.SongInfo.Start.IsZero() { + status.SongInfo.Start = time.Now() + } + // streamstatus expects an end equal to start if it's unknown + if status.SongInfo.End.IsZero() { + status.SongInfo.End = status.SongInfo.Start + } - if send { - m.statusStream.Send(status) - } + if send { + m.statusStream.Send(status) + } - err := ss.Store(status) - if err != nil { - m.logger.Error().Err(err).Msg("update stream status") - return - } - }() + err := m.Storage.Status(ctx).Store(status) + if err != nil { + m.logger.Error().Err(err).Msg("update stream status") + return + } } -// loadStreamStatus is to load the legacy streamstatus table, we should only do this -// at startup +// loadStreamStatus is to load the legacy streamstatus table, we should only +// do this at startup func (m *Manager) loadStreamStatus(ctx context.Context) (*radio.Status, error) { status, err := m.Storage.Status(ctx).Load() if err != nil { @@ -174,52 +160,3 @@ func (m *Manager) loadStreamStatus(ctx context.Context) (*radio.Status, error) { return status, nil } - -// tryStartStreamer tries to start the streamer after waiting the timeout period given -// -// tryStartStreamer needs to be called with m.mu held -func (m *Manager) tryStartStreamer(timeout time.Duration) { - if m.autoStreamerTimer != nil { - return - } - - m.logger.Info().Dur("timeout", timeout).Msg("trying to start streamer") - m.autoStreamerTimer = time.AfterFunc(timeout, func() { - // we lock here to lower the chance of a race between UpdateUser and this - // timer firing - m.mu.Lock() - defer m.mu.Unlock() - if m.autoStreamerTimer == nil { - // this means we got cancelled before we could run, but a race occurred - // between the call to Stop and this function, and we won that race. We - // don't want that to happen so cancel the starting - return - } - // reset ourselves - m.autoStreamerTimer = nil - - err := m.client.streamer.Start(context.Background()) - if err != nil { - m.logger.Error().Err(err).Msg("failed to start streamer") - // if we failed to start, try again with atleast 10 seconds timeout - if timeout < time.Second*10 { - timeout = time.Second * 10 - } - m.tryStartStreamer(timeout) - return - } - }) -} - -// stopStartStreamer stops the timer created by tryStartStreamer and sets the timer to -// nil again. -// -// stopStartStreamer needs to be called with m.mu held -func (m *Manager) stopStartStreamer() { - if m.autoStreamerTimer == nil { - return - } - - m.autoStreamerTimer.Stop() - m.autoStreamerTimer = nil -} diff --git a/radio.go b/radio.go index 055141d1..59d44c91 100644 --- a/radio.go +++ b/radio.go @@ -41,6 +41,11 @@ func CalculateRequestDelay(requestCount int) time.Duration { return time.Duration(time.Duration(dur/2) * time.Second) } +// IsRobot indicates if the user has the Robot flag +func IsRobot(user User) bool { + return user.UserPermissions.HasExplicit(PermRobot) +} + // CalculateCooldown sees if the cooldown given has passed since `last` and returns // the remaining time if any and a bool indicating if it has passed since then or // not. It always returns true if `last` is zero. @@ -87,17 +92,6 @@ func (s *Status) IsZero() bool { return ok } -// Copy makes a deep-copy of the status object -func (s Status) Copy() Status { - c := s - if s.Song.HasTrack() { - track := *s.Song.DatabaseTrack - c.Song.DatabaseTrack = &track - } - - return s -} - // UserID is an identifier corresponding to an user type UserID int32