Skip to content

Commit

Permalink
radio: move OneOff to the util package
Browse files Browse the repository at this point in the history
OneOff now has a bigger brother named StreamValue that keeps a
stream open in the background and continuously updates the value
until the context used to create it is canceled.

StreamValue also supports passing it a callback that is to be called
everytime a value arrives.

ircbot: move to the StreamValue for status updates

ircbot/AnnounceSong: now checks if a song is equal to the previous one
	instead of leaving this up to the caller.
  • Loading branch information
Wessie committed Feb 25, 2024
1 parent 9b7f49c commit ae0e1c9
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 122 deletions.
16 changes: 11 additions & 5 deletions ircbot/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,21 @@ type announceService struct {
config.Config
Storage radio.StorageService

bot *Bot
lastAnnounceSong time.Time
bot *Bot
lastAnnounceSongTime time.Time
lastAnnounceSong radio.Song
}

func (ann *announceService) AnnounceSong(ctx context.Context, status radio.Status) error {
const op errors.Op = "irc/announceService.AnnounceSong"

// don't do the announcement if the last one was recent enough
if time.Since(ann.lastAnnounceSong) < time.Duration(ann.Conf().IRC.AnnouncePeriod) {
zerolog.Ctx(ctx).Info().Str("metadata", status.Song.Metadata).Msg("skipping announce")
if time.Since(ann.lastAnnounceSongTime) < time.Duration(ann.Conf().IRC.AnnouncePeriod) {
zerolog.Ctx(ctx).Info().Str("metadata", status.Song.Metadata).Msg("skipping announce: announce period")
return nil
}
if ann.lastAnnounceSong.EqualTo(status.Song) {
zerolog.Ctx(ctx).Info().Str("metadata", status.Song.Metadata).Msg("skipping announce: same as last song")
return nil
}
message := "Now starting:{red} '%s' {clear}[%s](%s), %s, %s, {green}LP:{clear} %s"
Expand Down Expand Up @@ -75,7 +80,8 @@ func (ann *announceService) AnnounceSong(ctx context.Context, status radio.Statu
)

ann.bot.c.Cmd.Message(ann.Conf().IRC.MainChannel, message)
ann.lastAnnounceSong = time.Now()
ann.lastAnnounceSong = status.Song
ann.lastAnnounceSongTime = time.Now()

//
// ======= favorite announcements below =========
Expand Down
3 changes: 2 additions & 1 deletion ircbot/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

radio "github.com/R-a-dio/valkyrie"
"github.com/R-a-dio/valkyrie/errors"
"github.com/R-a-dio/valkyrie/util"
"github.com/lrstanley/girc"
"github.com/rs/zerolog"
)
Expand Down Expand Up @@ -230,7 +231,7 @@ func (e Event) ArgumentTrack(key string) (*radio.Song, error) {
func (e Event) CurrentTrack() (*radio.Song, error) {
const op errors.Op = "irc/Event.CurrentTrack"

status, err := radio.OneOff(e.Ctx, e.Bot.Manager.CurrentStatus)
status, err := util.OneOff(e.Ctx, e.Bot.Manager.CurrentStatus)
if err != nil {
return nil, errors.E(op, err)
}
Expand Down
40 changes: 14 additions & 26 deletions ircbot/commands_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ func NowPlaying(e Event) error {
// in the announcement code
message := "Now playing:{red} '%s' {clear}[%s/%s](%s), %s, %s, {green}LP:{clear} %s"

status, err := radio.OneOff(e.Ctx, e.Bot.Manager.CurrentStatus)
if err != nil {
return errors.E(op, err)
}
status := e.Bot.StatusValue.Latest()

if status.SongInfo.IsFallback {
e.EchoPublic("Stream is currently down.")
Expand Down Expand Up @@ -57,7 +54,7 @@ func NowPlaying(e Event) error {
e.EchoPublic(message,
status.Song.Metadata,
FormatPlaybackDuration(songPosition), FormatPlaybackDuration(songLength),
Pluralf("%d listeners", int64(status.Listeners)),
Pluralf("%d listeners", e.Bot.ListenersValue.Latest()),
Pluralf("%d faves", favoriteCount),
Pluralf("played %d times", playedCount),
FormatLongDuration(lastPlayedDiff),
Expand Down Expand Up @@ -200,10 +197,7 @@ func StreamerUserInfo(e Event) error {
name := e.Arguments["DJ"]
if name == "" || !HasAccess(e.Client, e.Event) {
// simple path with no argument or no access
status, err := radio.OneOff(e.Ctx, e.Bot.Manager.CurrentStatus)
if err != nil {
return errors.E(op, err)
}
status := e.Bot.StatusValue.Latest()
e.EchoPublic("Current DJ: {green}%s", status.StreamerName)
return nil
}
Expand Down Expand Up @@ -340,12 +334,9 @@ func ThreadURL(e Event) error {
}
}

resp, err := radio.OneOff(e.Ctx, e.Bot.Manager.CurrentStatus)
if err != nil {
return errors.E(op, err)
}
status := e.Bot.StatusValue.Latest()

e.Echo("Thread: %s", resp.Thread)
e.Echo("Thread: %s", status.Thread)
return nil
}

Expand Down Expand Up @@ -426,20 +417,17 @@ func KillStreamer(e Event) error {
case <-e.Ctx.Done():
}

status, err := radio.OneOff(e.Ctx, e.Bot.Manager.CurrentStatus)
if err != nil {
status := e.Bot.StatusValue.Latest()

until := time.Until(status.SongInfo.End)
if force {
e.EchoPublic("Disconnecting right now")
} else if until == 0 {
e.EchoPublic("Disconnecting after the current song")
} else {
until := time.Until(status.SongInfo.End)
if force {
e.EchoPublic("Disconnecting right now")
} else if until == 0 {
e.EchoPublic("Disconnecting after the current song")
} else {
e.EchoPublic("Disconnecting in about %s",
FormatLongDuration(until),
)
}
e.EchoPublic("Disconnecting in about %s",
FormatLongDuration(until),
)
}

return nil
Expand Down
94 changes: 19 additions & 75 deletions ircbot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/R-a-dio/valkyrie/errors"
"github.com/R-a-dio/valkyrie/search"
"github.com/R-a-dio/valkyrie/storage"
"github.com/R-a-dio/valkyrie/util"
"github.com/lrstanley/girc"
)

Expand Down Expand Up @@ -44,6 +45,14 @@ func Execute(ctx context.Context, cfg config.Config) error {
return err
}

b.StatusValue = util.StreamValue(ctx, manager.CurrentStatus, func(ctx context.Context, s radio.Status) {
err := announce.AnnounceSong(ctx, s)
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("failed to announce")
}
})
b.ListenersValue = util.StreamValue(ctx, manager.CurrentListeners)

errCh := make(chan error, 2)
go func() {
// run the irc client
Expand All @@ -53,10 +62,6 @@ func Execute(ctx context.Context, cfg config.Config) error {
// run the grpc server
errCh <- srv.Serve(ln)
}()
go func() {
// setup our listener for new songs on the stream
errCh <- WaitForStatus(ctx, manager, announce)
}()

// wait for our context to be canceled or Serve to error out
select {
Expand Down Expand Up @@ -109,8 +114,12 @@ func NewBot(ctx context.Context, cfg config.Config) (*Bot, error) {
c: girc.New(ircConf),
}

RegisterCommonHandlers(b, b.c)
RegisterCommandHandlers(ctx, b)
if err = RegisterCommonHandlers(b, b.c); err != nil {
return nil, err
}
if err = RegisterCommandHandlers(ctx, b); err != nil {
return nil, err
}

go b.syncConfiguration(ctx)
return b, nil
Expand All @@ -125,6 +134,10 @@ type Bot struct {
Streamer radio.StreamerService
Searcher radio.SearchService

// Values used by commands
StatusValue *util.Value[radio.Status]
ListenersValue *util.Value[radio.Listeners]

c *girc.Client
}

Expand Down Expand Up @@ -193,72 +206,3 @@ func (b *Bot) syncConfiguration(ctx context.Context) {
}
}
}

func WaitForStatus(ctx context.Context, manager radio.ManagerService, announce radio.AnnounceService) error {
const op errors.Op = "ircbot.WaitForStatus"

var noRetry = make(chan time.Time)
close(noRetry)
var retry <-chan time.Time = noRetry

var previous radio.Status

for {
// if we lost connection or are just starting out we retry the connection
// only way to exit this loop is by the context being canceled
select {
case <-ctx.Done():
return errors.E(op, ctx.Err())
case <-retry:
}
retry = noRetry

// connect to the status stream
zerolog.Ctx(ctx).Info().Msg("connecting to manager for status updates")
stream, err := manager.CurrentStatus(ctx)
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("failed to connect to manager")
// if it fails we retry in a short period
retry = time.After(time.Second * 5)
continue
}

zerolog.Ctx(ctx).Info().Msg("starting status update reading")
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-retry:
}
retry = noRetry

status, err := stream.Next()
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("failed next")
// stream error means we have to get a new stream and should
// break out of this inner loop
retry = time.After(time.Second * 5)
break
}

// if song is same as previous skip the announce
if previous.Song.EqualTo(status.Song) {
zerolog.Ctx(ctx).Info().Msg("skipping same song announce")
continue
}

// otherwise we announce
err = announce.AnnounceSong(ctx, status)
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("failed to announce song")
continue
}

previous = status
}

// if we leave the inner loop it means our stream broke so we're getting a new one
// soon, clean up this current one
stream.Close()
}
}
3 changes: 2 additions & 1 deletion jobs/listenlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
radio "github.com/R-a-dio/valkyrie"
"github.com/R-a-dio/valkyrie/config"
"github.com/R-a-dio/valkyrie/storage"
"github.com/R-a-dio/valkyrie/util"
)

