Skip to content

Commit

Permalink
rpc: switch to gRPC streams
Browse files Browse the repository at this point in the history
This change allows us to have a push-model on the backend for changes to status.
Also means we can do push-style updates to clients if so desired; currently
test-implemented as a SSE stream but other protocols would also be supported by
this change.
  • Loading branch information
Wessie committed Jan 19, 2024
1 parent 4045e34 commit 0cf03d5
Show file tree
Hide file tree
Showing 19 changed files with 801 additions and 293 deletions.
2 changes: 1 addition & 1 deletion ircbot/commands_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func StreamerUserInfo(e Event) error {
user = &radio.User{}
}

err = e.Bot.Manager.UpdateUser(e.Ctx, name, *user)
err = e.Bot.Manager.UpdateUser(e.Ctx, *user)
if err != nil {
return errors.E(op, err)
}
Expand Down
55 changes: 36 additions & 19 deletions manager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
radio "github.com/R-a-dio/valkyrie"
"github.com/R-a-dio/valkyrie/errors"
"github.com/R-a-dio/valkyrie/rpc"
"github.com/R-a-dio/valkyrie/util/eventstream"
"google.golang.org/grpc"
)

Expand All @@ -19,6 +20,22 @@ func NewHTTPServer(m *Manager) (*grpc.Server, error) {
return gs, nil
}

func (m *Manager) CurrentUser(ctx context.Context) (eventstream.Stream[radio.User], error) {
return m.userStream.SubStream(ctx), nil
}

func (m *Manager) CurrentThread(ctx context.Context) (eventstream.Stream[radio.Thread], error) {
return m.threadStream.SubStream(ctx), nil
}

func (m *Manager) CurrentSong(ctx context.Context) (eventstream.Stream[*radio.SongUpdate], error) {
return m.songStream.SubStream(ctx), nil
}

func (m *Manager) CurrentListeners(ctx context.Context) (eventstream.Stream[radio.Listeners], error) {
return m.listenerStream.SubStream(ctx), nil
}

