From 0cf03d5220073f4d2025f8251d8b7c1c758d2cfb Mon Sep 17 00:00:00 2001 From: Wessie Date: Fri, 19 Jan 2024 15:31:49 +0000 Subject: [PATCH] rpc: switch to gRPC streams This change allows us to have a push-model on the backend for changes to status. Also means we can do push-style updates to clients if so desired; currently test-implemented as a SSE stream but other protocols would also be supported by this change. --- ircbot/commands_impl.go | 2 +- manager/api.go | 55 ++-- manager/listener.go | 2 +- manager/main.go | 14 +- manager/stream.go | 1 + radio.go | 44 ++- radio_test.go | 4 +- rpc/client.go | 94 +++++-- rpc/helpers.go | 14 + rpc/radio.pb.go | 216 ++++++++------- rpc/radio.proto | 15 +- rpc/radio_grpc.pb.go | 388 +++++++++++++++++++++------ rpc/server.go | 66 +++-- templates/default/home.tmpl | 4 +- templates/default/partials/head.tmpl | 1 + website/api/php/api.go | 6 +- website/api/v1/router.go | 68 +++++ website/api/{ => v1}/sse.go | 91 +++++-- website/main.go | 9 + 19 files changed, 801 insertions(+), 293 deletions(-) create mode 100644 manager/stream.go create mode 100644 website/api/v1/router.go rename website/api/{ => v1}/sse.go (61%) diff --git a/ircbot/commands_impl.go b/ircbot/commands_impl.go index ae8a4918..1764c9b9 100644 --- a/ircbot/commands_impl.go +++ b/ircbot/commands_impl.go @@ -228,7 +228,7 @@ func StreamerUserInfo(e Event) error { user = &radio.User{} } - err = e.Bot.Manager.UpdateUser(e.Ctx, name, *user) + err = e.Bot.Manager.UpdateUser(e.Ctx, *user) if err != nil { return errors.E(op, err) } diff --git a/manager/api.go b/manager/api.go index 5b380158..453e7b62 100644 --- a/manager/api.go +++ b/manager/api.go @@ -8,6 +8,7 @@ import ( radio "github.com/R-a-dio/valkyrie" "github.com/R-a-dio/valkyrie/errors" "github.com/R-a-dio/valkyrie/rpc" + "github.com/R-a-dio/valkyrie/util/eventstream" "google.golang.org/grpc" ) @@ -19,6 +20,22 @@ func NewHTTPServer(m *Manager) (*grpc.Server, error) { return gs, nil } +func (m *Manager) CurrentUser(ctx context.Context) (eventstream.Stream[radio.User], error) { + return m.userStream.SubStream(ctx), nil +} + +func (m *Manager) CurrentThread(ctx context.Context) (eventstream.Stream[radio.Thread], error) { + return m.threadStream.SubStream(ctx), nil +} + +func (m *Manager) CurrentSong(ctx context.Context) (eventstream.Stream[*radio.SongUpdate], error) { + return m.songStream.SubStream(ctx), nil +} + +func (m *Manager) CurrentListeners(ctx context.Context) (eventstream.Stream[radio.Listeners], error) { + return m.listenerStream.SubStream(ctx), nil +} + // Status returns the current status of the radio func (m *Manager) Status(ctx context.Context) (*radio.Status, error) { m.mu.Lock() @@ -28,14 +45,13 @@ func (m *Manager) Status(ctx context.Context) (*radio.Status, error) { } // UpdateUser sets information about the current streamer -func (m *Manager) UpdateUser(ctx context.Context, displayName string, u radio.User) error { +func (m *Manager) UpdateUser(ctx context.Context, u radio.User) error { defer m.updateStreamStatus() + m.userStream.Send(u) + m.mu.Lock() - if displayName != "" { - m.status.StreamerName = displayName - } else { - m.status.StreamerName = u.DJ.Name - } + + m.status.StreamerName = u.DJ.Name m.status.User = u isRobot := u.Username == "AFK" @@ -49,14 +65,17 @@ func (m *Manager) UpdateUser(ctx context.Context, displayName string, u radio.Us } m.mu.Unlock() - log.Printf("manager: updating user to: %s (%s)", displayName, u.Username) + log.Printf("manager: updating user to: %s (%s)", u.DJ.Name, u.Username) return nil } // UpdateSong sets information about the currently playing song -func (m *Manager) UpdateSong(ctx context.Context, new radio.Song, info radio.SongInfo) error { +func (m *Manager) UpdateSong(ctx context.Context, update *radio.SongUpdate) error { const op errors.Op = "manager/Manager.UpdateSong" + new := update + info := update.Info + // first we check if this is the same song as the previous one we received to // avoid double announcement or drifting start/end timings m.mu.Lock() @@ -141,8 +160,6 @@ func (m *Manager) UpdateSong(ctx context.Context, new radio.Song, info radio.Son var startListenerCount int startListenerCount, m.songStartListenerCount = m.songStartListenerCount, currentListenerCount - // make a copy of our current status to send to the announcer - announceStatus := m.status.Copy() m.mu.Unlock() // only calculate a diff if we have more than 10 listeners @@ -153,12 +170,8 @@ func (m *Manager) UpdateSong(ctx context.Context, new radio.Song, info radio.Son log.Printf("manager: set song: \"%s\" (%s)\n", song.Metadata, song.Length) - // announce the new song over a chat service - err = m.client.announce.AnnounceSong(ctx, announceStatus) - if err != nil { - // this isn't a critical error, so we do not return it if it occurs - log.Printf("%s: failed to announce song: %s", op, err) - } + // send an event out + m.songStream.Send(&radio.SongUpdate{Song: *song, Info: info}) // ============================================= // finish up database work for the previous song @@ -205,8 +218,10 @@ func (m *Manager) UpdateSong(ctx context.Context, new radio.Song, info radio.Son } // UpdateThread sets the current thread information on the front page and chats -func (m *Manager) UpdateThread(ctx context.Context, thread string) error { +func (m *Manager) UpdateThread(ctx context.Context, thread radio.Thread) error { defer m.updateStreamStatus() + m.threadStream.Send(thread) + m.mu.Lock() m.status.Thread = thread m.mu.Unlock() @@ -214,10 +229,12 @@ func (m *Manager) UpdateThread(ctx context.Context, thread string) error { } // UpdateListeners sets the listener count -func (m *Manager) UpdateListeners(ctx context.Context, listeners int) error { +func (m *Manager) UpdateListeners(ctx context.Context, listeners radio.Listeners) error { defer m.updateStreamStatus() + m.listenerStream.Send(listeners) + m.mu.Lock() - m.status.Listeners = listeners + m.status.Listeners = int(listeners) m.mu.Unlock() return nil } diff --git a/manager/listener.go b/manager/listener.go index e247e541..92412bc6 100644 --- a/manager/listener.go +++ b/manager/listener.go @@ -192,7 +192,7 @@ func (ln *Listener) parseResponse(ctx context.Context, metasize int, src io.Read go func() { ctx, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() - err := ln.manager.UpdateSong(ctx, s, info) + err := ln.manager.UpdateSong(ctx, &radio.SongUpdate{Song: s, Info: info}) if err != nil { log.Printf("manager-listener: error setting song: %s\n", err) } diff --git a/manager/main.go b/manager/main.go index 16428b05..334244b8 100644 --- a/manager/main.go +++ b/manager/main.go @@ -10,6 +10,7 @@ import ( radio "github.com/R-a-dio/valkyrie" "github.com/R-a-dio/valkyrie/config" "github.com/R-a-dio/valkyrie/storage" + "github.com/R-a-dio/valkyrie/util/eventstream" ) // Execute executes a manager with the context and configuration given; it returns with @@ -70,7 +71,11 @@ func NewManager(ctx context.Context, cfg config.Config) (*Manager, error) { } m.status = *old - m.client.announce = cfg.Conf().IRC.Client() + m.userStream = eventstream.NewEventStream(old.User) + m.threadStream = eventstream.NewEventStream(old.Thread) + m.songStream = eventstream.NewEventStream(&radio.SongUpdate{Song: old.Song, Info: old.SongInfo}) + m.listenerStream = eventstream.NewEventStream(radio.Listeners(old.Listeners)) + m.client.streamer = cfg.Conf().Streamer.Client() return &m, nil } @@ -82,7 +87,6 @@ type Manager struct { // Other components client struct { - announce radio.AnnounceService streamer radio.StreamerService } // mu protects the fields below and their contents @@ -91,6 +95,12 @@ type Manager struct { autoStreamerTimer *time.Timer // listener count at the start of a song songStartListenerCount int + + // streaming support + userStream *eventstream.EventStream[radio.User] + threadStream *eventstream.EventStream[radio.Thread] + songStream *eventstream.EventStream[*radio.SongUpdate] + listenerStream *eventstream.EventStream[radio.Listeners] } // updateStreamStatus is a legacy layer to keep supporting streamstatus table usage diff --git a/manager/stream.go b/manager/stream.go new file mode 100644 index 00000000..5d04392c --- /dev/null +++ b/manager/stream.go @@ -0,0 +1 @@ +package manager diff --git a/radio.go b/radio.go index c3272485..dab06742 100644 --- a/radio.go +++ b/radio.go @@ -11,6 +11,8 @@ import ( "strconv" "strings" "time" + + "github.com/R-a-dio/valkyrie/util/eventstream" ) // CalculateRequestDelay returns the delay between two requests of a song @@ -218,20 +220,36 @@ type SearchResult struct { TotalHits int } +type SongUpdate struct { + Song + Info SongInfo +} + +type Thread = string + +type Listeners = int64 + type ManagerService interface { + CurrentUser(context.Context) (eventstream.Stream[User], error) + UpdateUser(context.Context, User) error + CurrentSong(context.Context) (eventstream.Stream[*SongUpdate], error) + UpdateSong(context.Context, *SongUpdate) error + CurrentThread(context.Context) (eventstream.Stream[Thread], error) + UpdateThread(context.Context, Thread) error + CurrentListeners(context.Context) (eventstream.Stream[Listeners], error) + UpdateListeners(context.Context, Listeners) error + Status(context.Context) (*Status, error) +} + +func OneOff[T any](ctx context.Context, fn func(context.Context) (eventstream.Stream[T], error)) (T, error) { + s, err := fn(ctx) + if err != nil { + return *new(T), err + } + defer s.Close() - // UpdateUser updates the user that is currently streaming, the display name - // is shown instead of the users name if given. - UpdateUser(ctx context.Context, displayName string, u User) error - // UpdateSong updates the currently playing song information, it only - // requires the Metadata field to be set but others are allowed. - UpdateSong(context.Context, Song, SongInfo) error - // UpdateThread updates the current thread shown on the website and in chat, - // the string can basically be any content to be shown to the users - UpdateThread(ctx context.Context, thread string) error - // UpdateListeners updates the current listener count across all streams - UpdateListeners(context.Context, int) error + return s.Next() } type StreamerService interface { @@ -793,8 +811,8 @@ type SubmissionStatus string // Possible status for song submissions const ( SubmissionAccepted SubmissionStatus = "accepted" - SubmissionDeclined = "declined" - SubmissionAwaitingReview = "awaiting-review" + SubmissionDeclined SubmissionStatus = "declined" + SubmissionAwaitingReview SubmissionStatus = "awaiting-review" ) // PendingSong is a song currently awaiting approval in the pending queue diff --git a/radio_test.go b/radio_test.go index 08cabeca..ed6ade84 100644 --- a/radio_test.go +++ b/radio_test.go @@ -91,7 +91,7 @@ func TestSongRequestable(t *testing.T) { } } -func TestCanRequest(t *testing.T) { +func TestCalculateCooldown(t *testing.T) { tests := []struct { delay time.Duration last time.Time @@ -103,7 +103,7 @@ func TestCanRequest(t *testing.T) { } for _, test := range tests { - d, ok := CanUserRequest(test.delay, test.last) + d, ok := CalculateCooldown(test.delay, test.last) if ok != test.ok { t.Errorf("failed %s on %s, returned: %s", test.last, test.delay, d) } diff --git a/rpc/client.go b/rpc/client.go index 7b76d736..a0f296c0 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -4,6 +4,7 @@ import ( "context" radio "github.com/R-a-dio/valkyrie" + "github.com/R-a-dio/valkyrie/util/eventstream" grpc "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" emptypb "google.golang.org/protobuf/types/known/emptypb" @@ -95,38 +96,58 @@ func (m ManagerClientRPC) Status(ctx context.Context) (*radio.Status, error) { }, nil } +func (m ManagerClientRPC) CurrentUser(ctx context.Context) (eventstream.Stream[radio.User], error) { + c := func(ctx context.Context, e *emptypb.Empty, opts ...grpc.CallOption) (pbReceiver[*User], error) { + return m.rpc.CurrentUser(ctx, e, opts...) + } + return streamFromProtobuf(ctx, c, fromProtoUser) +} + // UpdateUser implements radio.ManagerService -func (m ManagerClientRPC) UpdateUser(ctx context.Context, n string, u radio.User) error { - _, err := m.rpc.SetUser(ctx, &UserUpdate{ - User: toProtoUser(u), - StreamerName: n, - }) +func (m ManagerClientRPC) UpdateUser(ctx context.Context, u radio.User) error { + _, err := m.rpc.UpdateUser(ctx, toProtoUser(u)) return err } +func (m ManagerClientRPC) CurrentSong(ctx context.Context) (eventstream.Stream[*radio.SongUpdate], error) { + c := func(ctx context.Context, e *emptypb.Empty, opts ...grpc.CallOption) (pbReceiver[*SongUpdate], error) { + return m.rpc.CurrentSong(ctx, e, opts...) + } + return streamFromProtobuf(ctx, c, fromProtoSongUpdate) +} + // UpdateSong implements radio.ManagerService -func (m ManagerClientRPC) UpdateSong(ctx context.Context, s radio.Song, i radio.SongInfo) error { - _, err := m.rpc.SetSong(ctx, &SongUpdate{ - Song: toProtoSong(s), - Info: toProtoSongInfo(i), - }) +func (m ManagerClientRPC) UpdateSong(ctx context.Context, u *radio.SongUpdate) error { + _, err := m.rpc.UpdateSong(ctx, toProtoSongUpdate(u)) return err } // UpdateThread implements radio.ManagerService -func (m ManagerClientRPC) UpdateThread(ctx context.Context, thread string) error { - _, err := m.rpc.SetThread(ctx, wrapperspb.String(thread)) +func (m ManagerClientRPC) UpdateThread(ctx context.Context, thread radio.Thread) error { + _, err := m.rpc.UpdateThread(ctx, wrapperspb.String(thread)) return err } +func (m ManagerClientRPC) CurrentThread(ctx context.Context) (eventstream.Stream[radio.Thread], error) { + c := func(ctx context.Context, e *emptypb.Empty, opts ...grpc.CallOption) (pbReceiver[*wrapperspb.StringValue], error) { + return m.rpc.CurrentThread(ctx, e, opts...) + } + return streamFromProtobuf(ctx, c, func(v *wrapperspb.StringValue) radio.Thread { return v.Value }) +} + // UpdateListeners implements radio.ManagerService -func (m ManagerClientRPC) UpdateListeners(ctx context.Context, count int) error { - _, err := m.rpc.SetListenerInfo(ctx, &ListenerInfo{ - Listeners: int64(count), - }) +func (m ManagerClientRPC) UpdateListeners(ctx context.Context, count radio.Listeners) error { + _, err := m.rpc.UpdateListenerCount(ctx, wrapperspb.Int64(count)) return err } +func (m ManagerClientRPC) CurrentListeners(ctx context.Context) (eventstream.Stream[radio.Listeners], error) { + c := func(ctx context.Context, e *emptypb.Empty, opts ...grpc.CallOption) (pbReceiver[*wrapperspb.Int64Value], error) { + return m.rpc.CurrentListenerCount(ctx, e, opts...) + } + return streamFromProtobuf(ctx, c, func(v *wrapperspb.Int64Value) radio.Listeners { return v.Value }) +} + // NewStreamerService returns a new client implementing radio.StreamerService func NewStreamerService(c *grpc.ClientConn) radio.StreamerService { return StreamerClientRPC{ @@ -253,3 +274,44 @@ func (q QueueClientRPC) Entries(ctx context.Context) ([]radio.QueueEntry, error) } return queue, nil } + +type pbCreator[P any] func(context.Context, *emptypb.Empty, ...grpc.CallOption) (pbReceiver[P], error) + +type pbReceiver[P any] interface { + Recv() (P, error) + grpc.ClientStream +} + +type grpcStream[P, T any] struct { + s pbReceiver[P] + conv func(P) T + cancel context.CancelFunc +} + +func (gs *grpcStream[P, T]) Next() (T, error) { + p, err := gs.s.Recv() + if err != nil { + return *new(T), nil + } + return gs.conv(p), nil +} + +func (gs *grpcStream[P, T]) Close() error { + gs.cancel() + return nil +} + +func streamFromProtobuf[P, T any](ctx context.Context, streamFn pbCreator[P], conv func(P) T) (eventstream.Stream[T], error) { + var gs grpcStream[P, T] + var err error + + ctx, gs.cancel = context.WithCancel(ctx) + + gs.s, err = streamFn(ctx, new(emptypb.Empty)) + if err != nil { + return nil, err + } + + gs.conv = conv + return &gs, nil +} diff --git a/rpc/helpers.go b/rpc/helpers.go index f8aa5ce8..7f9b215a 100644 --- a/rpc/helpers.go +++ b/rpc/helpers.go @@ -126,6 +126,20 @@ func fromProtoSongInfo(i *SongInfo) radio.SongInfo { } } +func toProtoSongUpdate(s *radio.SongUpdate) *SongUpdate { + return &SongUpdate{ + Song: toProtoSong(s.Song), + Info: toProtoSongInfo(s.Info), + } +} + +func fromProtoSongUpdate(s *SongUpdate) *radio.SongUpdate { + return &radio.SongUpdate{ + Song: fromProtoSong(s.Song), + Info: fromProtoSongInfo(s.Info), + } +} + func toProtoQueueEntry(entry radio.QueueEntry) *QueueEntry { return &QueueEntry{ Song: toProtoSong(entry.Song), diff --git a/rpc/radio.pb.go b/rpc/radio.pb.go index ef9ae136..c3e3a0dc 100644 --- a/rpc/radio.pb.go +++ b/rpc/radio.pb.go @@ -1612,76 +1612,89 @@ var file_radio_proto_rawDesc = []byte{ 0x69, 0x6f, 0x6e, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x69, 0x6e, 0x66, 0x6f, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x32, 0xf5, 0x02, 0x0a, 0x07, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, + 0x72, 0x72, 0x6f, 0x72, 0x32, 0xca, 0x04, 0x0a, 0x07, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, 0x0a, 0x07, 0x53, 0x65, 0x74, - 0x55, 0x73, 0x65, 0x72, 0x12, 0x11, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x55, 0x73, 0x65, - 0x72, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, - 0x34, 0x0a, 0x07, 0x53, 0x65, 0x74, 0x53, 0x6f, 0x6e, 0x67, 0x12, 0x11, 0x2e, 0x72, 0x61, 0x64, - 0x69, 0x6f, 0x2e, 0x53, 0x6f, 0x6e, 0x67, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x1a, 0x16, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x42, 0x0a, 0x11, 0x53, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x15, 0x2e, 0x72, 0x61, 0x64, - 0x69, 0x6f, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x41, 0x0a, 0x09, 0x53, 0x65, 0x74, - 0x54, 0x68, 0x72, 0x65, 0x61, 0x64, 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x56, - 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x3e, 0x0a, 0x0f, - 0x53, 0x65, 0x74, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, - 0x13, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x65, 0x72, - 0x49, 0x6e, 0x66, 0x6f, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x32, 0x97, 0x01, 0x0a, - 0x09, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x72, 0x12, 0x3f, 0x0a, 0x0c, 0x41, 0x6e, - 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x53, 0x6f, 0x6e, 0x67, 0x12, 0x17, 0x2e, 0x72, 0x61, 0x64, - 0x69, 0x6f, 0x2e, 0x53, 0x6f, 0x6e, 0x67, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x6d, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x0b, 0x43, 0x75, 0x72, + 0x72, 0x65, 0x6e, 0x74, 0x53, 0x6f, 0x6e, 0x67, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, + 0x1a, 0x11, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x53, 0x6f, 0x6e, 0x67, 0x55, 0x70, 0x64, + 0x61, 0x74, 0x65, 0x30, 0x01, 0x12, 0x37, 0x0a, 0x0a, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, + 0x6f, 0x6e, 0x67, 0x12, 0x11, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x53, 0x6f, 0x6e, 0x67, + 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x47, + 0x0a, 0x0d, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x54, 0x68, 0x72, 0x65, 0x61, 0x64, 0x12, + 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x30, 0x01, 0x12, 0x44, 0x0a, 0x0c, 0x55, 0x70, 0x64, 0x61, 0x74, + 0x65, 0x54, 0x68, 0x72, 0x65, 0x61, 0x64, 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x34, 0x0a, + 0x0b, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x55, 0x73, 0x65, 0x72, 0x12, 0x16, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, + 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x0b, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x55, 0x73, 0x65, + 0x72, 0x30, 0x01, 0x12, 0x31, 0x0a, 0x0a, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x55, 0x73, 0x65, + 0x72, 0x12, 0x0b, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x55, 0x73, 0x65, 0x72, 0x1a, 0x16, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x4d, 0x0a, 0x14, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6e, + 0x74, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x65, 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x16, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1b, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x49, 0x6e, 0x74, 0x36, 0x34, 0x56, 0x61, + 0x6c, 0x75, 0x65, 0x30, 0x01, 0x12, 0x4a, 0x0a, 0x13, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x4c, + 0x69, 0x73, 0x74, 0x65, 0x6e, 0x65, 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x1b, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x49, + 0x6e, 0x74, 0x36, 0x34, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x32, 0x97, 0x01, 0x0a, 0x09, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x72, 0x12, + 0x3f, 0x0a, 0x0c, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x53, 0x6f, 0x6e, 0x67, 0x12, + 0x17, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x53, 0x6f, 0x6e, 0x67, 0x41, 0x6e, 0x6e, 0x6f, + 0x75, 0x6e, 0x63, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, + 0x12, 0x49, 0x0a, 0x0f, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x1e, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x53, 0x6f, 0x6e, 0x67, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x49, 0x0a, 0x0f, 0x41, - 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x32, 0xab, 0x02, 0x0a, 0x08, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x12, 0x38, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x72, + 0x74, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x72, 0x61, 0x64, 0x69, + 0x6f, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x04, 0x53, 0x74, 0x6f, 0x70, 0x12, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x42, 0x6f, 0x6f, + 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x17, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x39, 0x0a, 0x0b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x53, 0x6f, 0x6e, 0x67, 0x12, 0x12, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x53, 0x6f, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0x16, + 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x09, 0x53, 0x65, + 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x15, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x32, 0xab, 0x02, 0x0a, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x65, 0x72, 0x12, 0x38, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x16, 0x2e, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x17, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, - 0x04, 0x53, 0x74, 0x6f, 0x70, 0x12, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, - 0x65, 0x1a, 0x17, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x39, 0x0a, 0x0b, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x53, 0x6f, 0x6e, 0x67, 0x12, 0x12, 0x2e, 0x72, 0x61, 0x64, 0x69, - 0x6f, 0x2e, 0x53, 0x6f, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, - 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x09, 0x53, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x12, 0x15, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x12, 0x31, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, 0x65, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x31, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, 0x65, 0x12, + 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x10, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, + 0x51, 0x75, 0x65, 0x75, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x32, 0xe8, 0x01, 0x0a, 0x05, 0x51, 0x75, + 0x65, 0x75, 0x65, 0x12, 0x37, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x11, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x38, 0x0a, 0x0b, + 0x52, 0x65, 0x73, 0x65, 0x72, 0x76, 0x65, 0x4e, 0x65, 0x78, 0x74, 0x12, 0x16, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x1a, 0x11, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, + 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x37, 0x0a, 0x06, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, + 0x12, 0x11, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x1a, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, + 0x33, 0x0a, 0x07, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x10, 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, - 0x49, 0x6e, 0x66, 0x6f, 0x32, 0xe8, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, 0x65, 0x12, 0x37, - 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x11, 0x2e, 0x72, - 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x1a, - 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x38, 0x0a, 0x0b, 0x52, 0x65, 0x73, 0x65, 0x72, - 0x76, 0x65, 0x4e, 0x65, 0x78, 0x74, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x11, - 0x2e, 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x12, 0x37, 0x0a, 0x06, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x12, 0x11, 0x2e, 0x72, 0x61, - 0x64, 0x69, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x1a, 0x1a, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x33, 0x0a, 0x07, 0x45, 0x6e, - 0x74, 0x72, 0x69, 0x65, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x10, 0x2e, - 0x72, 0x61, 0x64, 0x69, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x42, - 0x21, 0x5a, 0x1f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x52, 0x2d, - 0x61, 0x2d, 0x64, 0x69, 0x6f, 0x2f, 0x76, 0x61, 0x6c, 0x6b, 0x79, 0x72, 0x69, 0x65, 0x2f, 0x72, - 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x49, 0x6e, 0x66, 0x6f, 0x42, 0x21, 0x5a, 0x1f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x52, 0x2d, 0x61, 0x2d, 0x64, 0x69, 0x6f, 0x2f, 0x76, 0x61, 0x6c, 0x6b, 0x79, + 0x72, 0x69, 0x65, 0x2f, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1720,7 +1733,8 @@ var file_radio_proto_goTypes = []interface{}{ (*timestamppb.Timestamp)(nil), // 19: google.protobuf.Timestamp (*emptypb.Empty)(nil), // 20: google.protobuf.Empty (*wrapperspb.StringValue)(nil), // 21: google.protobuf.StringValue - (*wrapperspb.BoolValue)(nil), // 22: google.protobuf.BoolValue + (*wrapperspb.Int64Value)(nil), // 22: google.protobuf.Int64Value + (*wrapperspb.BoolValue)(nil), // 23: google.protobuf.BoolValue } var file_radio_proto_depIdxs = []int32{ 18, // 0: radio.Song.length:type_name -> google.protobuf.Duration @@ -1755,41 +1769,47 @@ var file_radio_proto_depIdxs = []int32{ 17, // 29: radio.RequestResponse.error:type_name -> radio.Error 18, // 30: radio.Error.delay:type_name -> google.protobuf.Duration 20, // 31: radio.Manager.Status:input_type -> google.protobuf.Empty - 5, // 32: radio.Manager.SetUser:input_type -> radio.UserUpdate - 2, // 33: radio.Manager.SetSong:input_type -> radio.SongUpdate - 4, // 34: radio.Manager.SetStreamerConfig:input_type -> radio.StreamerConfig - 21, // 35: radio.Manager.SetThread:input_type -> google.protobuf.StringValue - 9, // 36: radio.Manager.SetListenerInfo:input_type -> radio.ListenerInfo - 10, // 37: radio.Announcer.AnnounceSong:input_type -> radio.SongAnnouncement - 11, // 38: radio.Announcer.AnnounceRequest:input_type -> radio.SongRequestAnnouncement - 20, // 39: radio.Streamer.Start:input_type -> google.protobuf.Empty - 22, // 40: radio.Streamer.Stop:input_type -> google.protobuf.BoolValue - 15, // 41: radio.Streamer.RequestSong:input_type -> radio.SongRequest - 4, // 42: radio.Streamer.SetConfig:input_type -> radio.StreamerConfig - 20, // 43: radio.Streamer.Queue:input_type -> google.protobuf.Empty - 13, // 44: radio.Queue.AddRequest:input_type -> radio.QueueEntry - 20, // 45: radio.Queue.ReserveNext:input_type -> google.protobuf.Empty - 13, // 46: radio.Queue.Remove:input_type -> radio.QueueEntry - 20, // 47: radio.Queue.Entries:input_type -> google.protobuf.Empty - 1, // 48: radio.Manager.Status:output_type -> radio.StatusResponse - 20, // 49: radio.Manager.SetUser:output_type -> google.protobuf.Empty - 20, // 50: radio.Manager.SetSong:output_type -> google.protobuf.Empty - 20, // 51: radio.Manager.SetStreamerConfig:output_type -> google.protobuf.Empty - 20, // 52: radio.Manager.SetThread:output_type -> google.protobuf.Empty - 20, // 53: radio.Manager.SetListenerInfo:output_type -> google.protobuf.Empty - 20, // 54: radio.Announcer.AnnounceSong:output_type -> google.protobuf.Empty - 20, // 55: radio.Announcer.AnnounceRequest:output_type -> google.protobuf.Empty - 12, // 56: radio.Streamer.Start:output_type -> radio.StreamerResponse - 12, // 57: radio.Streamer.Stop:output_type -> radio.StreamerResponse - 16, // 58: radio.Streamer.RequestSong:output_type -> radio.RequestResponse - 20, // 59: radio.Streamer.SetConfig:output_type -> google.protobuf.Empty - 14, // 60: radio.Streamer.Queue:output_type -> radio.QueueInfo - 20, // 61: radio.Queue.AddRequest:output_type -> google.protobuf.Empty - 13, // 62: radio.Queue.ReserveNext:output_type -> radio.QueueEntry - 22, // 63: radio.Queue.Remove:output_type -> google.protobuf.BoolValue - 14, // 64: radio.Queue.Entries:output_type -> radio.QueueInfo - 48, // [48:65] is the sub-list for method output_type - 31, // [31:48] is the sub-list for method input_type + 20, // 32: radio.Manager.CurrentSong:input_type -> google.protobuf.Empty + 2, // 33: radio.Manager.UpdateSong:input_type -> radio.SongUpdate + 20, // 34: radio.Manager.CurrentThread:input_type -> google.protobuf.Empty + 21, // 35: radio.Manager.UpdateThread:input_type -> google.protobuf.StringValue + 20, // 36: radio.Manager.CurrentUser:input_type -> google.protobuf.Empty + 6, // 37: radio.Manager.UpdateUser:input_type -> radio.User + 20, // 38: radio.Manager.CurrentListenerCount:input_type -> google.protobuf.Empty + 22, // 39: radio.Manager.UpdateListenerCount:input_type -> google.protobuf.Int64Value + 10, // 40: radio.Announcer.AnnounceSong:input_type -> radio.SongAnnouncement + 11, // 41: radio.Announcer.AnnounceRequest:input_type -> radio.SongRequestAnnouncement + 20, // 42: radio.Streamer.Start:input_type -> google.protobuf.Empty + 23, // 43: radio.Streamer.Stop:input_type -> google.protobuf.BoolValue + 15, // 44: radio.Streamer.RequestSong:input_type -> radio.SongRequest + 4, // 45: radio.Streamer.SetConfig:input_type -> radio.StreamerConfig + 20, // 46: radio.Streamer.Queue:input_type -> google.protobuf.Empty + 13, // 47: radio.Queue.AddRequest:input_type -> radio.QueueEntry + 20, // 48: radio.Queue.ReserveNext:input_type -> google.protobuf.Empty + 13, // 49: radio.Queue.Remove:input_type -> radio.QueueEntry + 20, // 50: radio.Queue.Entries:input_type -> google.protobuf.Empty + 1, // 51: radio.Manager.Status:output_type -> radio.StatusResponse + 2, // 52: radio.Manager.CurrentSong:output_type -> radio.SongUpdate + 20, // 53: radio.Manager.UpdateSong:output_type -> google.protobuf.Empty + 21, // 54: radio.Manager.CurrentThread:output_type -> google.protobuf.StringValue + 20, // 55: radio.Manager.UpdateThread:output_type -> google.protobuf.Empty + 6, // 56: radio.Manager.CurrentUser:output_type -> radio.User + 20, // 57: radio.Manager.UpdateUser:output_type -> google.protobuf.Empty + 22, // 58: radio.Manager.CurrentListenerCount:output_type -> google.protobuf.Int64Value + 20, // 59: radio.Manager.UpdateListenerCount:output_type -> google.protobuf.Empty + 20, // 60: radio.Announcer.AnnounceSong:output_type -> google.protobuf.Empty + 20, // 61: radio.Announcer.AnnounceRequest:output_type -> google.protobuf.Empty + 12, // 62: radio.Streamer.Start:output_type -> radio.StreamerResponse + 12, // 63: radio.Streamer.Stop:output_type -> radio.StreamerResponse + 16, // 64: radio.Streamer.RequestSong:output_type -> radio.RequestResponse + 20, // 65: radio.Streamer.SetConfig:output_type -> google.protobuf.Empty + 14, // 66: radio.Streamer.Queue:output_type -> radio.QueueInfo + 20, // 67: radio.Queue.AddRequest:output_type -> google.protobuf.Empty + 13, // 68: radio.Queue.ReserveNext:output_type -> radio.QueueEntry + 23, // 69: radio.Queue.Remove:output_type -> google.protobuf.BoolValue + 14, // 70: radio.Queue.Entries:output_type -> radio.QueueInfo + 51, // [51:71] is the sub-list for method output_type + 31, // [31:51] is the sub-list for method input_type 31, // [31:31] is the sub-list for extension type_name 31, // [31:31] is the sub-list for extension extendee 0, // [0:31] is the sub-list for field type_name diff --git a/rpc/radio.proto b/rpc/radio.proto index f55b1b20..7d04735e 100644 --- a/rpc/radio.proto +++ b/rpc/radio.proto @@ -40,13 +40,16 @@ message Song { } service Manager { - // Status returns the current status rpc Status(google.protobuf.Empty) returns (StatusResponse); - rpc SetUser(UserUpdate) returns (google.protobuf.Empty); - rpc SetSong(SongUpdate) returns (google.protobuf.Empty); - rpc SetStreamerConfig(StreamerConfig) returns (google.protobuf.Empty); - rpc SetThread(google.protobuf.StringValue) returns (google.protobuf.Empty); - rpc SetListenerInfo(ListenerInfo) returns (google.protobuf.Empty); + + rpc CurrentSong(google.protobuf.Empty) returns (stream SongUpdate); + rpc UpdateSong(SongUpdate) returns (google.protobuf.Empty); + rpc CurrentThread(google.protobuf.Empty) returns (stream google.protobuf.StringValue); + rpc UpdateThread(google.protobuf.StringValue) returns (google.protobuf.Empty); + rpc CurrentUser(google.protobuf.Empty) returns (stream User); + rpc UpdateUser(User) returns (google.protobuf.Empty); + rpc CurrentListenerCount(google.protobuf.Empty) returns (stream google.protobuf.Int64Value); + rpc UpdateListenerCount(google.protobuf.Int64Value) returns (google.protobuf.Empty); } message StatusResponse { diff --git a/rpc/radio_grpc.pb.go b/rpc/radio_grpc.pb.go index 2c1e0444..bdbfe837 100644 --- a/rpc/radio_grpc.pb.go +++ b/rpc/radio_grpc.pb.go @@ -21,25 +21,30 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - Manager_Status_FullMethodName = "/radio.Manager/Status" - Manager_SetUser_FullMethodName = "/radio.Manager/SetUser" - Manager_SetSong_FullMethodName = "/radio.Manager/SetSong" - Manager_SetStreamerConfig_FullMethodName = "/radio.Manager/SetStreamerConfig" - Manager_SetThread_FullMethodName = "/radio.Manager/SetThread" - Manager_SetListenerInfo_FullMethodName = "/radio.Manager/SetListenerInfo" + Manager_Status_FullMethodName = "/radio.Manager/Status" + Manager_CurrentSong_FullMethodName = "/radio.Manager/CurrentSong" + Manager_UpdateSong_FullMethodName = "/radio.Manager/UpdateSong" + Manager_CurrentThread_FullMethodName = "/radio.Manager/CurrentThread" + Manager_UpdateThread_FullMethodName = "/radio.Manager/UpdateThread" + Manager_CurrentUser_FullMethodName = "/radio.Manager/CurrentUser" + Manager_UpdateUser_FullMethodName = "/radio.Manager/UpdateUser" + Manager_CurrentListenerCount_FullMethodName = "/radio.Manager/CurrentListenerCount" + Manager_UpdateListenerCount_FullMethodName = "/radio.Manager/UpdateListenerCount" ) // ManagerClient is the client API for Manager service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type ManagerClient interface { - // Status returns the current status Status(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*StatusResponse, error) - SetUser(ctx context.Context, in *UserUpdate, opts ...grpc.CallOption) (*emptypb.Empty, error) - SetSong(ctx context.Context, in *SongUpdate, opts ...grpc.CallOption) (*emptypb.Empty, error) - SetStreamerConfig(ctx context.Context, in *StreamerConfig, opts ...grpc.CallOption) (*emptypb.Empty, error) - SetThread(ctx context.Context, in *wrapperspb.StringValue, opts ...grpc.CallOption) (*emptypb.Empty, error) - SetListenerInfo(ctx context.Context, in *ListenerInfo, opts ...grpc.CallOption) (*emptypb.Empty, error) + CurrentSong(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Manager_CurrentSongClient, error) + UpdateSong(ctx context.Context, in *SongUpdate, opts ...grpc.CallOption) (*emptypb.Empty, error) + CurrentThread(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Manager_CurrentThreadClient, error) + UpdateThread(ctx context.Context, in *wrapperspb.StringValue, opts ...grpc.CallOption) (*emptypb.Empty, error) + CurrentUser(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Manager_CurrentUserClient, error) + UpdateUser(ctx context.Context, in *User, opts ...grpc.CallOption) (*emptypb.Empty, error) + CurrentListenerCount(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Manager_CurrentListenerCountClient, error) + UpdateListenerCount(ctx context.Context, in *wrapperspb.Int64Value, opts ...grpc.CallOption) (*emptypb.Empty, error) } type managerClient struct { @@ -59,45 +64,164 @@ func (c *managerClient) Status(ctx context.Context, in *emptypb.Empty, opts ...g return out, nil } -func (c *managerClient) SetUser(ctx context.Context, in *UserUpdate, opts ...grpc.CallOption) (*emptypb.Empty, error) { - out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, Manager_SetUser_FullMethodName, in, out, opts...) +func (c *managerClient) CurrentSong(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Manager_CurrentSongClient, error) { + stream, err := c.cc.NewStream(ctx, &Manager_ServiceDesc.Streams[0], Manager_CurrentSong_FullMethodName, opts...) if err != nil { return nil, err } - return out, nil + x := &managerCurrentSongClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Manager_CurrentSongClient interface { + Recv() (*SongUpdate, error) + grpc.ClientStream } -func (c *managerClient) SetSong(ctx context.Context, in *SongUpdate, opts ...grpc.CallOption) (*emptypb.Empty, error) { +type managerCurrentSongClient struct { + grpc.ClientStream +} + +func (x *managerCurrentSongClient) Recv() (*SongUpdate, error) { + m := new(SongUpdate) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *managerClient) UpdateSong(ctx context.Context, in *SongUpdate, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, Manager_SetSong_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Manager_UpdateSong_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *managerClient) SetStreamerConfig(ctx context.Context, in *StreamerConfig, opts ...grpc.CallOption) (*emptypb.Empty, error) { +func (c *managerClient) CurrentThread(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Manager_CurrentThreadClient, error) { + stream, err := c.cc.NewStream(ctx, &Manager_ServiceDesc.Streams[1], Manager_CurrentThread_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &managerCurrentThreadClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Manager_CurrentThreadClient interface { + Recv() (*wrapperspb.StringValue, error) + grpc.ClientStream +} + +type managerCurrentThreadClient struct { + grpc.ClientStream +} + +func (x *managerCurrentThreadClient) Recv() (*wrapperspb.StringValue, error) { + m := new(wrapperspb.StringValue) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *managerClient) UpdateThread(ctx context.Context, in *wrapperspb.StringValue, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, Manager_SetStreamerConfig_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Manager_UpdateThread_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *managerClient) SetThread(ctx context.Context, in *wrapperspb.StringValue, opts ...grpc.CallOption) (*emptypb.Empty, error) { +func (c *managerClient) CurrentUser(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Manager_CurrentUserClient, error) { + stream, err := c.cc.NewStream(ctx, &Manager_ServiceDesc.Streams[2], Manager_CurrentUser_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &managerCurrentUserClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Manager_CurrentUserClient interface { + Recv() (*User, error) + grpc.ClientStream +} + +type managerCurrentUserClient struct { + grpc.ClientStream +} + +func (x *managerCurrentUserClient) Recv() (*User, error) { + m := new(User) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *managerClient) UpdateUser(ctx context.Context, in *User, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, Manager_SetThread_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Manager_UpdateUser_FullMethodName, in, out, opts...) if err != nil { return nil, err } return out, nil } -func (c *managerClient) SetListenerInfo(ctx context.Context, in *ListenerInfo, opts ...grpc.CallOption) (*emptypb.Empty, error) { +func (c *managerClient) CurrentListenerCount(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Manager_CurrentListenerCountClient, error) { + stream, err := c.cc.NewStream(ctx, &Manager_ServiceDesc.Streams[3], Manager_CurrentListenerCount_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &managerCurrentListenerCountClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Manager_CurrentListenerCountClient interface { + Recv() (*wrapperspb.Int64Value, error) + grpc.ClientStream +} + +type managerCurrentListenerCountClient struct { + grpc.ClientStream +} + +func (x *managerCurrentListenerCountClient) Recv() (*wrapperspb.Int64Value, error) { + m := new(wrapperspb.Int64Value) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *managerClient) UpdateListenerCount(ctx context.Context, in *wrapperspb.Int64Value, opts ...grpc.CallOption) (*emptypb.Empty, error) { out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, Manager_SetListenerInfo_FullMethodName, in, out, opts...) + err := c.cc.Invoke(ctx, Manager_UpdateListenerCount_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -108,13 +232,15 @@ func (c *managerClient) SetListenerInfo(ctx context.Context, in *ListenerInfo, o // All implementations must embed UnimplementedManagerServer // for forward compatibility type ManagerServer interface { - // Status returns the current status Status(context.Context, *emptypb.Empty) (*StatusResponse, error) - SetUser(context.Context, *UserUpdate) (*emptypb.Empty, error) - SetSong(context.Context, *SongUpdate) (*emptypb.Empty, error) - SetStreamerConfig(context.Context, *StreamerConfig) (*emptypb.Empty, error) - SetThread(context.Context, *wrapperspb.StringValue) (*emptypb.Empty, error) - SetListenerInfo(context.Context, *ListenerInfo) (*emptypb.Empty, error) + CurrentSong(*emptypb.Empty, Manager_CurrentSongServer) error + UpdateSong(context.Context, *SongUpdate) (*emptypb.Empty, error) + CurrentThread(*emptypb.Empty, Manager_CurrentThreadServer) error + UpdateThread(context.Context, *wrapperspb.StringValue) (*emptypb.Empty, error) + CurrentUser(*emptypb.Empty, Manager_CurrentUserServer) error + UpdateUser(context.Context, *User) (*emptypb.Empty, error) + CurrentListenerCount(*emptypb.Empty, Manager_CurrentListenerCountServer) error + UpdateListenerCount(context.Context, *wrapperspb.Int64Value) (*emptypb.Empty, error) mustEmbedUnimplementedManagerServer() } @@ -125,20 +251,29 @@ type UnimplementedManagerServer struct { func (UnimplementedManagerServer) Status(context.Context, *emptypb.Empty) (*StatusResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Status not implemented") } -func (UnimplementedManagerServer) SetUser(context.Context, *UserUpdate) (*emptypb.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method SetUser not implemented") +func (UnimplementedManagerServer) CurrentSong(*emptypb.Empty, Manager_CurrentSongServer) error { + return status.Errorf(codes.Unimplemented, "method CurrentSong not implemented") } -func (UnimplementedManagerServer) SetSong(context.Context, *SongUpdate) (*emptypb.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method SetSong not implemented") +func (UnimplementedManagerServer) UpdateSong(context.Context, *SongUpdate) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method UpdateSong not implemented") } -func (UnimplementedManagerServer) SetStreamerConfig(context.Context, *StreamerConfig) (*emptypb.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method SetStreamerConfig not implemented") +func (UnimplementedManagerServer) CurrentThread(*emptypb.Empty, Manager_CurrentThreadServer) error { + return status.Errorf(codes.Unimplemented, "method CurrentThread not implemented") } -func (UnimplementedManagerServer) SetThread(context.Context, *wrapperspb.StringValue) (*emptypb.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method SetThread not implemented") +func (UnimplementedManagerServer) UpdateThread(context.Context, *wrapperspb.StringValue) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method UpdateThread not implemented") } -func (UnimplementedManagerServer) SetListenerInfo(context.Context, *ListenerInfo) (*emptypb.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method SetListenerInfo not implemented") +func (UnimplementedManagerServer) CurrentUser(*emptypb.Empty, Manager_CurrentUserServer) error { + return status.Errorf(codes.Unimplemented, "method CurrentUser not implemented") +} +func (UnimplementedManagerServer) UpdateUser(context.Context, *User) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method UpdateUser not implemented") +} +func (UnimplementedManagerServer) CurrentListenerCount(*emptypb.Empty, Manager_CurrentListenerCountServer) error { + return status.Errorf(codes.Unimplemented, "method CurrentListenerCount not implemented") +} +func (UnimplementedManagerServer) UpdateListenerCount(context.Context, *wrapperspb.Int64Value) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method UpdateListenerCount not implemented") } func (UnimplementedManagerServer) mustEmbedUnimplementedManagerServer() {} @@ -171,92 +306,158 @@ func _Manager_Status_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } -func _Manager_SetUser_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(UserUpdate) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ManagerServer).SetUser(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Manager_SetUser_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ManagerServer).SetUser(ctx, req.(*UserUpdate)) +func _Manager_CurrentSong_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(emptypb.Empty) + if err := stream.RecvMsg(m); err != nil { + return err } - return interceptor(ctx, in, info, handler) + return srv.(ManagerServer).CurrentSong(m, &managerCurrentSongServer{stream}) +} + +type Manager_CurrentSongServer interface { + Send(*SongUpdate) error + grpc.ServerStream +} + +type managerCurrentSongServer struct { + grpc.ServerStream +} + +func (x *managerCurrentSongServer) Send(m *SongUpdate) error { + return x.ServerStream.SendMsg(m) } -func _Manager_SetSong_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _Manager_UpdateSong_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(SongUpdate) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ManagerServer).SetSong(ctx, in) + return srv.(ManagerServer).UpdateSong(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: Manager_SetSong_FullMethodName, + FullMethod: Manager_UpdateSong_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ManagerServer).SetSong(ctx, req.(*SongUpdate)) + return srv.(ManagerServer).UpdateSong(ctx, req.(*SongUpdate)) } return interceptor(ctx, in, info, handler) } -func _Manager_SetStreamerConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(StreamerConfig) +func _Manager_CurrentThread_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(emptypb.Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ManagerServer).CurrentThread(m, &managerCurrentThreadServer{stream}) +} + +type Manager_CurrentThreadServer interface { + Send(*wrapperspb.StringValue) error + grpc.ServerStream +} + +type managerCurrentThreadServer struct { + grpc.ServerStream +} + +func (x *managerCurrentThreadServer) Send(m *wrapperspb.StringValue) error { + return x.ServerStream.SendMsg(m) +} + +func _Manager_UpdateThread_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(wrapperspb.StringValue) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ManagerServer).SetStreamerConfig(ctx, in) + return srv.(ManagerServer).UpdateThread(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: Manager_SetStreamerConfig_FullMethodName, + FullMethod: Manager_UpdateThread_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ManagerServer).SetStreamerConfig(ctx, req.(*StreamerConfig)) + return srv.(ManagerServer).UpdateThread(ctx, req.(*wrapperspb.StringValue)) } return interceptor(ctx, in, info, handler) } -func _Manager_SetThread_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(wrapperspb.StringValue) +func _Manager_CurrentUser_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(emptypb.Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ManagerServer).CurrentUser(m, &managerCurrentUserServer{stream}) +} + +type Manager_CurrentUserServer interface { + Send(*User) error + grpc.ServerStream +} + +type managerCurrentUserServer struct { + grpc.ServerStream +} + +func (x *managerCurrentUserServer) Send(m *User) error { + return x.ServerStream.SendMsg(m) +} + +func _Manager_UpdateUser_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(User) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ManagerServer).SetThread(ctx, in) + return srv.(ManagerServer).UpdateUser(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: Manager_SetThread_FullMethodName, + FullMethod: Manager_UpdateUser_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ManagerServer).SetThread(ctx, req.(*wrapperspb.StringValue)) + return srv.(ManagerServer).UpdateUser(ctx, req.(*User)) } return interceptor(ctx, in, info, handler) } -func _Manager_SetListenerInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ListenerInfo) +func _Manager_CurrentListenerCount_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(emptypb.Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ManagerServer).CurrentListenerCount(m, &managerCurrentListenerCountServer{stream}) +} + +type Manager_CurrentListenerCountServer interface { + Send(*wrapperspb.Int64Value) error + grpc.ServerStream +} + +type managerCurrentListenerCountServer struct { + grpc.ServerStream +} + +func (x *managerCurrentListenerCountServer) Send(m *wrapperspb.Int64Value) error { + return x.ServerStream.SendMsg(m) +} + +func _Manager_UpdateListenerCount_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(wrapperspb.Int64Value) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ManagerServer).SetListenerInfo(ctx, in) + return srv.(ManagerServer).UpdateListenerCount(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: Manager_SetListenerInfo_FullMethodName, + FullMethod: Manager_UpdateListenerCount_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ManagerServer).SetListenerInfo(ctx, req.(*ListenerInfo)) + return srv.(ManagerServer).UpdateListenerCount(ctx, req.(*wrapperspb.Int64Value)) } return interceptor(ctx, in, info, handler) } @@ -273,27 +474,44 @@ var Manager_ServiceDesc = grpc.ServiceDesc{ Handler: _Manager_Status_Handler, }, { - MethodName: "SetUser", - Handler: _Manager_SetUser_Handler, + MethodName: "UpdateSong", + Handler: _Manager_UpdateSong_Handler, }, { - MethodName: "SetSong", - Handler: _Manager_SetSong_Handler, + MethodName: "UpdateThread", + Handler: _Manager_UpdateThread_Handler, }, { - MethodName: "SetStreamerConfig", - Handler: _Manager_SetStreamerConfig_Handler, + MethodName: "UpdateUser", + Handler: _Manager_UpdateUser_Handler, }, { - MethodName: "SetThread", - Handler: _Manager_SetThread_Handler, + MethodName: "UpdateListenerCount", + Handler: _Manager_UpdateListenerCount_Handler, }, + }, + Streams: []grpc.StreamDesc{ { - MethodName: "SetListenerInfo", - Handler: _Manager_SetListenerInfo_Handler, + StreamName: "CurrentSong", + Handler: _Manager_CurrentSong_Handler, + ServerStreams: true, + }, + { + StreamName: "CurrentThread", + Handler: _Manager_CurrentThread_Handler, + ServerStreams: true, + }, + { + StreamName: "CurrentUser", + Handler: _Manager_CurrentUser_Handler, + ServerStreams: true, + }, + { + StreamName: "CurrentListenerCount", + Handler: _Manager_CurrentListenerCount_Handler, + ServerStreams: true, }, }, - Streams: []grpc.StreamDesc{}, Metadata: "radio.proto", } diff --git a/rpc/server.go b/rpc/server.go index e897e249..0913659b 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -4,6 +4,8 @@ import ( "context" radio "github.com/R-a-dio/valkyrie" + "github.com/R-a-dio/valkyrie/util/eventstream" + grpc "google.golang.org/grpc" emptypb "google.golang.org/protobuf/types/known/emptypb" wrapperspb "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -72,33 +74,65 @@ func (m ManagerShim) Status(ctx context.Context, _ *emptypb.Empty) (*StatusRespo }, nil } -// SetUser implements Manager -func (m ManagerShim) SetUser(ctx context.Context, u *UserUpdate) (*emptypb.Empty, error) { - err := m.manager.UpdateUser(ctx, u.StreamerName, fromProtoUser(u.User)) - return new(emptypb.Empty), err +type pbSender[P any] interface { + Send(P) error + grpc.ServerStream +} + +func streamToProtobuf[T any, P any](s pbSender[P], streamFn func(context.Context) (eventstream.Stream[T], error), conv func(T) P) error { + ctx, cancel := context.WithCancel(s.Context()) + defer cancel() + + stream, err := streamFn(ctx) + if err != nil { + return err + } + + for { + recv, err := stream.Next() + if err != nil { + return err + } + err = s.Send(conv(recv)) + if err != nil { + return err + } + } +} + +func (sm ManagerShim) CurrentSong(_ *emptypb.Empty, s Manager_CurrentSongServer) error { + return streamToProtobuf(s, sm.manager.CurrentSong, toProtoSongUpdate) } -// SetSong implements Manager -func (m ManagerShim) SetSong(ctx context.Context, su *SongUpdate) (*emptypb.Empty, error) { - err := m.manager.UpdateSong(ctx, fromProtoSong(su.Song), fromProtoSongInfo(su.Info)) +func (m ManagerShim) UpdateSong(ctx context.Context, su *SongUpdate) (*emptypb.Empty, error) { + err := m.manager.UpdateSong(ctx, fromProtoSongUpdate(su)) return new(emptypb.Empty), err } -// SetStreamerConfig implements Manager -func (m ManagerShim) SetStreamerConfig(ctx context.Context, c *StreamerConfig) (*emptypb.Empty, error) { - // TODO: implement this - return new(emptypb.Empty), nil +func (sm ManagerShim) CurrentThread(_ *emptypb.Empty, s Manager_CurrentThreadServer) error { + return streamToProtobuf(s, sm.manager.CurrentThread, wrapperspb.String) } -// SetThread implements Manager -func (m ManagerShim) SetThread(ctx context.Context, t *wrapperspb.StringValue) (*emptypb.Empty, error) { +func (m ManagerShim) UpdateThread(ctx context.Context, t *wrapperspb.StringValue) (*emptypb.Empty, error) { err := m.manager.UpdateThread(ctx, t.Value) return new(emptypb.Empty), err } -// SetListenerInfo implements Manager -func (m ManagerShim) SetListenerInfo(ctx context.Context, i *ListenerInfo) (*emptypb.Empty, error) { - err := m.manager.UpdateListeners(ctx, int(i.Listeners)) +func (sm ManagerShim) CurrentUser(_ *emptypb.Empty, s Manager_CurrentUserServer) error { + return streamToProtobuf(s, sm.manager.CurrentUser, toProtoUser) +} + +func (m ManagerShim) UpdateUser(ctx context.Context, u *User) (*emptypb.Empty, error) { + err := m.manager.UpdateUser(ctx, fromProtoUser(u)) + return new(emptypb.Empty), err +} + +func (sm ManagerShim) CurrentListenerCount(_ *emptypb.Empty, s Manager_CurrentListenerCountServer) error { + return streamToProtobuf(s, sm.manager.CurrentListeners, wrapperspb.Int64) +} + +func (m ManagerShim) UpdateListenerCount(ctx context.Context, i *wrapperspb.Int64Value) (*emptypb.Empty, error) { + err := m.manager.UpdateListeners(ctx, i.Value) return new(emptypb.Empty), err } diff --git a/templates/default/home.tmpl b/templates/default/home.tmpl index 43c2fff4..796deefa 100644 --- a/templates/default/home.tmpl +++ b/templates/default/home.tmpl @@ -1,5 +1,5 @@ {{define "content"}} -
+
@@ -22,7 +22,7 @@

