Skip to content

Commit

Permalink
streamer: implemented a very rudimentary self-connect
Browse files Browse the repository at this point in the history
initial implementation for #93
  • Loading branch information
Wessie committed May 13, 2024
1 parent 5ecb463 commit 2db809d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
4 changes: 2 additions & 2 deletions streamer/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ type streamerService struct {
}

// Start implements radio.StreamerService
func (s *streamerService) Start(_ context.Context) error {
func (s *streamerService) Start(ctx context.Context) error {
// don't use the passed ctx here as it will cancel once we return
s.streamer.Start(context.Background())
s.streamer.Start(context.WithoutCancel(ctx))
return nil
}

Expand Down
7 changes: 2 additions & 5 deletions streamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,14 @@ func Execute(ctx context.Context, cfg config.Config) error {
return err
}

streamer, err := NewStreamer(ctx, cfg, queue)
streamer, err := NewStreamer(ctx, cfg, queue, store.User(ctx))
if err != nil {
return err
}
defer streamer.ForceStop(context.Background())

// announcement service
announce := cfg.IRC

// setup a http server for our RPC API
srv, err := NewGRPCServer(ctx, cfg, store, queue, announce, streamer)
srv, err := NewGRPCServer(ctx, cfg, store, queue, cfg.IRC, streamer)
if err != nil {
return err
}
Expand Down
33 changes: 31 additions & 2 deletions streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net"
"net/url"
"runtime/debug"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -38,6 +39,9 @@ type Streamer struct {
config.Config
logger *zerolog.Logger

userValue *util.Value[*radio.User]
// StreamUser is the user that we are logged in as when streaming
StreamUser radio.User
// queue used by the streamer
queue radio.QueueService
// Format of the PCM audio data
Expand All @@ -53,11 +57,13 @@ type Streamer struct {
}

// NewStreamer returns a new streamer using the state given
func NewStreamer(ctx context.Context, cfg config.Config, queue radio.QueueService) (*Streamer, error) {
func NewStreamer(ctx context.Context, cfg config.Config, qs radio.QueueService, us radio.UserStorage) (*Streamer, error) {
const op errors.Op = "streamer.NewStreamer"

var s = &Streamer{
Config: cfg,
logger: zerolog.Ctx(ctx),
queue: queue,
queue: qs,
}

s.AudioFormat = audio.AudioFormat{
Expand All @@ -66,9 +72,32 @@ func NewStreamer(ctx context.Context, cfg config.Config, queue radio.QueueServic
SampleRate: 44100,
}

s.userValue = util.StreamValue(ctx, cfg.Manager.CurrentUser, s.userChange)

uri, err := url.Parse(cfg.Conf().Streamer.StreamURL)
if err != nil {
return nil, errors.E(op, err)
}

user, err := us.Get(uri.User.Username())
if err != nil {
return nil, errors.E(op, err)
}
s.StreamUser = *user
return s, nil
}

func (s *Streamer) userChange(ctx context.Context, user *radio.User) {
if user == nil {
s.Start(context.WithoutCancel(ctx))
return
}
if user.ID == s.StreamUser.ID {
s.Start(context.WithoutCancel(ctx))
return
}
}

// Start starts the streamer with the context given, Start is a noop if
// already started
func (s *Streamer) Start(ctx context.Context) {
Expand Down

0 comments on commit 2db809d

Please sign in to comment.