// Status returns the current status of the radio
func (m *Manager) Status(ctx context.Context) (*radio.Status, error) {
m.mu.Lock()
Expand All @@ -28,14 +45,13 @@ func (m *Manager) Status(ctx context.Context) (*radio.Status, error) {
}

// UpdateUser sets information about the current streamer
func (m *Manager) UpdateUser(ctx context.Context, displayName string, u radio.User) error {
func (m *Manager) UpdateUser(ctx context.Context, u radio.User) error {
defer m.updateStreamStatus()
m.userStream.Send(u)

m.mu.Lock()
if displayName != "" {
m.status.StreamerName = displayName
} else {
m.status.StreamerName = u.DJ.Name
}

m.status.StreamerName = u.DJ.Name
m.status.User = u

isRobot := u.Username == "AFK"
Expand All @@ -49,14 +65,17 @@ func (m *Manager) UpdateUser(ctx context.Context, displayName string, u radio.Us
}

m.mu.Unlock()
log.Printf("manager: updating user to: %s (%s)", displayName, u.Username)
log.Printf("manager: updating user to: %s (%s)", u.DJ.Name, u.Username)
return nil
}

// UpdateSong sets information about the currently playing song
func (m *Manager) UpdateSong(ctx context.Context, new radio.Song, info radio.SongInfo) error {
func (m *Manager) UpdateSong(ctx context.Context, update *radio.SongUpdate) error {
const op errors.Op = "manager/Manager.UpdateSong"

new := update
info := update.Info

// first we check if this is the same song as the previous one we received to
// avoid double announcement or drifting start/end timings
m.mu.Lock()
Expand Down Expand Up @@ -141,8 +160,6 @@ func (m *Manager) UpdateSong(ctx context.Context, new radio.Song, info radio.Son
var startListenerCount int
startListenerCount, m.songStartListenerCount = m.songStartListenerCount, currentListenerCount

// make a copy of our current status to send to the announcer
announceStatus := m.status.Copy()
m.mu.Unlock()

// only calculate a diff if we have more than 10 listeners
Expand All @@ -153,12 +170,8 @@ func (m *Manager) UpdateSong(ctx context.Context, new radio.Song, info radio.Son

log.Printf("manager: set song: \"%s\" (%s)\n", song.Metadata, song.Length)

// announce the new song over a chat service
err = m.client.announce.AnnounceSong(ctx, announceStatus)
if err != nil {
// this isn't a critical error, so we do not return it if it occurs
log.Printf("%s: failed to announce song: %s", op, err)
}
// send an event out
m.songStream.Send(&radio.SongUpdate{Song: *song, Info: info})

// =============================================
// finish up database work for the previous song
Expand Down Expand Up @@ -205,19 +218,23 @@ func (m *Manager) UpdateSong(ctx context.Context, new radio.Song, info radio.Son
}

// UpdateThread sets the current thread information on the front page and chats
func (m *Manager) UpdateThread(ctx context.Context, thread string) error {
func (m *Manager) UpdateThread(ctx context.Context, thread radio.Thread) error {
defer m.updateStreamStatus()
m.threadStream.Send(thread)

m.mu.Lock()
m.status.Thread = thread
m.mu.Unlock()
return nil
}

// UpdateListeners sets the listener count
func (m *Manager) UpdateListeners(ctx context.Context, listeners int) error {
func (m *Manager) UpdateListeners(ctx context.Context, listeners radio.Listeners) error {
defer m.updateStreamStatus()
m.listenerStream.Send(listeners)

m.mu.Lock()
m.status.Listeners = listeners
m.status.Listeners = int(listeners)
m.mu.Unlock()
return nil
}
2 changes: 1 addition & 1 deletion manager/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (ln *Listener) parseResponse(ctx context.Context, metasize int, src io.Read
go func() {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
err := ln.manager.UpdateSong(ctx, s, info)
err := ln.manager.UpdateSong(ctx, &radio.SongUpdate{Song: s, Info: info})
if err != nil {
log.Printf("manager-listener: error setting song: %s\n", err)
}
Expand Down
14 changes: 12 additions & 2 deletions manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
radio "github.com/R-a-dio/valkyrie"
"github.com/R-a-dio/valkyrie/config"
"github.com/R-a-dio/valkyrie/storage"
"github.com/R-a-dio/valkyrie/util/eventstream"
)

// Execute executes a manager with the context and configuration given; it returns with
Expand Down Expand Up @@ -70,7 +71,11 @@ func NewManager(ctx context.Context, cfg config.Config) (*Manager, error) {
}
m.status = *old

m.client.announce = cfg.Conf().IRC.Client()
m.userStream = eventstream.NewEventStream(old.User)
m.threadStream = eventstream.NewEventStream(old.Thread)
m.songStream = eventstream.NewEventStream(&radio.SongUpdate{Song: old.Song, Info: old.SongInfo})
m.listenerStream = eventstream.NewEventStream(radio.Listeners(old.Listeners))

m.client.streamer = cfg.Conf().Streamer.Client()
return &m, nil
}
Expand All @@ -82,7 +87,6 @@ type Manager struct {

// Other components
client struct {
announce radio.AnnounceService
streamer radio.StreamerService
}
// mu protects the fields below and their contents
Expand All @@ -91,6 +95,12 @@ type Manager struct {
autoStreamerTimer *time.Timer
// listener count at the start of a song
songStartListenerCount int

// streaming support
userStream *eventstream.EventStream[radio.User]
threadStream *eventstream.EventStream[radio.Thread]
songStream *eventstream.EventStream[*radio.SongUpdate]
listenerStream *eventstream.EventStream[radio.Listeners]
}

// updateStreamStatus is a legacy layer to keep supporting streamstatus table usage
Expand Down
1 change: 1 addition & 0 deletions manager/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package manager
44 changes: 31 additions & 13 deletions radio.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"strconv"
"strings"
"time"

"github.com/R-a-dio/valkyrie/util/eventstream"
)

// CalculateRequestDelay returns the delay between two requests of a song
Expand Down Expand Up @@ -218,20 +220,36 @@ type SearchResult struct {
TotalHits int
}

type SongUpdate struct {
Song
Info SongInfo
}

type Thread = string

type Listeners = int64

type ManagerService interface {
CurrentUser(context.Context) (eventstream.Stream[User], error)
UpdateUser(context.Context, User) error
CurrentSong(context.Context) (eventstream.Stream[*SongUpdate], error)
UpdateSong(context.Context, *SongUpdate) error
CurrentThread(context.Context) (eventstream.Stream[Thread], error)
UpdateThread(context.Context, Thread) error
CurrentListeners(context.Context) (eventstream.Stream[Listeners], error)
UpdateListeners(context.Context, Listeners) error

Status(context.Context) (*Status, error)
}

func OneOff[T any](ctx context.Context, fn func(context.Context) (eventstream.Stream[T], error)) (T, error) {
s, err := fn(ctx)
if err != nil {
return *new(T), err
}
defer s.Close()

// UpdateUser updates the user that is currently streaming, the display name
// is shown instead of the users name if given.
UpdateUser(ctx context.Context, displayName string, u User) error
// UpdateSong updates the currently playing song information, it only
// requires the Metadata field to be set but others are allowed.
UpdateSong(context.Context, Song, SongInfo) error
// UpdateThread updates the current thread shown on the website and in chat,
// the string can basically be any content to be shown to the users
UpdateThread(ctx context.Context, thread string) error
// UpdateListeners updates the current listener count across all streams
UpdateListeners(context.Context, int) error
return s.Next()
}

type StreamerService interface {
Expand Down Expand Up @@ -793,8 +811,8 @@ type SubmissionStatus string
// Possible status for song submissions
const (
SubmissionAccepted SubmissionStatus = "accepted"
SubmissionDeclined = "declined"
SubmissionAwaitingReview = "awaiting-review"
SubmissionDeclined SubmissionStatus = "declined"
SubmissionAwaitingReview SubmissionStatus = "awaiting-review"
)

// PendingSong is a song currently awaiting approval in the pending queue
Expand Down
4 changes: 2 additions & 2 deletions radio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestSongRequestable(t *testing.T) {
}
}

func TestCanRequest(t *testing.T) {
func TestCalculateCooldown(t *testing.T) {
tests := []struct {
delay time.Duration
last time.Time
Expand All @@ -103,7 +103,7 @@ func TestCanRequest(t *testing.T) {
}

for _, test := range tests {
d, ok := CanUserRequest(test.delay, test.last)
d, ok := CalculateCooldown(test.delay, test.last)
if ok != test.ok {
t.Errorf("failed %s on %s, returned: %s", test.last, test.delay, d)
}
Expand Down
94 changes: 78 additions & 16 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

radio "github.com/R-a-dio/valkyrie"
"github.com/R-a-dio/valkyrie/util/eventstream"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
emptypb "google.golang.org/protobuf/types/known/emptypb"
Expand Down Expand Up @@ -95,38 +96,58 @@ func (m ManagerClientRPC) Status(ctx context.Context) (*radio.Status, error) {
}, nil
}

func (m ManagerClientRPC) CurrentUser(ctx context.Context) (eventstream.Stream[radio.User], error) {
c := func(ctx context.Context, e *emptypb.Empty, opts ...grpc.CallOption) (pbReceiver[*User], error) {
return m.rpc.CurrentUser(ctx, e, opts...)
}
return streamFromProtobuf(ctx, c, fromProtoUser)
}

// UpdateUser implements radio.ManagerService
func (m ManagerClientRPC) UpdateUser(ctx context.Context, n string, u radio.User) error {
_, err := m.rpc.SetUser(ctx, &UserUpdate{
User: toProtoUser(u),
StreamerName: n,
})
func (m ManagerClientRPC) UpdateUser(ctx context.Context, u radio.User) error {
_, err := m.rpc.UpdateUser(ctx, toProtoUser(u))
return err
}

func (m ManagerClientRPC) CurrentSong(ctx context.Context) (eventstream.Stream[*radio.SongUpdate], error) {
c := func(ctx context.Context, e *emptypb.Empty, opts ...grpc.CallOption) (pbReceiver[*SongUpdate], error) {
return m.rpc.CurrentSong(ctx, e, opts...)
}
return streamFromProtobuf(ctx, c, fromProtoSongUpdate)
}

// UpdateSong implements radio.ManagerService
func (m ManagerClientRPC) UpdateSong(ctx context.Context, s radio.Song, i radio.SongInfo) error {
_, err := m.rpc.SetSong(ctx, &SongUpdate{
Song: toProtoSong(s),
Info: toProtoSongInfo(i),
})
func (m ManagerClientRPC) UpdateSong(ctx context.Context, u *radio.SongUpdate) error {
_, err := m.rpc.UpdateSong(ctx, toProtoSongUpdate(u))
return err
}

// UpdateThread implements radio.ManagerService
func (m ManagerClientRPC) UpdateThread(ctx context.Context, thread string) error {
_, err := m.rpc.SetThread(ctx, wrapperspb.String(thread))
func (m ManagerClientRPC) UpdateThread(ctx context.Context, thread radio.Thread) error {
_, err := m.rpc.UpdateThread(ctx, wrapperspb.String(thread))
return err
}

func (m ManagerClientRPC) CurrentThread(ctx context.Context) (eventstream.Stream[radio.Thread], error) {
c := func(ctx context.Context, e *emptypb.Empty, opts ...grpc.CallOption) (pbReceiver[*wrapperspb.StringValue], error) {
return m.rpc.CurrentThread(ctx, e, opts...)
}
return streamFromProtobuf(ctx, c, func(v *wrapperspb.StringValue) radio.Thread { return v.Value })
}

// UpdateListeners implements radio.ManagerService
func (m ManagerClientRPC) UpdateListeners(ctx context.Context, count int) error {
_, err := m.rpc.SetListenerInfo(ctx, &ListenerInfo{
Listeners: int64(count),
})
func (m ManagerClientRPC) UpdateListeners(ctx context.Context, count radio.Listeners) error {
_, err := m.rpc.UpdateListenerCount(ctx, wrapperspb.Int64(count))
return err
}

func (m ManagerClientRPC) CurrentListeners(ctx context.Context) (eventstream.Stream[radio.Listeners], error) {
c := func(ctx context.Context, e *emptypb.Empty, opts ...grpc.CallOption) (pbReceiver[*wrapperspb.Int64Value], error) {
return m.rpc.CurrentListenerCount(ctx, e, opts...)
}
return streamFromProtobuf(ctx, c, func(v *wrapperspb.Int64Value) radio.Listeners { return v.Value })
}

// NewStreamerService returns a new client implementing radio.StreamerService
func NewStreamerService(c *grpc.ClientConn) radio.StreamerService {
return StreamerClientRPC{
Expand Down Expand Up @@ -253,3 +274,44 @@ func (q QueueClientRPC) Entries(ctx context.Context) ([]radio.QueueEntry, error)
}
return queue, nil
}

type pbCreator[P any] func(context.Context, *emptypb.Empty, ...grpc.CallOption) (pbReceiver[P], error)

type pbReceiver[P any] interface {
Recv() (P, error)
grpc.ClientStream
}

type grpcStream[P, T any] struct {
s pbReceiver[P]
conv func(P) T
cancel context.CancelFunc
}

func (gs *grpcStream[P, T]) Next() (T, error) {
p, err := gs.s.Recv()
if err != nil {
return *new(T), nil
}
return gs.conv(p), nil
}

func (gs *grpcStream[P, T]) Close() error {
gs.cancel()
return nil
}

func streamFromProtobuf[P, T any](ctx context.Context, streamFn pbCreator[P], conv func(P) T) (eventstream.Stream[T], error) {
var gs grpcStream[P, T]
var err error

ctx, gs.cancel = context.WithCancel(ctx)

gs.s, err = streamFn(ctx, new(emptypb.Empty))
if err != nil {
return nil, err
}

gs.conv = conv
return &gs, nil
}
Loading

0 comments on commit 0cf03d5

Please sign in to comment.