-

{{.Status.Song.Metadata}}

+

{{.Status.Song.Metadata}}

{{if .Status.Song.DatabaseTrack}}{{.Status.Song.Tags}}{{end}}

diff --git a/templates/default/partials/head.tmpl b/templates/default/partials/head.tmpl index 2e7fd483..f429834c 100644 --- a/templates/default/partials/head.tmpl +++ b/templates/default/partials/head.tmpl @@ -5,6 +5,7 @@ + {{template "styles"}} {{end}} {{define "title"}}R/a/dio{{end}} diff --git a/website/api/php/api.go b/website/api/php/api.go index 6058dc25..3888b8b5 100644 --- a/website/api/php/api.go +++ b/website/api/php/api.go @@ -296,7 +296,7 @@ func (a *API) getCanRequest(w http.ResponseWriter, r *http.Request) { // but not if an error occured if err != nil { // TODO: handle error - http.Error(w, http.StatusText(501), 501) + http.Error(w, http.StatusText(http.StatusNotImplemented), http.StatusNotImplemented) return } err := json.NewEncoder(w).Encode(response) @@ -325,7 +325,6 @@ func (a *API) getCanRequest(w http.ResponseWriter, r *http.Request) { } response.Main.Requests = true - return } type canRequestResponse struct { @@ -343,7 +342,6 @@ func (a *API) getDJImage(w http.ResponseWriter, r *http.Request) { user, ok := middleware.GetUser(ctx) if !ok { panic("missing UserByDJIDCtx middleware") - return } filename := filepath.Join(a.Conf().Website.DJImagePath, user.DJ.ID.String()) @@ -408,8 +406,6 @@ func (a *API) postRequest(w http.ResponseWriter, r *http.Request) { } } -type requestResponse map[string]string - func newV0Status(ctx context.Context, storage radio.SongStorageService, streamer radio.StreamerService, manager radio.ManagerService) (*v0Status, error) { diff --git a/website/api/v1/router.go b/website/api/v1/router.go new file mode 100644 index 00000000..e1a4ca47 --- /dev/null +++ b/website/api/v1/router.go @@ -0,0 +1,68 @@ +package v1 + +import ( + "context" + "log" + "time" + + radio "github.com/R-a-dio/valkyrie" + "github.com/R-a-dio/valkyrie/config" + "github.com/R-a-dio/valkyrie/util/eventstream" + "github.com/go-chi/chi/v5" +) + +func NewAPI(ctx context.Context, cfg config.Config) (*API, error) { + api := &API{ + Context: ctx, + Config: cfg, + sse: NewStream(), + } + + go func() { + defer api.sse.Shutdown() + + m := cfg.Conf().Manager.Client() + + var s eventstream.Stream[*radio.SongUpdate] + var err error + for { + s, err = m.CurrentSong(ctx) + if err == nil { + break + } + + log.Println("v1/api:setup:", err) + time.Sleep(time.Second * 3) + } + + for { + us, err := s.Next() + if err != nil { + log.Println("v1/api:loop:", err) + break + } + if us == nil { + log.Println("v1/api:loop: nil value") + continue + } + + log.Println("v1/api:sending:", us.Metadata) + api.sse.SendEvent(EventMetadata, []byte(us.Metadata)) + } + }() + + return api, nil +} + +type API struct { + Context context.Context + Config config.Config + sse *Stream +} + +func (a *API) Router() chi.Router { + r := chi.NewRouter() + + r.Get("/sse", a.sse.ServeHTTP) + return r +} diff --git a/website/api/sse.go b/website/api/v1/sse.go similarity index 61% rename from website/api/sse.go rename to website/api/v1/sse.go index 5a3552b7..290625ba 100644 --- a/website/api/sse.go +++ b/website/api/v1/sse.go @@ -1,10 +1,12 @@ -package api +package v1 import ( "fmt" + "log" "maps" "net/http" "sync" + "time" ) const ( @@ -16,8 +18,7 @@ const ( ) const ( - EVENT_COUNT = 4 - + EventPing = "ping" EventMetadata = "metadata" EventStreamer = "streamer" EventQueue = "queue" @@ -34,28 +35,29 @@ type Stream struct { // mu guards last mu *sync.RWMutex last map[EventName]message + // shutdown indicator + shutdownCh chan struct{} } func NewStream() *Stream { s := &Stream{ - reqs: make(chan request), - subs: make([]chan message, 0, 128), - mu: new(sync.RWMutex), - last: make(map[EventName]message), + reqs: make(chan request), + subs: make([]chan message, 0, 128), + mu: new(sync.RWMutex), + last: make(map[EventName]message), + shutdownCh: make(chan struct{}), } - s.run() + go s.run() + go s.ping() return s } // ServeHTTP implements http.Handler where each client gets send all SSE events that // occur after connecting. There is no history. func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) { - flusher, ok := w.(http.Flusher) - if !ok { - http.Error(w, http.StatusText(http.StatusNotImplemented), http.StatusNotImplemented) - return - } + controller := http.NewResponseController(w) + log.Println("sse: subscribing") ch := s.sub() defer func() { s.leave(ch) @@ -65,32 +67,50 @@ func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) { // send events that have already happened, one for each event so that // we're certain the page is current + log.Println("sse: cloning initial") s.mu.RLock() init := maps.Clone(s.last) s.mu.RUnlock() + for _, m := range init { + log.Println("sending initial event:", string(m)) if _, err := w.Write(m); err != nil { return } } - flusher.Flush() + controller.Flush() + log.Println("sse: starting loop") for m := range ch { if _, err := w.Write(m); err != nil { return } - flusher.Flush() + controller.Flush() } } // SendEvent sends an SSE event with the data given. func (s *Stream) SendEvent(event EventName, data []byte) { m := newMessage(event, data) - s.mu.Lock() - s.last[event] = m - s.mu.Unlock() - s.reqs <- request{cmd: SEND, m: m} + select { + case s.reqs <- request{cmd: SEND, m: m, e: event}: + case <-s.shutdownCh: + } +} + +func (s *Stream) ping() { + t := time.NewTicker(time.Second * 30) + defer t.Stop() + + for range t.C { + s.SendEvent(EventPing, []byte("ping")) + select { + case <-s.shutdownCh: + return + default: + } + } } func (s *Stream) run() { @@ -109,6 +129,10 @@ func (s *Stream) run() { } } case SEND: + s.mu.Lock() + s.last[req.e] = req.m + s.mu.Unlock() + for _, ch := range s.subs { select { case ch <- req.m: @@ -116,36 +140,49 @@ func (s *Stream) run() { } } case SHUTDOWN: + close(s.shutdownCh) for _, ch := range s.subs { close(ch) - s.subs = s.subs[:0] } + return } } } func (s *Stream) sub() chan message { - ch := make(chan message) - s.reqs <- request{cmd: SUBSCRIBE, ch: ch} + ch := make(chan message, 2) + select { + case s.reqs <- request{cmd: SUBSCRIBE, ch: ch}: + case <-s.shutdownCh: + close(ch) + } return ch } func (s *Stream) leave(ch chan message) { - s.reqs <- request{cmd: LEAVE, ch: ch} + select { + case s.reqs <- request{cmd: LEAVE, ch: ch}: + case <-s.shutdownCh: + } } // Shutdown disconnects all connected clients func (s *Stream) Shutdown() { - s.reqs <- request{cmd: SHUTDOWN} + select { + case s.reqs <- request{cmd: SHUTDOWN}: + case <-s.shutdownCh: + } } type request struct { - cmd string - ch chan message - m message + cmd string // required + ch chan message // SUB/LEAVE only + m message // SEND only + e EventName // SEND only } func newMessage(event EventName, data []byte) message { + // TODO: handle newlines in data return message(fmt.Sprintf("event: %s\ndata: %s\n\n", event, data)) } diff --git a/website/main.go b/website/main.go index a72f67f7..c370646e 100644 --- a/website/main.go +++ b/website/main.go @@ -12,6 +12,7 @@ import ( "github.com/R-a-dio/valkyrie/templates" "github.com/R-a-dio/valkyrie/website/admin" phpapi "github.com/R-a-dio/valkyrie/website/api/php" + v1 "github.com/R-a-dio/valkyrie/website/api/v1" vmiddleware "github.com/R-a-dio/valkyrie/website/middleware" "github.com/R-a-dio/valkyrie/website/public" @@ -70,6 +71,7 @@ func Execute(ctx context.Context, cfg config.Config) error { // version 0 of the api (the legacy PHP version) // it's mostly self-contained to the /api/* route, except for /request that // leaked out at some point + log.Println("starting v0 api") v0, err := phpapi.NewAPI(ctx, cfg, storage, streamer, manager) if err != nil { return errors.E(op, err) @@ -77,6 +79,13 @@ func Execute(ctx context.Context, cfg config.Config) error { r.Mount("/api", v0.Router()) r.Route(`/request/{TrackID:[0-9]+}`, v0.RequestRoute) + log.Println("starting v1 api") + v1, err := v1.NewAPI(ctx, cfg) + if err != nil { + return errors.E(op, err) + } + r.Mount("/v1", v1.Router()) + // admin routes r.Get("/logout", authentication.LogoutHandler) // outside so it isn't login restricted r.Mount("/admin", admin.Router(ctx, admin.State{