Skip to content

Commit

Permalink
Enh/rtmp proxy (#1387)
Browse files Browse the repository at this point in the history
* Implement method to fetch upcoming stream details and create test course

* Update auth scope for old token endpoints; add endpoint to exchange personal token with stream-link

* Add rtmpProxyURL to config.yaml; update token tests and dao

* Update token panel to include rtmp proxy url and self-stream info

* Update GetAllTokens method to filter tokens based on user role

* Fix linting (golangci-lint)
  • Loading branch information
carlobortolan authored Oct 21, 2024
1 parent 987b174 commit 8b3a059
Show file tree
Hide file tree
Showing 14 changed files with 499 additions and 46 deletions.
108 changes: 105 additions & 3 deletions api/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import (
"github.com/TUM-Dev/gocast/dao"
"github.com/TUM-Dev/gocast/model"
"github.com/TUM-Dev/gocast/tools"
"github.com/TUM-Dev/gocast/tools/tum"
"github.com/gin-gonic/gin"
uuid "github.com/satori/go.uuid"
)

func configTokenRouter(r *gin.Engine, daoWrapper dao.DaoWrapper) {
routes := tokenRoutes{daoWrapper}
g := r.Group("/api/token")
g.Use(tools.Admin)
g.POST("/proxy/:token", routes.fetchStreamKey)
g.Use(tools.AtLeastLecturer)
g.POST("/create", routes.createToken)
g.DELETE("/:id", routes.deleteToken)
}
Expand All @@ -26,7 +28,34 @@ type tokenRoutes struct {

func (r tokenRoutes) deleteToken(c *gin.Context) {
id := c.Param("id")
err := r.TokenDao.DeleteToken(id)

foundContext, exists := c.Get("TUMLiveContext")
if !exists {
return
}
tumLiveContext := foundContext.(tools.TUMLiveContext)

token, err := r.TokenDao.GetTokenByID(id)
if err != nil {
logger.Error("can not get token", "err", err)
_ = c.Error(tools.RequestError{
Status: http.StatusInternalServerError,
CustomMessage: "can not get token",
Err: err,
})
return
}

// only the user who created the token or an admin can delete it
if token.UserID != tumLiveContext.User.ID && tumLiveContext.User.Role != model.AdminType {
_ = c.Error(tools.RequestError{
Status: http.StatusForbidden,
CustomMessage: "not allowed to delete token",
})
return
}

err = r.TokenDao.DeleteToken(id)
if err != nil {
logger.Error("can not delete token", "err", err)
_ = c.Error(tools.RequestError{
Expand Down Expand Up @@ -58,13 +87,22 @@ func (r tokenRoutes) createToken(c *gin.Context) {
})
return
}
if req.Scope != model.TokenScopeAdmin {
if req.Scope == model.TokenScopeAdmin && tumLiveContext.User.Role != model.AdminType {
_ = c.Error(tools.RequestError{
Status: http.StatusBadRequest,
CustomMessage: "not an admin",
})
return
}

if req.Scope != model.TokenScopeAdmin && req.Scope != model.TokenScopeLecturer {
_ = c.Error(tools.RequestError{
Status: http.StatusBadRequest,
CustomMessage: "invalid scope",
})
return
}

tokenStr := uuid.NewV4().String()
expires := sql.NullTime{Valid: req.Expires != nil}
if req.Expires != nil {
Expand All @@ -90,3 +128,67 @@ func (r tokenRoutes) createToken(c *gin.Context) {
"token": tokenStr,
})
}

// This is used by the proxy to get the stream key of the next stream of the lecturer given a lecturer token
//
// Proxy receives: rtmp://proxy.example.com/<lecturer-token>
// or: rtmp://proxy.example.com/<lecturer-token>?slug=ABC-123 <-- optional slug parameter in case the lecturer is streaming multiple courses simultaneously
//
// Proxy returns: rtmp://ingest.example.com/ABC-123?secret=610f609e4a2c43ac8a6d648177472b17
func (r *tokenRoutes) fetchStreamKey(c *gin.Context) {
// Optional slug parameter to get the stream key of a specific course (in case the lecturer is streaming multiple courses simultaneously)
slug := c.Query("slug")
t := c.Param("token")

// Get user from token
token, err := r.TokenDao.GetToken(t)
if err != nil {
_ = c.Error(tools.RequestError{
Status: http.StatusBadRequest,
CustomMessage: "invalid token",
})
return
}

// Only tokens of type lecturer are allowed to start streaming
if token.Scope != model.TokenScopeLecturer {
_ = c.Error(tools.RequestError{
Status: http.StatusUnauthorized,
CustomMessage: "invalid scope",
})
return
}

// Get user and check if he has the right to start a stream
user, err := r.UsersDao.GetUserByID(c, token.UserID)
if err != nil {
_ = c.Error(tools.RequestError{
Status: http.StatusInternalServerError,
CustomMessage: "could not get user",
Err: err,
})
return

}
if user.Role != model.LecturerType && user.Role != model.AdminType {
_ = c.Error(tools.RequestError{
Status: http.StatusUnauthorized,
CustomMessage: "user is not a lecturer or admin",
})
return
}

// Find current/next stream and course of which the user is a lecturer
year, term := tum.GetCurrentSemester()
streamKey, courseSlug, err := r.StreamsDao.GetSoonStartingStreamInfo(&user, slug, year, term)
if err != nil || streamKey == "" || courseSlug == "" {
_ = c.Error(tools.RequestError{
Status: http.StatusNotFound,
CustomMessage: "no stream found",
Err: err,
})
return
}

c.JSON(http.StatusOK, gin.H{"url": "" + tools.Cfg.IngestBase + "/" + courseSlug + "?secret=" + streamKey + "/" + courseSlug})
}
2 changes: 2 additions & 0 deletions api/token_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func TestToken(t *testing.T) {
wrapper := dao.DaoWrapper{
TokenDao: func() dao.TokenDao {
tokenMock := mock_dao.NewMockTokenDao(gomock.NewController(t))
tokenMock.EXPECT().GetTokenByID("1").Return(model.Token{}, nil).AnyTimes()
tokenMock.EXPECT().DeleteToken("1").Return(errors.New("")).AnyTimes()
return tokenMock
}(),
Expand All @@ -102,6 +103,7 @@ func TestToken(t *testing.T) {
wrapper := dao.DaoWrapper{
TokenDao: func() dao.TokenDao {
tokenMock := mock_dao.NewMockTokenDao(gomock.NewController(t))
tokenMock.EXPECT().GetTokenByID("1").Return(model.Token{}, nil).AnyTimes()
tokenMock.EXPECT().DeleteToken("1").Return(nil).AnyTimes()
return tokenMock
}(),
Expand Down
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,4 @@ meili:
apiKey: MASTER_KEY
vodURLTemplate: https://stream.lrz.de/vod/_definst_/mp4:tum/RBG/%s.mp4/playlist.m3u8
canonicalURL: https://tum.live
rtmpProxyURL: https://proxy.example.com
2 changes: 1 addition & 1 deletion dao/courses.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (d coursesDao) CreateCourse(ctx context.Context, course *model.Course, keep

func (d coursesDao) AddAdminToCourse(userID uint, courseID uint) error {
defer Cache.Clear()
return DB.Exec("insert into course_admins (user_id, course_id) values (?, ?)", userID, courseID).Error
return DB.Exec("insert into course_admins (user_id, course_id) values (?, ?) on duplicate key update user_id = user_id", userID, courseID).Error
}

// GetCurrentOrNextLectureForCourse Gets the next lecture for a course or the lecture that is currently live. Error otherwise.
Expand Down
105 changes: 105 additions & 0 deletions dao/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package dao
import (
"context"
"fmt"
"log/slog"
"strconv"
"strings"
"time"

"gorm.io/gorm/clause"

"github.com/TUM-Dev/gocast/model"
uuid "github.com/satori/go.uuid"
"gorm.io/gorm"
)

Expand All @@ -32,6 +35,7 @@ type StreamsDao interface {
GetCurrentLiveNonHidden(ctx context.Context) (currentLive []model.Stream, err error)
GetLiveStreamsInLectureHall(lectureHallId uint) ([]model.Stream, error)
GetStreamsWithWatchState(courseID uint, userID uint) (streams []model.Stream, err error)
GetSoonStartingStreamInfo(user *model.User, slug string, year int, term string) (string, string, error)

SetLectureHall(streamIDs []uint, lectureHallID uint) error
UnsetLectureHall(streamIDs []uint) error
Expand Down Expand Up @@ -305,6 +309,107 @@ func (d streamsDao) GetStreamsWithWatchState(courseID uint, userID uint) (stream
return
}

// GetSoonStartingStreamInfo returns the stream key, course slug and course name of an upcoming stream.
func (d streamsDao) GetSoonStartingStreamInfo(user *model.User, slug string, year int, term string) (string, string, error) {
var result struct {
CourseID uint
StreamKey string
ID string
Slug string
}
now := time.Now()
query := DB.Table("streams").
Select("streams.course_id, streams.stream_key, streams.id, courses.slug").
Joins("JOIN course_admins ON course_admins.course_id = streams.course_id").
Joins("JOIN courses ON courses.id = course_admins.course_id").
Where("courses.slug != 'TESTCOURSE' AND streams.deleted_at IS NULL AND courses.deleted_at IS NULL AND course_admins.user_id = ? AND (streams.start <= ? AND streams.end >= ?)", user.ID, now.Add(15*time.Minute), now). // Streams starting in the next 15 minutes or currently running
Or("courses.slug != 'TESTCOURSE' AND streams.deleted_at IS NULL AND courses.deleted_at IS NULL AND course_admins.user_id = ? AND (streams.end >= ? AND streams.end <= ?)", user.ID, now.Add(-15*time.Minute), now). // Streams that just finished in the last 15 minutes
Order("streams.start ASC")

if slug != "" {
query = query.Where("courses.slug = ?", slug)
}
if year != 0 {
query = query.Where("courses.year = ?", year)
}
if term != "" {
query = query.Where("courses.teaching_term = ?", term)
}

err := query.Limit(1).Scan(&result).Error
if err == gorm.ErrRecordNotFound || result.StreamKey == "" || result.ID == "" || result.Slug == "" {
stream, course, err := d.CreateOrGetTestStreamAndCourse(user)
if err != nil {
return "", "", err
}
return stream.StreamKey, fmt.Sprintf("%s-%d", course.Slug, stream.ID), nil
}
if err != nil {
logger.Error("Error getting soon starting stream: %v", slog.String("err", err.Error()))
return "", "", err
}

return result.StreamKey, fmt.Sprintf("%s-%s", result.Slug, result.ID), nil
}

// Helper method to fetch test stream and course for current user.
func (d streamsDao) CreateOrGetTestStreamAndCourse(user *model.User) (model.Stream, model.Course, error) {
course, err := d.CreateOrGetTestCourse(user)
if err != nil {
return model.Stream{}, model.Course{}, err
}

var stream model.Stream
err = DB.FirstOrCreate(&stream, model.Stream{
CourseID: course.ID,
Name: "Test Stream",
Description: "This is a test stream",
LectureHallID: 0,
}).Error
if err != nil {
return model.Stream{}, model.Course{}, err
}

stream.Start = time.Now().Add(5 * time.Minute)
stream.End = time.Now().Add(1 * time.Hour)
stream.LiveNow = true
stream.Recording = true
stream.LiveNowTimestamp = time.Now().Add(5 * time.Minute)
stream.Private = true
streamKey := uuid.NewV4().String()
stream.StreamKey = strings.ReplaceAll(streamKey, "-", "")
stream.LectureHallID = 1
err = DB.Save(&stream).Error
if err != nil {
return model.Stream{}, model.Course{}, err
}

return stream, course, err
}

// Helper method to fetch test course for current user.
func (d streamsDao) CreateOrGetTestCourse(user *model.User) (model.Course, error) {
var course model.Course
err := DB.FirstOrCreate(&course, model.Course{
Name: "(" + strconv.Itoa(int(user.ID)) + ") " + user.Name + "'s Test Course",
TeachingTerm: "Test",
Slug: "TESTCOURSE",
Year: 1234,
Visibility: "hidden",
VODEnabled: false, // TODO: Change to VODEnabled: true for default testcourse if necessary
}).Error
if err != nil {
return model.Course{}, err
}

err = CoursesDao.AddAdminToCourse(NewDaoWrapper().CoursesDao, user.ID, course.ID)
if err != nil {
return model.Course{}, err
}

return course, nil
}

// SetLectureHall set lecture-halls of streamIds to lectureHallID
func (d streamsDao) SetLectureHall(streamIDs []uint, lectureHallID uint) error {
return DB.Model(&model.Stream{}).Where("id IN ?", streamIDs).Update("lecture_hall_id", lectureHallID).Error
Expand Down
29 changes: 25 additions & 4 deletions dao/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ type TokenDao interface {
AddToken(token model.Token) error

GetToken(token string) (model.Token, error)
GetAllTokens() ([]AllTokensDto, error)
GetTokenByID(id string) (model.Token, error)
GetAllTokens(user *model.User) ([]AllTokensDto, error)

TokenUsed(token model.Token) error

Expand All @@ -40,11 +41,31 @@ func (d tokenDao) GetToken(token string) (model.Token, error) {
return t, err
}

func (d tokenDao) GetTokenByID(id string) (model.Token, error) {
var t model.Token
err := DB.Model(&t).Where("id = ?", id).First(&t).Error
return t, err
}

// GetAllTokens returns all tokens and the corresponding users name, email and lrz id
func (d tokenDao) GetAllTokens() ([]AllTokensDto, error) {
func (d tokenDao) GetAllTokens(user *model.User) ([]AllTokensDto, error) {
var tokens []AllTokensDto
err := DB.Raw("SELECT tokens.*, u.name as user_name, u.email as user_email, u.lrz_id as user_lrz_id FROM tokens JOIN users u ON u.id = tokens.user_id WHERE tokens.deleted_at IS null").Scan(&tokens).Error
return tokens, err

query := DB.Table("tokens").
Select("tokens.*, u.name as user_name, u.email as user_email, u.lrz_id as user_lrz_id").
Joins("JOIN users u ON u.id = tokens.user_id").
Where("tokens.deleted_at IS NULL")

if user.Role != model.AdminType {
query = query.Where("tokens.user_id = ?", user.ID)
}

err := query.Scan(&tokens).Error
if err != nil {
return nil, err
}

return tokens, nil
}

// TokenUsed is called when a token is used. It sets the last_used field to the current time.
Expand Down
Loading

0 comments on commit 8b3a059

Please sign in to comment.