Skip to content

Commit

Permalink
streamer: add ConnectTimeout configuration
Browse files Browse the repository at this point in the history
util: add CallbackTimer that is similar to time.Timer but with
a start/stop system and a callback

streamer: only connect after a timeout if nobody is streaming
  • Loading branch information
Wessie committed May 13, 2024
1 parent b8963f9 commit 2ae539b
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 3 deletions.
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var defaultConfig = config{
ListenAddr: ":4545",
StreamURL: "",
RequestsEnabled: true,
ConnectTimeout: Duration(time.Second * 30),
},
IRC: irc{
Addr: ":4444",
Expand Down Expand Up @@ -193,6 +194,9 @@ type streamer struct {
StreamURL URL
// RequestsEnabled indicates if requests are enabled currently
RequestsEnabled bool
// ConnectTimeout is how long to wait before connecting if the
// proxy has no streamer. Set to 0 to disable
ConnectTimeout Duration
}

// irc contains all the fields only relevant to the irc bot
Expand Down
15 changes: 12 additions & 3 deletions streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Streamer struct {
queue radio.QueueService
// Format of the PCM audio data
AudioFormat audio.AudioFormat
startTimer *util.CallbackTimer

Check failure on line 48 in streamer/streamer.go

View workflow job for this annotation

GitHub Actions / test

undefined: util.CallbackTimer

// sync primitives
wg sync.WaitGroup
Expand Down Expand Up @@ -75,7 +76,10 @@ func NewStreamer(ctx context.Context, cfg config.Config, qs radio.QueueService,

// user value to tell us who is streaming according to the proxy
s.userValue = util.StreamValue(ctx, cfg.Manager.CurrentUser, s.userChange)

// timer we use for starting the streamer if nobody is on
s.startTimer = util.NewCallbackTimer(func() {
s.Start(ctx)
})
// and grab the user from the streamurl so we can compare it later for
// request enabling.
user, err := us.Get(cfg.Conf().Streamer.StreamURL.URL().User.Username())
Expand All @@ -87,10 +91,15 @@ func NewStreamer(ctx context.Context, cfg config.Config, qs radio.QueueService,
}

func (s *Streamer) userChange(ctx context.Context, user *radio.User) {
// nobody is streaming
if user == nil {
s.Start(context.WithoutCancel(ctx))
return
// we are allowed to connect after a timeout if one is set
if timeout := s.Conf().Streamer.ConnectTimeout; timeout > 0 {
s.startTimer.Start(time.Duration(timeout))
return
}
}
// if we are supposed to be streaming, we can connect
if user.ID == s.StreamUser.ID {
s.Start(context.WithoutCancel(ctx))
return
Expand Down
35 changes: 35 additions & 0 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -228,3 +229,37 @@ type Value[T any] struct {
func (v *Value[T]) Latest() T {
return *v.last.Load()
}

type TimerCallback struct {
fn func()

mu sync.Mutex
timer *time.Timer
}

func NewCallbackTimer(callback func()) *TimerCallback {
return &TimerCallback{
fn: callback,
}
}

// Start starts a timer with the timeout given, if a timer
// is already running it is stopped and a new timer is created
func (tc *TimerCallback) Start(timeout time.Duration) {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.timer != nil {
tc.timer.Stop()
}
tc.timer = time.AfterFunc(timeout, tc.fn)
}

// Stop stops the current timer if one exists
func (tc *TimerCallback) Stop() bool {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.timer != nil {
return tc.timer.Stop()
}
return true
}

0 comments on commit 2ae539b

Please sign in to comment.