// ExecuteListenerLog fetches the listener count from the manager and inserts a line into
Expand All @@ -18,7 +19,7 @@ func ExecuteListenerLog(ctx context.Context, cfg config.Config) error {

m := cfg.Conf().Manager.Client()

status, err := radio.OneOff[radio.Status](ctx, m.CurrentStatus)
status, err := util.OneOff[radio.Status](ctx, m.CurrentStatus)
if err != nil {
return err
}
Expand Down
10 changes: 0 additions & 10 deletions radio.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,16 +286,6 @@ type ManagerService interface {
CurrentStatus(context.Context) (eventstream.Stream[Status], error)
}

func OneOff[T any](ctx context.Context, fn func(context.Context) (eventstream.Stream[T], error)) (T, error) {
s, err := fn(ctx)
if err != nil {
return *new(T), err
}
defer s.Close()

return s.Next()
}

type StreamerService interface {
Start(context.Context) error
Stop(ctx context.Context, force bool) error
Expand Down
83 changes: 82 additions & 1 deletion util/util.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,89 @@
package util

import "net/http"
import (
"context"
"net/http"
"sync/atomic"
"time"

"github.com/R-a-dio/valkyrie/util/eventstream"
"github.com/rs/zerolog"
)

// IsHTMX checks if a request was made by HTMX through the Hx-Request header
func IsHTMX(r *http.Request) bool {
return r.Header.Get("Hx-Request") == "true"
}

type StreamFn[T any] func(context.Context) (eventstream.Stream[T], error)

type StreamCallbackFn[T any] func(context.Context, T)

// OneOff creates a stream through fn and returns the first value received after which
// it closes the stream. Should be used where you only need a very sporadic value that is
// supplied by a streaming API.
func OneOff[T any](ctx context.Context, fn StreamFn[T]) (T, error) {
s, err := fn(ctx)
if err != nil {
return *new(T), err
}
defer s.Close()

return s.Next()
}

// StreamValue opens the stream created by StreamFn and calls any callbackFn given everytime a new
// value is returned by the stream. StreamValue also stores the last received value, accessable by
// calling .Latest
func StreamValue[T any](ctx context.Context, fn StreamFn[T], callbackFn ...StreamCallbackFn[T]) *Value[T] {
var value Value[T]

go func() {
for {
stream, err := fn(ctx)
if err != nil {
// stream creation error most likely means the service
// is down or unavailable for some reason so retry in
// a little bit and stay alive
zerolog.Ctx(ctx).Error().Err(err).Msg("stream-value: stream error")
select {
case <-ctx.Done():
// context was canceled, either while we were waiting on
// retrying, or that was our original error and we exit
return
case <-time.After(time.Second):
}
continue
}

for {
v, err := stream.Next()
if err != nil {
// we either got context canceled or received some
// stream error that indicates we need a new stream,
// try and get one from the outer loop.
zerolog.Ctx(ctx).Error().Err(err).Msg("stream-value: next error")
break
}

value.last.Store(&v)

for _, callback := range callbackFn {
// TODO(wessie): run in goroutine?
callback(ctx, v)
}
}
stream.Close()
}
}()

return &value
}

type Value[T any] struct {
last atomic.Pointer[T]
}

func (v *Value[T]) Latest() T {
return *v.last.Load()
}
Loading

0 comments on commit ae0e1c9

Please sign in to comment.