Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
skewb1k committed Jan 20, 2025
2 parents 71861db + 33eba2a commit 94d2bd6
Show file tree
Hide file tree
Showing 11 changed files with 343 additions and 140 deletions.
112 changes: 64 additions & 48 deletions internal/controller/ws-handler.controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,28 @@ func (c controller) handleAlive(_ context.Context, _ *websocket.Conn, _ EmptyInp
}

type UpdatePlayerStateInput struct {
VideoId int `json:"video_id"`
IsPlaying bool `json:"is_playing"`
CurrentTime int `json:"current_time"`
PlaybackRate float64 `json:"playback_rate"`
UpdatedAt int `json:"updated_at"`
VideoId int `json:"video_id"`
IsPlaying bool `json:"is_playing"`
CurrentTime int `json:"current_time"`
PlaybackRate float64 `json:"playback_rate"`
UpdatedAt int `json:"updated_at"`
PlayerVersion int `json:"player_version"`
}

func (c controller) handleUpdatePlayerState(ctx context.Context, conn *websocket.Conn, input UpdatePlayerStateInput) error {
roomId := c.getRoomIdFromCtx(ctx)
memberId := c.getMemberIdFromCtx(ctx)

updatePlayerStateResp, err := c.roomService.UpdatePlayerState(ctx, &service.UpdatePlayerStateParams{
SenderConn: conn,
VideoId: input.VideoId,
IsPlaying: input.IsPlaying,
CurrentTime: input.CurrentTime,
PlaybackRate: input.PlaybackRate,
UpdatedAt: input.UpdatedAt,
SenderId: memberId,
RoomId: roomId,
SenderConn: conn,
VideoId: input.VideoId,
IsPlaying: input.IsPlaying,
CurrentTime: input.CurrentTime,
PlaybackRate: input.PlaybackRate,
UpdatedAt: input.UpdatedAt,
PlayerVersion: input.PlayerVersion,
SenderId: memberId,
RoomId: roomId,
})
if err != nil {
return fmt.Errorf("failed to update player state: %w", err)
Expand All @@ -53,19 +55,23 @@ func (c controller) handleUpdatePlayerState(ctx context.Context, conn *websocket
}

type UpdatePlayerVideoInput struct {
VideoId int `json:"video_id"`
UpdatedAt int `json:"updated_at"`
VideoId int `json:"video_id"`
UpdatedAt int `json:"updated_at"`
PlayerVersion int `json:"player_version"`
PlaylistVersion int `json:"playlist_version"`
}

func (c controller) handleUpdatePlayerVideo(ctx context.Context, _ *websocket.Conn, input UpdatePlayerVideoInput) error {
roomId := c.getRoomIdFromCtx(ctx)
memberId := c.getMemberIdFromCtx(ctx)

updatePlayerVideoResp, err := c.roomService.UpdatePlayerVideo(ctx, &service.UpdatePlayerVideoParams{
VideoId: input.VideoId,
UpdatedAt: input.UpdatedAt,
SenderId: memberId,
RoomId: roomId,
PlaylistVersion: input.PlaylistVersion,
PlayerVersion: input.PlayerVersion,
VideoId: input.VideoId,
UpdatedAt: input.UpdatedAt,
SenderId: memberId,
RoomId: roomId,
})
if err != nil {
return fmt.Errorf("failed to update player video: %w", err)
Expand All @@ -84,19 +90,23 @@ func (c controller) handleUpdatePlayerVideo(ctx context.Context, _ *websocket.Co
}

type AddVideoInput struct {
VideoUrl string `json:"video_url"`
UpdatedAt int `json:"updated_at"`
VideoUrl string `json:"video_url"`
UpdatedAt int `json:"updated_at"`
PlaylsitVersion int `json:"playlist_version"`
PlayerVersion int `json:"player_version"`
}

func (c controller) handleAddVideo(ctx context.Context, _ *websocket.Conn, input AddVideoInput) error {
roomId := c.getRoomIdFromCtx(ctx)
memberId := c.getMemberIdFromCtx(ctx)

addVideoResponse, err := c.roomService.AddVideo(ctx, &service.AddVideoParams{
SenderId: memberId,
RoomId: roomId,
VideoUrl: input.VideoUrl,
UpdatedAt: input.UpdatedAt,
PlaylistVersion: input.PlaylsitVersion,
PlayerVersion: input.PlayerVersion,
SenderId: memberId,
RoomId: roomId,
VideoUrl: input.VideoUrl,
UpdatedAt: input.UpdatedAt,
})
if err != nil {
return fmt.Errorf("failed to add video: %w", err)
Expand Down Expand Up @@ -126,33 +136,35 @@ func (c controller) handleAddVideo(ctx context.Context, _ *websocket.Conn, input
return nil
}

func (c controller) handleEndVideo(ctx context.Context, _ *websocket.Conn, _ EmptyInput) error {
type EndVideoInput struct {
PlayerVersion int `json:"player_version"`
}

func (c controller) handleEndVideo(ctx context.Context, _ *websocket.Conn, input EndVideoInput) error {
roomId := c.getRoomIdFromCtx(ctx)
memberId := c.getMemberIdFromCtx(ctx)

addVideoResponse, err := c.roomService.EndVideo(ctx, &service.EndVideoParams{
SenderId: memberId,
RoomId: roomId,
endVideoResponse, err := c.roomService.EndVideo(ctx, &service.EndVideoParams{
PlayerVersion: input.PlayerVersion,
SenderId: memberId,
RoomId: roomId,
})
if err != nil {
return fmt.Errorf("failed to add video: %w", err)
return fmt.Errorf("failed to end video: %w", err)
}

if addVideoResponse.Player != nil {
if endVideoResponse.Playlist != nil {
if err := c.broadcastPlayerVideoUpdated(ctx,
addVideoResponse.Conns,
addVideoResponse.Player,
addVideoResponse.Playlist,
addVideoResponse.Members,
endVideoResponse.Conns,
endVideoResponse.Player,
endVideoResponse.Playlist,
endVideoResponse.Members,
); err != nil {
return fmt.Errorf("failed to broadcast player updated: %w", err)
}
} else {
if err := c.broadcast(ctx, addVideoResponse.Conns, &Output{
Type: "VIDEO_ENDED",
Payload: nil,
}); err != nil {
return fmt.Errorf("failed to broadcast video ended: %w", err)
if err := c.broadcastPlayerStateUpdated(ctx, endVideoResponse.Conns, endVideoResponse.Player); err != nil {
return fmt.Errorf("failed to broadcast player state updated: %w", err)
}
}

Expand Down Expand Up @@ -226,17 +238,19 @@ func (c controller) handlePromoteMember(ctx context.Context, _ *websocket.Conn,
}

type RemoveVideoInput struct {
VideoId int `json:"video_id"`
VideoId int `json:"video_id"`
PlaylistVersion int `json:"playlist_version"`
}

func (c controller) handleRemoveVideo(ctx context.Context, _ *websocket.Conn, input RemoveVideoInput) error {
roomId := c.getRoomIdFromCtx(ctx)
memberId := c.getMemberIdFromCtx(ctx)

removeVideoResponse, err := c.roomService.RemoveVideo(ctx, &service.RemoveVideoParams{
VideoId: input.VideoId,
SenderId: memberId,
RoomId: roomId,
PlaylistVersion: input.PlaylistVersion,
VideoId: input.VideoId,
SenderId: memberId,
RoomId: roomId,
})
if err != nil {
return fmt.Errorf("failed to remove video: %w", err)
Expand Down Expand Up @@ -340,17 +354,19 @@ func (c controller) handleUpdateIsMuted(ctx context.Context, conn *websocket.Con
}

type ReorderPlaylistInput struct {
VideoIds []int `json:"video_ids"`
VideoIds []int `json:"video_ids"`
PlaylistVersion int `json:"playlist_version"`
}

func (c controller) handleReorderPlaylist(ctx context.Context, _ *websocket.Conn, input ReorderPlaylistInput) error {
roomId := c.getRoomIdFromCtx(ctx)
memberId := c.getMemberIdFromCtx(ctx)

removeVideoResponse, err := c.roomService.ReorderPlaylist(ctx, &service.ReorderPlaylistParams{
VideoIds: input.VideoIds,
SenderId: memberId,
RoomId: roomId,
PlaylistVersion: input.PlaylistVersion,
VideoIds: input.VideoIds,
SenderId: memberId,
RoomId: roomId,
})
if err != nil {
return fmt.Errorf("failed to reorder playlist: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/ws-router.controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func (c controller) getWSRouter() *wsrouter.WSRouter {
// video
wsrouter.Handle(mux, "ALIVE", c.handleAlive)
wsrouter.Handle(mux, "ADD_VIDEO", c.handleAddVideo)
wsrouter.Handle(mux, "END_VIDEO", c.handleEndVideo)
wsrouter.Handle(mux, "REMOVE_VIDEO", c.handleRemoveVideo)
wsrouter.Handle(mux, "REORDER_PLAYLIST", c.handleReorderPlaylist)

Expand All @@ -34,6 +33,7 @@ func (c controller) getWSRouter() *wsrouter.WSRouter {
// player
wsrouter.Handle(mux, "UPDATE_PLAYER_STATE", c.handleUpdatePlayerState)
wsrouter.Handle(mux, "UPDATE_PLAYER_VIDEO", c.handleUpdatePlayerVideo)
wsrouter.Handle(mux, "END_VIDEO", c.handleEndVideo)

// profile
wsrouter.Handle(mux, "UPDATE_PROFILE", c.handleUpdateProfile)
Expand Down
29 changes: 29 additions & 0 deletions internal/repository/room/redis/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package redis

import (
"context"
"errors"
"fmt"

"github.com/redis/go-redis/v9"
"github.com/sharetube/server/internal/repository/room"
)

Expand All @@ -23,6 +25,33 @@ func (r repo) getVideoEndedKey(roomId string) string {
return fmt.Sprintf("room:%s:video-ended", roomId)
}

func (r repo) getPlayerVersionKey(roomId string) string {
return fmt.Sprintf("room:%s:player-version", roomId)
}

func (r repo) IncrPlayerVersion(ctx context.Context, roomId string) (int, error) {
playerVersionKey := r.getPlayerVersionKey(roomId)
playerVersion, err := r.rc.Incr(ctx, playerVersionKey).Result()
if err != nil {
return 0, err
}

return int(playerVersion), nil
}

func (r repo) GetPlayerVersion(ctx context.Context, roomId string) (int, error) {
playerVersionKey := r.getPlayerVersionKey(roomId)
playerVersion, err := r.rc.Get(ctx, playerVersionKey).Int()
if err != nil {
if errors.Is(err, redis.Nil) {
return 0, nil
}
return 0, err
}

return playerVersion, nil
}

func (r repo) SetVideoEnded(ctx context.Context, params *room.SetVideoEndedParams) error {
// pipe := r.rc.TxPipeline()
// pipe.Expire(ctx, videoEndedKey, r.maxExpireDuration)
Expand Down
22 changes: 18 additions & 4 deletions internal/service/helper.service.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,26 @@ func (s service) updatePlayerVideo(ctx context.Context, roomId string, videoId i
conns = append(conns, conn)
}

playerVersion, err := s.roomRepo.IncrPlayerVersion(ctx, roomId)
if err != nil {
return nil, fmt.Errorf("failed to incr player version: %w", err)
}

isEnded, err := s.roomRepo.GetVideoEnded(ctx, roomId)
if err != nil {
return nil, fmt.Errorf("failed to get video ended: %w", err)
}

return &updatePlayerVideoResponse{
Player: Player{
CurrentTime: currentTime,
IsPlaying: isPlaying,
PlaybackRate: playbackRate,
UpdatedAt: updatedAt,
State: PlayerState{
CurrentTime: currentTime,
IsPlaying: isPlaying,
PlaybackRate: playbackRate,
UpdatedAt: updatedAt,
},
IsEnded: isEnded,
Version: playerVersion,
},
Members: members,
Playlist: Playlist{
Expand Down
23 changes: 19 additions & 4 deletions internal/service/member.service.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ func (s service) UpdateIsReady(ctx context.Context, params *UpdateIsReadyParams)
player.IsPlaying = neededIsReady
player.UpdatedAt = int(time.Now().UnixMicro())

// todo: check isEnded
if err := s.roomRepo.UpdatePlayerWaitingForReady(ctx, params.RoomId, !player.WaitingForReady); err != nil {
return nil, fmt.Errorf("failed to update player waiting for ready: %w", err)
}
Expand All @@ -526,15 +527,29 @@ func (s service) UpdateIsReady(ctx context.Context, params *UpdateIsReadyParams)
return nil, fmt.Errorf("failed to update player is playing: %w", err)
}

playerVersion, err := s.roomRepo.IncrPlayerVersion(ctx, params.RoomId)
if err != nil {
return nil, fmt.Errorf("failed to incr player version: %w", err)
}

isEnded, err := s.roomRepo.GetVideoEnded(ctx, params.RoomId)
if err != nil {
return nil, fmt.Errorf("failed to get video ended: %w", err)
}

return &UpdateIsReadyResponse{
Conns: conns,
UpdatedMember: updatedMember,
Members: members,
Player: &Player{
IsPlaying: player.IsPlaying,
CurrentTime: player.CurrentTime,
PlaybackRate: player.PlaybackRate,
UpdatedAt: player.UpdatedAt,
State: PlayerState{
CurrentTime: player.CurrentTime,
IsPlaying: player.IsPlaying,
PlaybackRate: player.PlaybackRate,
UpdatedAt: player.UpdatedAt,
},
IsEnded: isEnded,
Version: playerVersion,
},
}, nil
}
Expand Down
19 changes: 12 additions & 7 deletions internal/service/models.service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,22 @@ type Playlist struct {
Version int `json:"version"`
}

type Player struct {
IsPlaying bool `json:"is_playing"`
type PlayerState struct {
CurrentTime int `json:"current_time"`
IsPlaying bool `json:"is_playing"`
PlaybackRate float64 `json:"playback_rate"`
UpdatedAt int `json:"updated_at"`
}

type Player struct {
State PlayerState `json:"state"`
IsEnded bool `json:"is_ended"`
Version int `json:"version"`
}

type Room struct {
Id string `json:"id"`
Player Player `json:"player"`
VideoEnded bool `json:"video_ended"`
Members []Member `json:"members"`
Playlist Playlist `json:"playlist"`
Id string `json:"id"`
Player Player `json:"player"`
Members []Member `json:"members"`
Playlist Playlist `json:"playlist"`
}
Loading

0 comments on commit 94d2bd6

Please sign in to comment.