Skip to content

Commit

Permalink
seperated session management to lobby and game in backend
Browse files Browse the repository at this point in the history
  • Loading branch information
oriventi committed Apr 24, 2024
1 parent 2016f8e commit 5a00c33
Show file tree
Hide file tree
Showing 19 changed files with 644 additions and 1,000 deletions.
18 changes: 10 additions & 8 deletions duo_backend/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import (

type Server struct {
pb.UnimplementedDUOServiceServer
Config util.Config
Store db.Store
Maker token.Maker
SessionHandler SessionManager
Config util.Config
Store db.Store
Maker token.Maker
// SessionHandler SessionManager
LobbyHandler LobbyManager
}

func NewServer(store db.Store, config util.Config) *Server {
Expand All @@ -25,10 +26,11 @@ func NewServer(store db.Store, config util.Config) *Server {
}

return &Server{
Store: store,
Config: config,
Maker: maker,
SessionHandler: *NewSessionManager(store),
Store: store,
Config: config,
Maker: maker,
// SessionHandler: *NewSessionManager(store),
LobbyHandler: *NewLobbyManager(store),
}
}

Expand Down
165 changes: 91 additions & 74 deletions duo_backend/api/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,67 @@ import (
"fmt"
"log"
"sync"
"time"

db "github.com/duo/db/sqlc"
"github.com/duo/pb"
"github.com/google/uuid"
)

type UserStream struct {
Stream pb.DUOService_JoinSessionServer
Stream pb.DUOService_JoinLobbyServer
UserId uuid.UUID
Username string
}

func NewUserStream(stream pb.DUOService_JoinSessionServer, userId uuid.UUID, username string) *UserStream {
func NewUserStream(stream pb.DUOService_JoinLobbyServer, userId uuid.UUID, username string) *UserStream {
return &UserStream{
Stream: stream,
Username: username,
UserId: userId,
}
}

type SessionManager struct {
SessionStreams map[int][]UserStream
store db.Store
Mu sync.Mutex
type LobbyManager struct {
LobbyStreams map[int][]UserStream
store db.Store
Mu sync.Mutex
}

func NewSessionManager(store db.Store) *SessionManager {
return &SessionManager{
SessionStreams: make(map[int][]UserStream),
store: store,
Mu: sync.Mutex{},
func NewLobbyManager(store db.Store) *LobbyManager {
return &LobbyManager{
LobbyStreams: make(map[int][]UserStream),
store: store,
Mu: sync.Mutex{},
}
}

func (sm *SessionManager) SendTestMessagesToAll() {
//Send messages to all clients in all sessions with a 1 second interval
go func() {
for {
log.Printf("sending test message to sessions: %v", sm.SessionStreams)
for sessionId, _ := range sm.SessionStreams {
users, usersErr := sm.GetUsersInSession(sessionId)
if usersErr != nil {
log.Printf("error getting users in session %d: %v", sessionId, usersErr)
continue
}
sm.SendMessage(sessionId, &pb.SessionStream{
SessionId: int32(sessionId),
GameState: &pb.GameState{},
SessionState: &pb.SessionState{
Users: users,
},
})
}
time.Sleep(1 * time.Second)
}
}()
}

func (sm *SessionManager) CreateSession(userUUID uuid.UUID, pin string, maxPlayers int32) (*db.GameSession, error) {
dbSession, createErr := sm.store.CreateSession(context.Background(), db.CreateSessionParams{
// func (sm *SessionManager) SendTestMessagesToAll() {
// //Send messages to all clients in all sessions with a 1 second interval
// go func() {
// for {
// log.Printf("sending test message to sessions: %v", sm.SessionStreams)
// for sessionId, _ := range sm.SessionStreams {
// users, usersErr := sm.GetUsersInSession(sessionId)
// if usersErr != nil {
// log.Printf("error getting users in session %d: %v", sessionId, usersErr)
// continue
// }
// sm.SendMessage(sessionId, &pb.SessionStream{
// SessionId: int32(sessionId),
// GameState: &pb.GameState{},
// SessionState: &pb.SessionState{
// Users: users,
// },
// })
// }
// time.Sleep(1 * time.Second)
// }
// }()
// }

func (sm *LobbyManager) CreateLobby(userUUID uuid.UUID, maxPlayers int32) (*db.Lobby, error) {
dbSession, createErr := sm.store.CreateLobby(context.Background(), db.CreateLobbyParams{
OwnerID: userUUID,
Pin: pin,
MaxPlayers: maxPlayers,
})
if createErr != nil {
Expand All @@ -76,29 +74,29 @@ func (sm *SessionManager) CreateSession(userUUID uuid.UUID, pin string, maxPlaye

sm.Mu.Lock()

sm.SessionStreams[int(dbSession.ID)] = []UserStream{}
sm.LobbyStreams[int(dbSession.ID)] = []UserStream{}

sm.Mu.Unlock()

return &dbSession, nil
}

func (sm *SessionManager) GetUsersInSession(sessionId int) ([]*pb.User, error) {
func (sm *LobbyManager) GetUsersInLobby(lobbyId int) ([]*pb.User, error) {

dbSession, getErr := sm.store.GetSessionByID(context.Background(), int32(sessionId))
dbSession, getErr := sm.store.GetLobbyByID(context.Background(), int32(lobbyId))
if getErr != nil {
return []*pb.User{}, getErr
}

sm.Mu.Lock()
defer sm.Mu.Unlock()
//Session must exist
if sm.SessionStreams[sessionId] == nil {
if sm.LobbyStreams[lobbyId] == nil {
return []*pb.User{}, fmt.Errorf("session does not exist")
}

var users []*pb.User
for _, s := range sm.SessionStreams[sessionId] {
for _, s := range sm.LobbyStreams[lobbyId] {
users = append(users, &pb.User{
Uuid: s.UserId.String(),
Name: s.Username,
Expand All @@ -110,110 +108,129 @@ func (sm *SessionManager) GetUsersInSession(sessionId int) ([]*pb.User, error) {
return users, nil
}

func (sm *SessionManager) GetSession(sessionId int) ([]UserStream, error) {
return sm.SessionStreams[sessionId], nil
func (sm *LobbyManager) GetLobby(lobbyId int) ([]UserStream, error) {
return sm.LobbyStreams[lobbyId], nil
}

func (sm *SessionManager) SendMessage(sessionId int, message *pb.SessionStream) []error {
func (sm *LobbyManager) SendMessageToLobby(lobbyId int, message *pb.LobbyStatus) []error {
//Session must exist
if sm.SessionStreams[sessionId] == nil {
if sm.LobbyStreams[lobbyId] == nil {
return []error{fmt.Errorf("session does not exist")}
}

var errs []error

for _, s := range sm.SessionStreams[sessionId] {
log.Printf("sending message to session %d", sessionId)
for _, s := range sm.LobbyStreams[lobbyId] {
log.Printf("sending message to session %d", lobbyId)
err := s.Stream.Send(message)
if err != nil {
errs = append(errs, err)
log.Printf("error sending message to session %d: %v", sessionId, err)
log.Printf("error sending message to session %d: %v", lobbyId, err)
}
}

return errs
}

func (sm *SessionManager) DeleteSession(sessionId int) error {
_, delErr := sm.store.DeleteSessionByID(context.Background(), int32(sessionId))
func (sm *LobbyManager) DeleteLobby(lobbyId int) error {
_, delErr := sm.store.DeleteLobbyByID(context.Background(), int32(lobbyId))
if delErr != nil {
return delErr
}

sm.Mu.Lock()
defer sm.Mu.Unlock()

for _, s := range sm.SessionStreams[sessionId] {
for _, s := range sm.LobbyStreams[lobbyId] {
//TODO Close client connection correctly
s.Stream.Context().Done()
}
delete(sm.SessionStreams, sessionId)
delete(sm.LobbyStreams, lobbyId)

return nil
}

func (sm *SessionManager) GetSessionStreams(sessionId int) ([]UserStream, error) {
return sm.SessionStreams[sessionId], nil
func (sm *LobbyManager) GetLobbyStreams(sessionId int) ([]UserStream, error) {
return sm.LobbyStreams[sessionId], nil
}

func (sm *SessionManager) AddStreamToSession(sessionId int, stream UserStream) error {
func (sm *LobbyManager) AddStreamToLobby(lobbyId int, stream UserStream) error {

//Stream must exist
if sm.SessionStreams[sessionId] == nil {
if sm.LobbyStreams[lobbyId] == nil {
return fmt.Errorf("session does not exist")
}

//Stream must be unique
for _, s := range sm.SessionStreams[sessionId] {
for _, s := range sm.LobbyStreams[lobbyId] {
if s.UserId == stream.UserId {
return fmt.Errorf("stream already exists in session")
}
}

sm.Mu.Lock()
sm.SessionStreams[sessionId] = append(sm.SessionStreams[sessionId], stream)
sm.LobbyStreams[lobbyId] = append(sm.LobbyStreams[lobbyId], stream)
sm.Mu.Unlock()

log.Printf("Added user stream %v to session %d", stream.UserId, sessionId)
users, userErr := sm.GetUsersInLobby(lobbyId)
if userErr != nil {
return userErr
}

sm.SendMessageToLobby(lobbyId, &pb.LobbyStatus{
Users: users,
IsStarting: false,
})

log.Printf("Added user stream %v to session %d", stream.UserId, lobbyId)

//Wait for client to disconnect
<-stream.Stream.Context().Done()

log.Printf("Client %v disconnected", stream.UserId)
sm.RemoveStreamFromSession(sessionId, stream.UserId)
sm.RemoveStreamFromLobby(lobbyId, stream.UserId)
return nil
}

func (sm *SessionManager) RemoveStreamFromSession(sessionId int, userId uuid.UUID) error {
func (sm *LobbyManager) RemoveStreamFromLobby(lobbyId int, userId uuid.UUID) error {
//Stream must exist
if sm.SessionStreams[sessionId] == nil {
return fmt.Errorf("session does not exist")
if sm.LobbyStreams[lobbyId] == nil {
return fmt.Errorf("lobby does not exist")
}

dbSession, getErr := sm.store.GetSessionByID(context.Background(), int32(sessionId))
dbLobby, getErr := sm.store.GetLobbyByID(context.Background(), int32(lobbyId))
if getErr != nil {
return getErr
}

if userId == dbSession.OwnerID {
log.Printf("Owner %v disconnected and session %d deleted", userId, sessionId)
sm.DeleteSession(sessionId)
if userId == dbLobby.OwnerID {
log.Printf("Owner %v disconnected and lobby %d deleted", userId, lobbyId)
sm.DeleteLobby(lobbyId)
return nil
}

newStreams := []UserStream{}
for _, s := range sm.SessionStreams[sessionId] {
for _, s := range sm.LobbyStreams[lobbyId] {
if s.UserId != userId {
newStreams = append(newStreams, s)
} else {
s.Stream.Context().Done()
}
}

sm.Mu.Lock()
sm.SessionStreams[sessionId] = newStreams
sm.LobbyStreams[lobbyId] = newStreams
sm.Mu.Unlock()

log.Printf("Removed user stream %v from session %d", userId, sessionId)
users, userErr := sm.GetUsersInLobby(lobbyId)
if userErr != nil {
return userErr
}

sm.SendMessageToLobby(lobbyId, &pb.LobbyStatus{
Users: users,
IsStarting: false,
})

log.Printf("Removed user stream %v from session %d", userId, lobbyId)
return nil
}
Loading

0 comments on commit 5a00c33

Please sign in to comment.