Skip to content

Commit

Permalink
manager: refactor the manager to be less messy
Browse files Browse the repository at this point in the history
radio: add DJID check to Status.IsZero

util/eventstream: at Latest() method that returns the latest value

website/api/v1, website: add Shutdown to the v1/API api

storage/mariadb: make StatusStorage.Store less prone to blowing up
if an empty/missing data Status is given as argument

storage/test: add test for the above
  • Loading branch information
Wessie committed Jun 3, 2024
1 parent 107cd8b commit c506adc
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 139 deletions.
260 changes: 144 additions & 116 deletions manager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package manager

import (
"context"
"strings"
"time"

radio "github.com/R-a-dio/valkyrie"
"github.com/R-a-dio/valkyrie/errors"
"github.com/R-a-dio/valkyrie/rpc"
"github.com/R-a-dio/valkyrie/util/eventstream"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -52,76 +52,52 @@ func (m *Manager) Status(ctx context.Context) (*radio.Status, error) {
// UpdateUser sets information about the current streamer
func (m *Manager) UpdateUser(ctx context.Context, u *radio.User) error {
const op errors.Op = "manager/Manager.UpdateUser"
ctx, span := otel.Tracer("").Start(ctx, string(op))
_, span := otel.Tracer("").Start(ctx, string(op))
defer span.End()

m.userStream.Send(u)

// only update status if we get a non-nil user, this leaves the status
// data with the last known DJ so that we can display something
if u != nil {
m.mu.Lock()
m.status.StreamerName = u.DJ.Name
m.status.User = *u
go m.updateStreamStatus(true, m.status)
m.mu.Unlock()
}

if u != nil {
m.logger.Info().Str("username", u.Username).Msg("updating stream user")
} else {
m.logger.Info().Str("username", "Fallback").Msg("updating stream user")
m.logger.Info().Str("username", "fallback").Msg("updating stream user")
}
return nil
}

// UpdateSong sets information about the currently playing song
func (m *Manager) UpdateSong(ctx context.Context, update *radio.SongUpdate) error {
func (m *Manager) UpdateSong(ctx context.Context, su *radio.SongUpdate) error {
const op errors.Op = "manager/Manager.UpdateSong"
ctx, span := otel.Tracer("").Start(ctx, string(op))
defer span.End()

new := update
info := update.Info

// trim any whitespace on the edges
new.Metadata = strings.TrimSpace(new.Metadata)
// hydrate the song we got, this will deal with any weird metadata whitespace and
// fills in the hashes if they don't exist
su.Song.Hydrate()

// empty metadata, we ignore
if new.Metadata == "" {
if su.Song.Metadata == "" {
m.logger.Info().Msg("skipping empty metadata")
return nil
}
// first we check if this is the same song as the previous one we received to
// avoid double announcement or drifting start/end timings
m.mu.Lock()
if m.status.Song.Metadata == new.Metadata {
m.mu.Unlock()
return nil
}

// otherwise it's a legit song change
ss, tx, err := m.Storage.SongTx(ctx, nil)
if err != nil {
return errors.E(op, err)
}
defer tx.Rollback()

// we assume that the song we received has very little or no data except for the
// Metadata field. So we try and find more info from that
song, err := ss.FromMetadata(new.Metadata)
// fill in the rest of the song data
ss := m.Storage.Song(ctx)
// songs send to this endpoint only require their metadata to be set, and since
// we can get a hash from that we use that to lookup more information
song, err := ss.FromHash(su.Song.Hash)
if err != nil && !errors.Is(errors.SongUnknown, err) {
return errors.E(op, err)
}

// if we don't have this song in the database create a new entry for it
if song == nil {
song, err = ss.Create(radio.NewSong(new.Metadata))
song, err = ss.Create(su.Song)
if err != nil {
return errors.E(op, err)
}
}

var info = su.Info
// calculate start and end time only if they're zero
if info.Start.IsZero() {
// we assume the song just started if it wasn't set
Expand All @@ -131,73 +107,158 @@ func (m *Manager) UpdateSong(ctx context.Context, update *radio.SongUpdate) erro
// set end to start if we got passed a zero time
info.End = info.Start
}
if song.Length > 0 && info.End.Equal(info.Start) {
// add the song length if we have one
info.End = info.End.Add(song.Length)
}

// store copies of the information we need later
prevStatus := m.status
songListenerDiff := m.songStartListenerCount

// now update the fields we should update
m.status.Song = *song
m.status.SongInfo = info
m.songStartListenerCount = m.status.Listeners
defer func(status radio.Status) {
go m.updateStreamStatus(true, status)
}(m.status)
// check if end is equal to start, either from what we just did above or from
// the updater having given us both equal
if info.End.Equal(info.Start) {
// if we got a song length from either the updater or the storage we add
// it to the end time as an estimated end
if su.Song.Length > 0 { // updater song
info.End = info.End.Add(su.Song.Length)
}
if song.Length > 0 { // storage song
info.End = info.End.Add(song.Length)
}
}

// calculate the listener diff between start of song and end of song
songListenerDiff -= m.status.Listeners
// now we've filled in any missing information on both 'song' and 'info' so we
// can now check if we are even interested in this thing
if song.EqualTo(m.songStream.Latest().Song) {
// same song as latest, so skip the update
return nil
}

m.logger.Info().Str("metadata", song.Metadata).Dur("song_length", song.Length).Msg("updating stream song")
m.mu.Unlock()
m.songStream.Send(&radio.SongUpdate{Song: *song, Info: info})
return nil
}

// finish updating extra fields for the previous status
err = m.finishSongUpdate(ctx, tx, prevStatus, &songListenerDiff)
if err != nil {
return errors.E(op, err)
}
// UpdateThread sets the current thread information on the front page and chats
func (m *Manager) UpdateThread(ctx context.Context, thread radio.Thread) error {
const op errors.Op = "manager/Manager.UpdateThread"
_, span := otel.Tracer("").Start(ctx, string(op))
defer span.End()

if err = tx.Commit(); err != nil {
return errors.E(op, errors.TransactionCommit, err, prevStatus)
}
m.threadStream.Send(thread)
return nil
}

// send an event out
m.songStream.Send(&radio.SongUpdate{Song: *song, Info: info})
// UpdateListeners sets the listener count
func (m *Manager) UpdateListeners(ctx context.Context, listeners radio.Listeners) error {
const op errors.Op = "manager/Manager.UpdateListeners"
_, span := otel.Tracer("").Start(ctx, string(op))
defer span.End()

m.listenerStream.Send(listeners)
return nil
}

func (m *Manager) finishSongUpdate(ctx context.Context, tx radio.StorageTx, status radio.Status, ldiff *radio.Listeners) error {
const op errors.Op = "manager/Manager.finishSongUpdate"
// runStatusUpdates is in charge of keeping m.status up-to-date from the other
// data streams.
func (m *Manager) runStatusUpdates(ctx context.Context) {
userCh := m.userStream.Sub()
defer m.userStream.Leave(userCh)
var user *radio.User

threadCh := m.threadStream.Sub()
defer m.threadStream.Leave(threadCh)
var thread string

songCh := m.songStream.Sub()
defer m.songStream.Leave(songCh)
var su *radio.SongUpdate

listenerCh := m.listenerStream.Sub()
defer m.listenerStream.Leave(listenerCh)
var listenerCount radio.Listeners
var songStartListenerCount radio.Listeners

for {
var sendStatus = true
var songUpdate = false

select {
case <-ctx.Done():
return
case user = <-userCh:
case thread = <-threadCh:
case su = <-songCh:
songUpdate = true
case listenerCount = <-listenerCh:
sendStatus = false
}

if status.Song.ID == 0 {
// no song to update
return nil
}
if tx == nil {
// no transaction was passed to us, we require one
panic("no tx given to finishSongUpdate")
m.mu.Lock()
// if we're about to update the song, we need to do some bookkeeping on
// the previous song
if err := m.finishSong(ctx, m.status, songStartListenerCount); err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("failed finishSong")
}

// update user
if user != nil {
m.status.StreamerName = user.DJ.Name
m.status.User = *user
}
// update song
if su != nil {
m.status.Song = su.Song
m.status.SongInfo = su.Info
}
// update thread
m.status.Thread = thread
// update listener count
m.status.Listeners = listenerCount

if sendStatus {
// send a copy to the status stream
m.statusStream.Send(m.status)
}
// and a copy to the persistent storage
m.updateStreamStatus(m.status)
m.mu.Unlock()

// and finally, if we just did a song update we want to record how many
// listeners we have at the start of it
if songUpdate {
songStartListenerCount = listenerCount
}
}
}

func (m *Manager) finishSong(ctx context.Context, status radio.Status, startListenerCount radio.Listeners) error {
const op errors.Op = "manager/Manager.finishSong"

// check if we want to skip inserting a listener diff; this is mostly here
// calculate the listener difference
diff := startListenerCount - status.Listeners
var ldiff = &diff
// check if we want to skip the listener diff; this is mostly here
// to avoid fallback jumps to register as 0s
if ldiff != nil && (status.Listeners < 10 || status.Listeners+*ldiff < 10) {
if status.Listeners < 10 || status.Listeners+diff < 10 {
ldiff = nil
}

ss, _, err := m.Storage.SongTx(ctx, tx)
// start a transaction for if any of the storage calls fail
ss, tx, err := m.Storage.SongTx(ctx, nil)
if err != nil {
return errors.E(op, err)
}
defer tx.Rollback()

// insert an entry that this song was played
// add a play to the song
err = ss.AddPlay(status.Song, status.User, ldiff)
if err != nil {
return errors.E(op, err)
}

// update the song length if it didn't have one yet
if status.Song.Length == 0 {
err = ss.UpdateLength(status.Song, time.Since(status.SongInfo.Start))
if err != nil {
return errors.E(op, err, status)
}
}

// if we have the song in the database, also update that
if status.Song.HasTrack() {
ts, _, err := m.Storage.TrackTx(ctx, tx)
Expand All @@ -211,43 +272,10 @@ func (m *Manager) finishSongUpdate(ctx context.Context, tx radio.StorageTx, stat
}
}

// and the song length if it was still unknown
if status.Song.Length == 0 {
err = ss.UpdateLength(status.Song, time.Since(status.SongInfo.Start))
if err != nil {
return errors.E(op, err, status)
}
// commit the transaction
if err = tx.Commit(); err != nil {
return errors.E(op, err)
}

return nil
}

// UpdateThread sets the current thread information on the front page and chats
func (m *Manager) UpdateThread(ctx context.Context, thread radio.Thread) error {
const op errors.Op = "manager/Manager.UpdateThread"
ctx, span := otel.Tracer("").Start(ctx, string(op))
defer span.End()

m.threadStream.Send(thread)

m.mu.Lock()
m.status.Thread = thread
go m.updateStreamStatus(true, m.status)
m.mu.Unlock()
return nil
}

// UpdateListeners sets the listener count
func (m *Manager) UpdateListeners(ctx context.Context, listeners radio.Listeners) error {
const op errors.Op = "manager/Manager.UpdateListeners"
ctx, span := otel.Tracer("").Start(ctx, string(op))
defer span.End()

m.listenerStream.Send(listeners)

m.mu.Lock()
m.status.Listeners = listeners
go m.updateStreamStatus(false, m.status)
m.mu.Unlock()
return nil
}
Loading

0 comments on commit c506adc

Please sign in to comment.