diff --git a/ircbot/api.go b/ircbot/api.go index 2f5d4de8..73cdb55e 100644 --- a/ircbot/api.go +++ b/ircbot/api.go @@ -34,16 +34,21 @@ type announceService struct { config.Config Storage radio.StorageService - bot *Bot - lastAnnounceSong time.Time + bot *Bot + lastAnnounceSongTime time.Time + lastAnnounceSong radio.Song } func (ann *announceService) AnnounceSong(ctx context.Context, status radio.Status) error { const op errors.Op = "irc/announceService.AnnounceSong" // don't do the announcement if the last one was recent enough - if time.Since(ann.lastAnnounceSong) < time.Duration(ann.Conf().IRC.AnnouncePeriod) { - zerolog.Ctx(ctx).Info().Str("metadata", status.Song.Metadata).Msg("skipping announce") + if time.Since(ann.lastAnnounceSongTime) < time.Duration(ann.Conf().IRC.AnnouncePeriod) { + zerolog.Ctx(ctx).Info().Str("metadata", status.Song.Metadata).Msg("skipping announce: announce period") + return nil + } + if ann.lastAnnounceSong.EqualTo(status.Song) { + zerolog.Ctx(ctx).Info().Str("metadata", status.Song.Metadata).Msg("skipping announce: same as last song") return nil } message := "Now starting:{red} '%s' {clear}[%s](%s), %s, %s, {green}LP:{clear} %s" @@ -75,7 +80,8 @@ func (ann *announceService) AnnounceSong(ctx context.Context, status radio.Statu ) ann.bot.c.Cmd.Message(ann.Conf().IRC.MainChannel, message) - ann.lastAnnounceSong = time.Now() + ann.lastAnnounceSong = status.Song + ann.lastAnnounceSongTime = time.Now() // // ======= favorite announcements below ========= diff --git a/ircbot/commands.go b/ircbot/commands.go index 48ab7299..a4b15cd3 100644 --- a/ircbot/commands.go +++ b/ircbot/commands.go @@ -8,6 +8,7 @@ import ( radio "github.com/R-a-dio/valkyrie" "github.com/R-a-dio/valkyrie/errors" + "github.com/R-a-dio/valkyrie/util" "github.com/lrstanley/girc" "github.com/rs/zerolog" ) @@ -230,7 +231,7 @@ func (e Event) ArgumentTrack(key string) (*radio.Song, error) { func (e Event) CurrentTrack() (*radio.Song, error) { const op errors.Op = "irc/Event.CurrentTrack" - status, err := radio.OneOff(e.Ctx, e.Bot.Manager.CurrentStatus) + status, err := util.OneOff(e.Ctx, e.Bot.Manager.CurrentStatus) if err != nil { return nil, errors.E(op, err) } diff --git a/ircbot/commands_impl.go b/ircbot/commands_impl.go index 18f62077..2277d637 100644 --- a/ircbot/commands_impl.go +++ b/ircbot/commands_impl.go @@ -20,10 +20,7 @@ func NowPlaying(e Event) error { // in the announcement code message := "Now playing:{red} '%s' {clear}[%s/%s](%s), %s, %s, {green}LP:{clear} %s" - status, err := radio.OneOff(e.Ctx, e.Bot.Manager.CurrentStatus) - if err != nil { - return errors.E(op, err) - } + status := e.Bot.StatusValue.Latest() if status.SongInfo.IsFallback { e.EchoPublic("Stream is currently down.") @@ -57,7 +54,7 @@ func NowPlaying(e Event) error { e.EchoPublic(message, status.Song.Metadata, FormatPlaybackDuration(songPosition), FormatPlaybackDuration(songLength), - Pluralf("%d listeners", int64(status.Listeners)), + Pluralf("%d listeners", e.Bot.ListenersValue.Latest()), Pluralf("%d faves", favoriteCount), Pluralf("played %d times", playedCount), FormatLongDuration(lastPlayedDiff), @@ -200,10 +197,7 @@ func StreamerUserInfo(e Event) error { name := e.Arguments["DJ"] if name == "" || !HasAccess(e.Client, e.Event) { // simple path with no argument or no access - status, err := radio.OneOff(e.Ctx, e.Bot.Manager.CurrentStatus) - if err != nil { - return errors.E(op, err) - } + status := e.Bot.StatusValue.Latest() e.EchoPublic("Current DJ: {green}%s", status.StreamerName) return nil } @@ -340,12 +334,9 @@ func ThreadURL(e Event) error { } } - resp, err := radio.OneOff(e.Ctx, e.Bot.Manager.CurrentStatus) - if err != nil { - return errors.E(op, err) - } + status := e.Bot.StatusValue.Latest() - e.Echo("Thread: %s", resp.Thread) + e.Echo("Thread: %s", status.Thread) return nil } @@ -426,20 +417,17 @@ func KillStreamer(e Event) error { case <-e.Ctx.Done(): } - status, err := radio.OneOff(e.Ctx, e.Bot.Manager.CurrentStatus) - if err != nil { + status := e.Bot.StatusValue.Latest() + + until := time.Until(status.SongInfo.End) + if force { + e.EchoPublic("Disconnecting right now") + } else if until == 0 { e.EchoPublic("Disconnecting after the current song") } else { - until := time.Until(status.SongInfo.End) - if force { - e.EchoPublic("Disconnecting right now") - } else if until == 0 { - e.EchoPublic("Disconnecting after the current song") - } else { - e.EchoPublic("Disconnecting in about %s", - FormatLongDuration(until), - ) - } + e.EchoPublic("Disconnecting in about %s", + FormatLongDuration(until), + ) } return nil diff --git a/ircbot/main.go b/ircbot/main.go index 34ea5ff7..6501b664 100644 --- a/ircbot/main.go +++ b/ircbot/main.go @@ -14,6 +14,7 @@ import ( "github.com/R-a-dio/valkyrie/errors" "github.com/R-a-dio/valkyrie/search" "github.com/R-a-dio/valkyrie/storage" + "github.com/R-a-dio/valkyrie/util" "github.com/lrstanley/girc" ) @@ -44,6 +45,14 @@ func Execute(ctx context.Context, cfg config.Config) error { return err } + b.StatusValue = util.StreamValue(ctx, manager.CurrentStatus, func(ctx context.Context, s radio.Status) { + err := announce.AnnounceSong(ctx, s) + if err != nil { + zerolog.Ctx(ctx).Error().Err(err).Msg("failed to announce") + } + }) + b.ListenersValue = util.StreamValue(ctx, manager.CurrentListeners) + errCh := make(chan error, 2) go func() { // run the irc client @@ -53,10 +62,6 @@ func Execute(ctx context.Context, cfg config.Config) error { // run the grpc server errCh <- srv.Serve(ln) }() - go func() { - // setup our listener for new songs on the stream - errCh <- WaitForStatus(ctx, manager, announce) - }() // wait for our context to be canceled or Serve to error out select { @@ -109,8 +114,12 @@ func NewBot(ctx context.Context, cfg config.Config) (*Bot, error) { c: girc.New(ircConf), } - RegisterCommonHandlers(b, b.c) - RegisterCommandHandlers(ctx, b) + if err = RegisterCommonHandlers(b, b.c); err != nil { + return nil, err + } + if err = RegisterCommandHandlers(ctx, b); err != nil { + return nil, err + } go b.syncConfiguration(ctx) return b, nil @@ -125,6 +134,10 @@ type Bot struct { Streamer radio.StreamerService Searcher radio.SearchService + // Values used by commands + StatusValue *util.Value[radio.Status] + ListenersValue *util.Value[radio.Listeners] + c *girc.Client } @@ -193,72 +206,3 @@ func (b *Bot) syncConfiguration(ctx context.Context) { } } } - -func WaitForStatus(ctx context.Context, manager radio.ManagerService, announce radio.AnnounceService) error { - const op errors.Op = "ircbot.WaitForStatus" - - var noRetry = make(chan time.Time) - close(noRetry) - var retry <-chan time.Time = noRetry - - var previous radio.Status - - for { - // if we lost connection or are just starting out we retry the connection - // only way to exit this loop is by the context being canceled - select { - case <-ctx.Done(): - return errors.E(op, ctx.Err()) - case <-retry: - } - retry = noRetry - - // connect to the status stream - zerolog.Ctx(ctx).Info().Msg("connecting to manager for status updates") - stream, err := manager.CurrentStatus(ctx) - if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Msg("failed to connect to manager") - // if it fails we retry in a short period - retry = time.After(time.Second * 5) - continue - } - - zerolog.Ctx(ctx).Info().Msg("starting status update reading") - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-retry: - } - retry = noRetry - - status, err := stream.Next() - if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Msg("failed next") - // stream error means we have to get a new stream and should - // break out of this inner loop - retry = time.After(time.Second * 5) - break - } - - // if song is same as previous skip the announce - if previous.Song.EqualTo(status.Song) { - zerolog.Ctx(ctx).Info().Msg("skipping same song announce") - continue - } - - // otherwise we announce - err = announce.AnnounceSong(ctx, status) - if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Msg("failed to announce song") - continue - } - - previous = status - } - - // if we leave the inner loop it means our stream broke so we're getting a new one - // soon, clean up this current one - stream.Close() - } -} diff --git a/jobs/listenlog.go b/jobs/listenlog.go index 23fe0d6a..3508c7d2 100644 --- a/jobs/listenlog.go +++ b/jobs/listenlog.go @@ -6,6 +6,7 @@ import ( radio "github.com/R-a-dio/valkyrie" "github.com/R-a-dio/valkyrie/config" "github.com/R-a-dio/valkyrie/storage" + "github.com/R-a-dio/valkyrie/util" ) // ExecuteListenerLog fetches the listener count from the manager and inserts a line into @@ -18,7 +19,7 @@ func ExecuteListenerLog(ctx context.Context, cfg config.Config) error { m := cfg.Conf().Manager.Client() - status, err := radio.OneOff[radio.Status](ctx, m.CurrentStatus) + status, err := util.OneOff[radio.Status](ctx, m.CurrentStatus) if err != nil { return err } diff --git a/radio.go b/radio.go index 6e313383..38261a0b 100644 --- a/radio.go +++ b/radio.go @@ -286,16 +286,6 @@ type ManagerService interface { CurrentStatus(context.Context) (eventstream.Stream[Status], error) } -func OneOff[T any](ctx context.Context, fn func(context.Context) (eventstream.Stream[T], error)) (T, error) { - s, err := fn(ctx) - if err != nil { - return *new(T), err - } - defer s.Close() - - return s.Next() -} - type StreamerService interface { Start(context.Context) error Stop(ctx context.Context, force bool) error diff --git a/util/util.go b/util/util.go index 0fc89dd3..4d006641 100644 --- a/util/util.go +++ b/util/util.go @@ -1,8 +1,89 @@ package util -import "net/http" +import ( + "context" + "net/http" + "sync/atomic" + "time" + + "github.com/R-a-dio/valkyrie/util/eventstream" + "github.com/rs/zerolog" +) // IsHTMX checks if a request was made by HTMX through the Hx-Request header func IsHTMX(r *http.Request) bool { return r.Header.Get("Hx-Request") == "true" } + +type StreamFn[T any] func(context.Context) (eventstream.Stream[T], error) + +type StreamCallbackFn[T any] func(context.Context, T) + +// OneOff creates a stream through fn and returns the first value received after which +// it closes the stream. Should be used where you only need a very sporadic value that is +// supplied by a streaming API. +func OneOff[T any](ctx context.Context, fn StreamFn[T]) (T, error) { + s, err := fn(ctx) + if err != nil { + return *new(T), err + } + defer s.Close() + + return s.Next() +} + +// StreamValue opens the stream created by StreamFn and calls any callbackFn given everytime a new +// value is returned by the stream. StreamValue also stores the last received value, accessable by +// calling .Latest +func StreamValue[T any](ctx context.Context, fn StreamFn[T], callbackFn ...StreamCallbackFn[T]) *Value[T] { + var value Value[T] + + go func() { + for { + stream, err := fn(ctx) + if err != nil { + // stream creation error most likely means the service + // is down or unavailable for some reason so retry in + // a little bit and stay alive + zerolog.Ctx(ctx).Error().Err(err).Msg("stream-value: stream error") + select { + case <-ctx.Done(): + // context was canceled, either while we were waiting on + // retrying, or that was our original error and we exit + return + case <-time.After(time.Second): + } + continue + } + + for { + v, err := stream.Next() + if err != nil { + // we either got context canceled or received some + // stream error that indicates we need a new stream, + // try and get one from the outer loop. + zerolog.Ctx(ctx).Error().Err(err).Msg("stream-value: next error") + break + } + + value.last.Store(&v) + + for _, callback := range callbackFn { + // TODO(wessie): run in goroutine? + callback(ctx, v) + } + } + stream.Close() + } + }() + + return &value +} + +type Value[T any] struct { + last atomic.Pointer[T] +} + +func (v *Value[T]) Latest() T { + return *v.last.Load() +} diff --git a/website/api/php/api.go b/website/api/php/api.go index 4f186dbc..e54d1048 100644 --- a/website/api/php/api.go +++ b/website/api/php/api.go @@ -16,6 +16,7 @@ import ( "github.com/R-a-dio/valkyrie/config" "github.com/R-a-dio/valkyrie/errors" "github.com/R-a-dio/valkyrie/search" + "github.com/R-a-dio/valkyrie/util" "github.com/R-a-dio/valkyrie/website/middleware" "github.com/rs/zerolog" "github.com/rs/zerolog/hlog" @@ -272,7 +273,7 @@ func (sri *searchResponseItem) fromSong(s radio.Song) error { } func (a *API) getCanRequest(w http.ResponseWriter, r *http.Request) { - status, err := radio.OneOff(r.Context(), a.manager.CurrentStatus) + status, err := util.OneOff(r.Context(), a.manager.CurrentStatus) if err != nil { return } @@ -566,7 +567,7 @@ func (s *v0Status) createStatusJSON(ctx context.Context) (v0StatusJSON, error) { status.ListCreatedOn = now } - ms, err := radio.OneOff(ctx, s.manager.CurrentStatus) + ms, err := util.OneOff(ctx, s.manager.CurrentStatus) if err != nil { return last, err } diff --git a/website/public/home.go b/website/public/home.go index dbaa6668..04c1b9bc 100644 --- a/website/public/home.go +++ b/website/public/home.go @@ -5,6 +5,7 @@ import ( radio "github.com/R-a-dio/valkyrie" "github.com/R-a-dio/valkyrie/errors" + "github.com/R-a-dio/valkyrie/util" "github.com/R-a-dio/valkyrie/website/middleware" "github.com/rs/zerolog/hlog" ) @@ -42,7 +43,7 @@ func (s State) getHome(w http.ResponseWriter, r *http.Request) error { input := NewHomeInput(r) ctx := r.Context() - status, err := radio.OneOff(ctx, s.Manager.CurrentStatus) + status, err := util.OneOff(ctx, s.Manager.CurrentStatus) if err != nil { return errors.E(op, errors.InternalServer, err) }