diff --git a/streamer/api.go b/streamer/api.go index 9b86ca7..94e5e65 100644 --- a/streamer/api.go +++ b/streamer/api.go @@ -44,9 +44,9 @@ type streamerService struct { } // Start implements radio.StreamerService -func (s *streamerService) Start(_ context.Context) error { +func (s *streamerService) Start(ctx context.Context) error { // don't use the passed ctx here as it will cancel once we return - s.streamer.Start(context.Background()) + s.streamer.Start(context.WithoutCancel(ctx)) return nil } diff --git a/streamer/main.go b/streamer/main.go index f22c50b..7c9bb06 100644 --- a/streamer/main.go +++ b/streamer/main.go @@ -20,17 +20,14 @@ func Execute(ctx context.Context, cfg config.Config) error { return err } - streamer, err := NewStreamer(ctx, cfg, queue) + streamer, err := NewStreamer(ctx, cfg, queue, store.User(ctx)) if err != nil { return err } defer streamer.ForceStop(context.Background()) - // announcement service - announce := cfg.IRC - // setup a http server for our RPC API - srv, err := NewGRPCServer(ctx, cfg, store, queue, announce, streamer) + srv, err := NewGRPCServer(ctx, cfg, store, queue, cfg.IRC, streamer) if err != nil { return err } diff --git a/streamer/streamer.go b/streamer/streamer.go index aab3abe..1e6b89c 100644 --- a/streamer/streamer.go +++ b/streamer/streamer.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "net/url" "runtime/debug" "sync" "sync/atomic" @@ -38,6 +39,9 @@ type Streamer struct { config.Config logger *zerolog.Logger + userValue *util.Value[*radio.User] + // StreamUser is the user that we are logged in as when streaming + StreamUser radio.User // queue used by the streamer queue radio.QueueService // Format of the PCM audio data @@ -53,11 +57,13 @@ type Streamer struct { } // NewStreamer returns a new streamer using the state given -func NewStreamer(ctx context.Context, cfg config.Config, queue radio.QueueService) (*Streamer, error) { +func NewStreamer(ctx context.Context, cfg config.Config, qs radio.QueueService, us radio.UserStorage) (*Streamer, error) { + const op errors.Op = "streamer.NewStreamer" + var s = &Streamer{ Config: cfg, logger: zerolog.Ctx(ctx), - queue: queue, + queue: qs, } s.AudioFormat = audio.AudioFormat{ @@ -66,9 +72,32 @@ func NewStreamer(ctx context.Context, cfg config.Config, queue radio.QueueServic SampleRate: 44100, } + s.userValue = util.StreamValue(ctx, cfg.Manager.CurrentUser, s.userChange) + + uri, err := url.Parse(cfg.Conf().Streamer.StreamURL) + if err != nil { + return nil, errors.E(op, err) + } + + user, err := us.Get(uri.User.Username()) + if err != nil { + return nil, errors.E(op, err) + } + s.StreamUser = *user return s, nil } +func (s *Streamer) userChange(ctx context.Context, user *radio.User) { + if user == nil { + s.Start(context.WithoutCancel(ctx)) + return + } + if user.ID == s.StreamUser.ID { + s.Start(context.WithoutCancel(ctx)) + return + } +} + // Start starts the streamer with the context given, Start is a noop if // already started func (s *Streamer) Start(ctx context.Context) {