Skip to content

Commit

Permalink
manager: refactor manager code to be less bad
Browse files Browse the repository at this point in the history
This moves streamer connecting logic out of the manager and into the
streamer (in a later commit). Song updates now hold the mutex for
longer such that we get a consistent ordering if two calls are in
flight at the same time.
  • Loading branch information
Wessie committed May 13, 2024
1 parent e211686 commit 27d5789
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 169 deletions.
125 changes: 59 additions & 66 deletions manager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (m *Manager) CurrentStatus(ctx context.Context) (eventstream.Stream[radio.S
// Status returns the current status of the radio
func (m *Manager) Status(ctx context.Context) (*radio.Status, error) {
m.mu.Lock()
status := m.status.Copy()
status := m.status
m.mu.Unlock()
return &status, nil
}
Expand All @@ -54,17 +54,16 @@ func (m *Manager) UpdateUser(ctx context.Context, u *radio.User) error {
ctx, span := otel.Tracer("").Start(ctx, string(op))
defer span.End()

defer m.updateStreamStatus(true)
m.userStream.Send(u)

m.mu.Lock()

if u != nil {
m.mu.Lock()
m.status.StreamerName = u.DJ.Name
m.status.User = *u
go m.updateStreamStatus(true, m.status)
m.mu.Unlock()
}

m.mu.Unlock()
if u != nil {
m.logger.Info().Str("username", u.Username).Msg("updating stream user")
} else {
Expand All @@ -90,32 +89,15 @@ func (m *Manager) UpdateSong(ctx context.Context, update *radio.SongUpdate) erro
return nil
}

// check if a robot is streaming
// TODO: don't hardcode this
//isRobot := m.status.User.Username == "AFK"
isRobot := true

// check if we're on a fallback stream
if info.IsFallback {
m.logger.Info().Str("fallback", new.Metadata).Msg("fallback engaged")
// if we have a robot user we want to start the automated streamer, but only if
// there isn't already a timer running
if isRobot {
// TODO: don't hardcode this
timeout := time.Second * 15
m.tryStartStreamer(timeout)
}
m.status.SongInfo.IsFallback = info.IsFallback
m.mu.Unlock()
return nil
}
// if we're not on a fallback we want to stop the timer for the automated streamer
m.stopStartStreamer()
m.mu.Unlock()

// otherwise continue like it's a new song
defer m.updateStreamStatus(true)

// otherwise it's a legit song change
ss, tx, err := m.Storage.SongTx(ctx, nil)
if err != nil {
return errors.E(op, err)
Expand Down Expand Up @@ -146,81 +128,92 @@ func (m *Manager) UpdateSong(ctx context.Context, update *radio.SongUpdate) erro
// set end to start if we got passed a zero time
info.End = info.Start
}
if song.Length > 0 {
if song.Length > 0 && info.End.Equal(info.Start) {
// add the song length if we have one
info.End = info.End.Add(song.Length)
}

var prev radio.Status
var prevInfo radio.SongInfo
var listenerCountDiff *radio.Listeners
// store copies of the information we need later
prevStatus := m.status
songListenerDiff := m.songStartListenerCount

// critical section to swap our new song with the previous one
m.mu.Lock()
// now update the fields we should update
m.status.Song = *song
m.status.SongInfo = info
m.songStartListenerCount = m.status.Listeners
go m.updateStreamStatus(true, m.status)

prev, m.status.Song = m.status, *song
prevInfo, m.status.SongInfo = m.status.SongInfo, info
// calculate the listener diff between start of song and end of song
songListenerDiff -= m.status.Listeners

// record listener count and calculate the difference between start/end of song
currentListenerCount := m.status.Listeners
// update and retrieve listener count of start of song
var startListenerCount radio.Listeners
startListenerCount, m.songStartListenerCount = m.songStartListenerCount, currentListenerCount
m.logger.Info().Str("metadata", song.Metadata).Dur("song_length", song.Length).Msg("updating stream song")

// send an event out
m.songStream.Send(&radio.SongUpdate{Song: *song, Info: info})
m.mu.Unlock()

// only calculate a diff if we have more than 10 listeners
if currentListenerCount > 10 && startListenerCount > 10 {
diff := currentListenerCount - startListenerCount
listenerCountDiff = &diff
// finish updating extra fields for the previous status
err = m.finishSongUpdate(ctx, tx, prevStatus, &songListenerDiff)
if err != nil {
return errors.E(op, err)
}

m.logger.Info().Str("metadata", song.Metadata).Dur("song_length", song.Length).Msg("updating stream song")
if err = tx.Commit(); err != nil {
return errors.E(op, errors.TransactionCommit, err, prevStatus)
}
return nil
}

// send an event out
m.songStream.Send(&radio.SongUpdate{Song: *song, Info: info})
func (m *Manager) finishSongUpdate(ctx context.Context, tx radio.StorageTx, status radio.Status, ldiff *radio.Listeners) error {
const op errors.Op = "manager/Manager.finishSongUpdate"

// =============================================
// finish up database work for the previous song
//
// after this point, any reference to the `song` variable is an error, so we
// make it nil so it will panic if done by mistake
song = nil
if prev.Song.ID == 0 { // protect against a zero'd song
if status.Song.ID == 0 {
// no song to update
return nil
}
if tx == nil {
// no transaction was passed to us, we require one
panic("no tx given to finishSongUpdate")
}

// check if we want to skip inserting a listener diff; this is mostly here
// to avoid fallback jumps to register as 0s
if ldiff != nil && (status.Listeners < 10 || status.Listeners+*ldiff < 10) {
ldiff = nil
}

// insert a played entry
err = ss.AddPlay(prev.Song, prev.User, listenerCountDiff)
ss, _, err := m.Storage.SongTx(ctx, tx)
if err != nil {
return errors.E(op, err)
}

// update lastplayed if the streamer is a robot and the song has a track
if prev.Song.HasTrack() && isRobot {
// insert an entry that this song was played
err = ss.AddPlay(status.Song, status.User, ldiff)
if err != nil {
return errors.E(op, err)
}

// if we have the song in the database, also update that
if status.Song.HasTrack() {
ts, _, err := m.Storage.TrackTx(ctx, tx)
if err != nil {
return errors.E(op, err)
}

err = ts.UpdateLastPlayed(prev.Song.TrackID)
err = ts.UpdateLastPlayed(status.Song.TrackID)
if err != nil {
return errors.E(op, err, prev)
return errors.E(op, err, status)
}
}

// update song length only if it didn't already have one
if prev.Song.Length == 0 {
err = ss.UpdateLength(prev.Song, time.Since(prevInfo.Start))
// and the song length if it was still unknown
if status.Song.Length == 0 {
err = ss.UpdateLength(status.Song, time.Since(status.SongInfo.Start))
if err != nil {
return errors.E(op, err, prev)
return errors.E(op, err, status)
}
}

err = tx.Commit()
if err != nil {
return errors.E(op, errors.TransactionCommit, err, prev)
}
return nil
}

Expand All @@ -230,11 +223,11 @@ func (m *Manager) UpdateThread(ctx context.Context, thread radio.Thread) error {
ctx, span := otel.Tracer("").Start(ctx, string(op))
defer span.End()

defer m.updateStreamStatus(true)
m.threadStream.Send(thread)

m.mu.Lock()
m.status.Thread = thread
go m.updateStreamStatus(true, m.status)
m.mu.Unlock()
return nil
}
Expand All @@ -245,11 +238,11 @@ func (m *Manager) UpdateListeners(ctx context.Context, listeners radio.Listeners
ctx, span := otel.Tracer("").Start(ctx, string(op))
defer span.End()

defer m.updateStreamStatus(false)
m.listenerStream.Send(listeners)

m.mu.Lock()
m.status.Listeners = listeners
go m.updateStreamStatus(false, m.status)
m.mu.Unlock()
return nil
}
121 changes: 29 additions & 92 deletions manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func NewManager(ctx context.Context, cfg config.Config) (*Manager, error) {
m.listenerStream = eventstream.NewEventStream(radio.Listeners(old.Listeners))
m.statusStream = eventstream.NewEventStream(*old)

m.client.streamer = cfg.Streamer
return &m, nil
}

Expand All @@ -88,14 +87,9 @@ type Manager struct {

Storage radio.StorageService

// Other components
client struct {
streamer radio.StreamerService
}
// mu protects the fields below and their contents
mu sync.Mutex
status radio.Status
autoStreamerTimer *time.Timer
mu sync.Mutex
status radio.Status
// listener count at the start of a song
songStartListenerCount radio.Listeners

Expand All @@ -109,45 +103,37 @@ type Manager struct {

// updateStreamStatus is a legacy layer to keep supporting streamstatus table usage
// in the website.
func (m *Manager) updateStreamStatus(send bool) {
go func() {
m.mu.Lock()
status := m.status.Copy()
m.mu.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()

ss := m.Storage.Status(ctx)

// do some minor adjustments so that we can safely pass the status object
// straight to the Exec
if !status.Song.HasTrack() {
status.Song.DatabaseTrack = &radio.DatabaseTrack{}
}
// streamstatus can be empty and we set a start time of now if it's zero
if status.SongInfo.Start.IsZero() {
status.SongInfo.Start = time.Now()
}
// streamstatus expects an end equal to start if it's unknown
if status.SongInfo.End.IsZero() {
status.SongInfo.End = status.SongInfo.Start
}
func (m *Manager) updateStreamStatus(send bool, status radio.Status) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()

// do some minor adjustments so that we can safely pass the status object
// straight to the Exec
if !status.Song.HasTrack() {
status.Song.DatabaseTrack = &radio.DatabaseTrack{}
}
// streamstatus can be empty and we set a start time of now if it's zero
if status.SongInfo.Start.IsZero() {
status.SongInfo.Start = time.Now()
}
// streamstatus expects an end equal to start if it's unknown
if status.SongInfo.End.IsZero() {
status.SongInfo.End = status.SongInfo.Start
}

if send {
m.statusStream.Send(status)
}
if send {
m.statusStream.Send(status)
}

err := ss.Store(status)
if err != nil {
m.logger.Error().Err(err).Msg("update stream status")
return
}
}()
err := m.Storage.Status(ctx).Store(status)
if err != nil {
m.logger.Error().Err(err).Msg("update stream status")
return
}
}

// loadStreamStatus is to load the legacy streamstatus table, we should only do this
// at startup
// loadStreamStatus is to load the legacy streamstatus table, we should only
// do this at startup
func (m *Manager) loadStreamStatus(ctx context.Context) (*radio.Status, error) {
status, err := m.Storage.Status(ctx).Load()
if err != nil {
Expand All @@ -174,52 +160,3 @@ func (m *Manager) loadStreamStatus(ctx context.Context) (*radio.Status, error) {

return status, nil
}

// tryStartStreamer tries to start the streamer after waiting the timeout period given
//
// tryStartStreamer needs to be called with m.mu held
func (m *Manager) tryStartStreamer(timeout time.Duration) {
if m.autoStreamerTimer != nil {
return
}

m.logger.Info().Dur("timeout", timeout).Msg("trying to start streamer")
m.autoStreamerTimer = time.AfterFunc(timeout, func() {
// we lock here to lower the chance of a race between UpdateUser and this
// timer firing
m.mu.Lock()
defer m.mu.Unlock()
if m.autoStreamerTimer == nil {
// this means we got cancelled before we could run, but a race occurred
// between the call to Stop and this function, and we won that race. We
// don't want that to happen so cancel the starting
return
}
// reset ourselves
m.autoStreamerTimer = nil

err := m.client.streamer.Start(context.Background())
if err != nil {
m.logger.Error().Err(err).Msg("failed to start streamer")
// if we failed to start, try again with atleast 10 seconds timeout
if timeout < time.Second*10 {
timeout = time.Second * 10
}
m.tryStartStreamer(timeout)
return
}
})
}

// stopStartStreamer stops the timer created by tryStartStreamer and sets the timer to
// nil again.
//
// stopStartStreamer needs to be called with m.mu held
func (m *Manager) stopStartStreamer() {
if m.autoStreamerTimer == nil {
return
}

m.autoStreamerTimer.Stop()
m.autoStreamerTimer = nil
}
Loading

0 comments on commit 27d5789

Please sign in to comment.