Skip to content

Commit

Permalink
util/eventstream: implement a CompareAndSend
Browse files Browse the repository at this point in the history
CompareAndSend acts similar to a CompareAndSwap in atomics, it lets you
	compare to the current value before sending the new value such
	that there can be no race condition between two sends.

radio: add ManagerService.UpdateFromStorage

UpdateFromStorage is a method indicating to the manager that it should find
	updated information about state from the storage provider

rpc: implement above method in the rpc layer

config: implement above method in the config reload layer
  • Loading branch information
Wessie committed Jun 21, 2024
1 parent 7289f85 commit 3be0277
Show file tree
Hide file tree
Showing 11 changed files with 360 additions and 155 deletions.
4 changes: 4 additions & 0 deletions config/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (m *managerService) UpdateUser(ctx context.Context, u *radio.User) error {
return m.fn().UpdateUser(ctx, u)
}

func (m *managerService) UpdateFromStorage(ctx context.Context) error {
return m.fn().UpdateFromStorage(ctx)
}

var _ radio.ManagerService = &managerService{}

func newStreamerService(cfg Config, conn func() *grpc.ClientConn) radio.StreamerService {
Expand Down
45 changes: 45 additions & 0 deletions manager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,51 @@ func (m *Manager) UpdateListeners(ctx context.Context, listeners radio.Listeners
return nil
}

func (m *Manager) UpdateFromStorage(ctx context.Context) error {
const op errors.Op = "manager/Manager.UpdateFromStorage"
_, span := otel.Tracer("").Start(ctx, string(op))
defer span.End()

{ // user update
current := m.userStream.Latest()

new, err := m.Storage.User(ctx).GetByID(current.ID)
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("failed to update user from storage")
} else {
m.userStream.CompareAndSend(new, func(new, old *radio.User) bool {
if old == nil {
return new == nil
}

return new.ID == old.ID
})
}
}

{ // song update
current := m.songStream.Latest()

new, err := m.Storage.Song(ctx).FromHash(current.Hash)
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("failed to update song from storage")
} else {
m.songStream.CompareAndSend(&radio.SongUpdate{
Song: *new,
Info: current.Info,
}, func(new, old *radio.SongUpdate) bool {
if old == nil {
return new == nil
}

return new.EqualTo(old.Song)
})
}
}

return nil
}

// statusFromStreams constructs a radio.Status from the individual data streams using
// their latest value
func (m *Manager) statusFromStreams() radio.Status {
Expand Down
62 changes: 53 additions & 9 deletions mocks/radio.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions radio.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ type ListenerTrackerService interface {
}

type ManagerService interface {
UpdateFromStorage(context.Context) error

CurrentUser(context.Context) (eventstream.Stream[*User], error)
UpdateUser(context.Context, *User) error
CurrentSong(context.Context) (eventstream.Stream[*SongUpdate], error)
Expand Down
5 changes: 5 additions & 0 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ func (m ManagerClientRPC) CurrentListeners(ctx context.Context) (eventstream.Str
return streamFromProtobuf(ctx, c, func(v *wrapperspb.Int64Value) radio.Listeners { return v.Value })
}

func (m ManagerClientRPC) UpdateFromStorage(ctx context.Context) error {
_, err := m.rpc.UpdateFromStorage(ctx, new(emptypb.Empty))
return err
}

// NewStreamerService returns a new client implementing radio.StreamerService
func NewStreamerService(c *grpc.ClientConn) radio.StreamerService {
return StreamerClientRPC{
Expand Down
Loading

0 comments on commit 3be0277

Please sign in to comment.