Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only one streaming key for each course #1323

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .idea/TUM-Live-Backend.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion api/courseimport.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (r lectureHallRoutes) postSchedule(c *gin.Context) {
Streams: nil,
Users: nil,
Token: token,
StreamKey: strings.ReplaceAll(uuid.NewV4().String(), "-", "")[:15],
}

var streams []model.Stream
Expand All @@ -98,12 +99,13 @@ func (r lectureHallRoutes) postSchedule(c *gin.Context) {
if err == nil {
eventID = uint(eventIDInt)
}

streams = append(streams, model.Stream{
Start: event.Start,
End: event.End,
RoomName: event.RoomName,
LectureHallID: lectureHall.ID,
StreamKey: strings.ReplaceAll(uuid.NewV4().String(), "-", "")[:15],
StreamKey: course.StreamKey,
TUMOnlineEventID: eventID,
})
}
Expand Down
36 changes: 33 additions & 3 deletions api/courses.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func configGinCourseRouter(router *gin.Engine, daoWrapper dao.DaoWrapper) {
courses.PUT("/updateDescription/:streamID", routes.updateDescription)
courses.DELETE("/deleteLectureSeries/:streamID", routes.deleteLectureSeries)
courses.POST("/submitCut", routes.submitCut)
courses.POST("/regenerateKey", routes.regenerateCourseKey)

courses.POST("/addUnit", routes.addUnit)
courses.POST("/deleteUnit/:unitID", routes.deleteUnit)
Expand Down Expand Up @@ -1234,8 +1235,13 @@ func (r coursesRoutes) createLecture(c *gin.Context) {
for _, date := range req.DateSeries {
endTime := date.Add(time.Minute * time.Duration(req.Duration))

streamKey := uuid.NewV4().String()
streamKey = strings.ReplaceAll(streamKey, "-", "")
// When defined use course wide stream key
var streamKey string
if tumLiveContext.Course.StreamKey == "" {
streamKey = strings.ReplaceAll(uuid.NewV4().String(), "-", "")
} else {
streamKey = tumLiveContext.Course.StreamKey
}

lecture := model.Stream{
Name: req.Title,
Expand Down Expand Up @@ -1391,6 +1397,7 @@ func (r coursesRoutes) createCourse(c *gin.Context) {
ChatEnabled: req.EnChat,
Visibility: req.Access,
Streams: []model.Stream{},
StreamKey: strings.ReplaceAll(uuid.NewV4().String(), "-", ""),
}
if tumLiveContext.User.Role != model.AdminType {
course.Admins = []model.User{*tumLiveContext.User}
Expand Down Expand Up @@ -1481,8 +1488,31 @@ func (r coursesRoutes) deleteCourse(c *gin.Context) {
dao.Cache.Clear()
}

// regenerateCourseKey updates the stream key of the course and updates the key of all the streams
func (r coursesRoutes) regenerateCourseKey(c *gin.Context) {
ctx := c.MustGet("TUMLiveContext").(tools.TUMLiveContext)
course := *ctx.Course

course.StreamKey = strings.ReplaceAll(uuid.NewV4().String(), "-", "")
err := r.DaoWrapper.CoursesDao.UpdateCourse(c, course)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "could not update course")
return
}

for _, s := range course.Streams {
s.StreamKey = course.StreamKey

err := r.DaoWrapper.StreamsDao.UpdateStream(s)
// Log error but continue on remaining streams
if err != nil {
logger.Error("could not modify stream key ", err)
}
}
}

type createCourseRequest struct {
Access string // enrolled, public, hidden or loggedin
Access string // enrolled, public, hidden or logged in
CourseID string
EnChat bool
EnDL bool
Expand Down
73 changes: 73 additions & 0 deletions api/runner_grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package api

import (
"context"
"database/sql"

Check failure on line 5 in api/runner_grpc.go

View workflow job for this annotation

GitHub Actions / lint (./worker/edge)

File is not `gofumpt`-ed (gofumpt)
"github.com/TUM-Dev/gocast/dao"
"github.com/TUM-Dev/gocast/model"
log "github.com/sirupsen/logrus"
"github.com/tum-dev/gocast/runner/protobuf"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
"net"

Check failure on line 13 in api/runner_grpc.go

View workflow job for this annotation

GitHub Actions / lint (./worker/edge)

File is not `gofumpt`-ed (gofumpt)
"time"
)

var _ protobuf.FromRunnerServer = (*GrpcRunnerServer)(nil)

type GrpcRunnerServer struct {
protobuf.UnimplementedFromRunnerServer

dao.DaoWrapper
}

func (g GrpcRunnerServer) Register(ctx context.Context, request *protobuf.RegisterRequest) (*protobuf.RegisterResponse, error) {
runner := model.Runner{
Hostname: request.Hostname,
Port: int(request.Port),
LastSeen: sql.NullTime{Valid: true, Time: time.Now()},
}
err := g.RunnerDao.Create(ctx, &runner)
if err != nil {
return nil, err
}
return &protobuf.RegisterResponse{}, nil
}

func (g GrpcRunnerServer) Heartbeat(ctx context.Context, request *protobuf.HeartbeatRequest) (*protobuf.HeartbeatResponse, error) {
//TODO implement me

Check failure on line 39 in api/runner_grpc.go

View workflow job for this annotation

GitHub Actions / lint (./worker/edge)

commentFormatting: put a space between `//` and comment text (gocritic)
panic("implement me")
}

func (g GrpcRunnerServer) RequestSelfStream(ctx context.Context, request *protobuf.SelfStreamRequest) (*protobuf.SelfStreamResponse, error) {
//TODO implement me

Check failure on line 44 in api/runner_grpc.go

View workflow job for this annotation

GitHub Actions / lint (./worker/edge)

commentFormatting: put a space between `//` and comment text (gocritic)
panic("implement me")
}

func (g GrpcRunnerServer) mustEmbedUnimplementedFromRunnerServer() {

Check failure on line 48 in api/runner_grpc.go

View workflow job for this annotation

GitHub Actions / lint (./worker/edge)

func `GrpcRunnerServer.mustEmbedUnimplementedFromRunnerServer` is unused (unused)
//TODO implement me

Check failure on line 49 in api/runner_grpc.go

View workflow job for this annotation

GitHub Actions / lint (./worker/edge)

commentFormatting: put a space between `//` and comment text (gocritic)
panic("implement me")
}

func StartGrpcRunnerServer() {
lis, err := net.Listen("tcp", ":50056")
if err != nil {
log.WithError(err).Error("Failed to init grpc server")
return
}
grpcServer := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: time.Minute,
MaxConnectionAge: time.Minute * 5,
MaxConnectionAgeGrace: time.Second * 5,
Time: time.Minute * 10,
Timeout: time.Second * 20,
}))
protobuf.RegisterFromRunnerServer(grpcServer, &GrpcRunnerServer{DaoWrapper: dao.NewDaoWrapper()})
reflection.Register(grpcServer)
go func() {
if err = grpcServer.Serve(lis); err != nil {
log.WithError(err).Errorf("Can't serve grpc")
}
}()
}
29 changes: 29 additions & 0 deletions api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func configGinStreamRestRouter(router *gin.Engine, daoWrapper dao.DaoWrapper) {
streamById.GET("/subtitles/:lang", routes.getSubtitles)

streamById.GET("/playlist", routes.getStreamPlaylist)
streamById.POST("/regenerateKey", routes.regenerateKey)
streamById.POST("/restoreKey", routes.restoreKey)

thumbs := streamById.Group("/thumbs")
{
Expand Down Expand Up @@ -859,6 +861,33 @@ func (r streamRoutes) updateStreamVisibility(c *gin.Context) {
}
}

// regenerateKey regenerates the key for a stream.
func (r streamRoutes) regenerateKey(c *gin.Context) {
ctx := c.MustGet("TUMLiveContext").(tools.TUMLiveContext)
stream := *ctx.Stream

stream.StreamKey = strings.ReplaceAll(uuid.NewV4().String(), "-", "")
err := r.DaoWrapper.StreamsDao.UpdateStream(stream)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "could not update stream")
return
}
}

// restoreKey restores the key for a stream to the course key
func (r streamRoutes) restoreKey(c *gin.Context) {
ctx := c.MustGet("TUMLiveContext").(tools.TUMLiveContext)
stream := *ctx.Stream
course := *ctx.Course

stream.StreamKey = course.StreamKey
err := r.DaoWrapper.StreamsDao.UpdateStream(stream)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "could not update stream")
return
}
}

func (r streamRoutes) updateChatEnabled(c *gin.Context) {
stream, err := r.StreamsDao.GetStreamByID(context.Background(), c.Param("streamID"))
if err != nil {
Expand Down
14 changes: 6 additions & 8 deletions api/worker_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,22 +127,20 @@
if request.StreamKey == "" {
return nil, errors.New("stream key empty")
}
stream, err := s.StreamsDao.GetStreamByKey(ctx, request.StreamKey)
stream, err := s.StreamsDao.GetStreamByKeyAndTime(ctx, request.StreamKey, time.Now())
if err != nil {
return nil, err
}
logger.Info("Got stream: %+v\n", stream)

Check failure on line 134 in api/worker_grpc.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

slog.Logger.Info arg "stream" should be a string or a slog.Attr (possible missing key or value)

Check failure on line 134 in api/worker_grpc.go

View workflow job for this annotation

GitHub Actions / lint (./worker/edge)

slog: slog.Logger.Info arg "stream" should be a string or a slog.Attr (possible missing key or value) (govet)
course, err := s.DaoWrapper.CoursesDao.GetCourseById(ctx, stream.CourseID)
if err != nil {
return nil, err
}
if request.CourseSlug != fmt.Sprintf("%s-%d", course.Slug, stream.ID) {
return nil, fmt.Errorf("bad stream name, should: %s, is: %s", fmt.Sprintf("%s-%d", course.Slug, stream.ID), request.CourseSlug)
}
// reject streams that are more than 30 minutes in the future or more than 30 minutes past
if !(time.Now().After(stream.Start.Add(time.Minute*-30)) && time.Now().Before(stream.End.Add(time.Minute*30))) {
logger.Warn("Stream rejected, time out of bounds", "streamId", stream.ID)
return nil, errors.New("stream rejected")

if request.CourseSlug != course.Slug {
return nil, fmt.Errorf("bad stream name, should: %s, is: %s", course.Slug, request.CourseSlug)
}

ingestServer, err := s.DaoWrapper.IngestServerDao.GetBestIngestServer()
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions cmd/tumlive/tumlive.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
var initializers = []initializer{
tools.LoadConfig,
api.ServeWorkerGRPC,
api.StartGrpcRunnerServer,
tools.InitBranding,
}

Expand Down Expand Up @@ -189,6 +190,7 @@ func main() {
&model.Subtitles{},
&model.TranscodingFailure{},
&model.Email{},
&model.Runner{},
)
if err != nil {
sentry.CaptureException(err)
Expand Down
2 changes: 2 additions & 0 deletions dao/dao_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type DaoWrapper struct {
SubtitlesDao
TranscodingFailureDao
EmailDao
RunnerDao RunnerDao
}

func NewDaoWrapper() DaoWrapper {
Expand Down Expand Up @@ -63,5 +64,6 @@ func NewDaoWrapper() DaoWrapper {
SubtitlesDao: NewSubtitlesDao(),
TranscodingFailureDao: NewTranscodingFailureDao(),
EmailDao: NewEmailDao(),
RunnerDao: NewRunnerDao(),
}
}
47 changes: 47 additions & 0 deletions dao/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package dao

import (
"context"

Check failure on line 4 in dao/runner.go

View workflow job for this annotation

GitHub Actions / lint (./worker/edge)

File is not `gofumpt`-ed (gofumpt)
"github.com/TUM-Dev/gocast/model"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

//go:generate mockgen -source=runner.go -destination ../mock_dao/runner.go

type RunnerDao interface {
// Get Runner by hostname
Get(context.Context, string) (model.Runner, error)

// Create a new Runner for the database
Create(context.Context, *model.Runner) error

// Delete a Runner by hostname.
Delete(context.Context, string) error
}

type runnerDao struct {
db *gorm.DB
}

func NewRunnerDao() RunnerDao {
return runnerDao{db: DB}
}

// Get a Runner by id.
func (d runnerDao) Get(c context.Context, hostname string) (res model.Runner, err error) {
return res, DB.WithContext(c).First(&res, "hostname = ?", hostname).Error
}

// Create a Runner.
func (d runnerDao) Create(c context.Context, it *model.Runner) error {
return DB.WithContext(c).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "hostname"}},
UpdateAll: true,
}).Create(&it).Error
}

// Delete a Runner by hostname.
func (d runnerDao) Delete(c context.Context, hostname string) error {
return DB.WithContext(c).Delete(&model.Runner{}, hostname).Error
}
24 changes: 23 additions & 1 deletion dao/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ type StreamsDao interface {

GetDueStreamsForWorkers() []model.Stream
GetDuePremieresForWorkers() []model.Stream
GetStreamByKey(ctx context.Context, key string) (stream model.Stream, err error)
GetStreamByKey(ctx context.Context, key string) (stream model.Stream, err error) // deprecated
GetStreamByKeyAndTime(ctx context.Context, key string, time2 time.Time) (stream model.Stream, err error)
GetUnitByID(id string) (model.StreamUnit, error)
GetStreamByTumOnlineID(ctx context.Context, id uint) (stream model.Stream, err error)
GetStreamsByIds(ids []uint) ([]model.Stream, error)
Expand Down Expand Up @@ -129,12 +130,32 @@ func (d streamsDao) GetDuePremieresForWorkers() []model.Stream {
return res
}

// Deprecated: Stream keys are generally no longer unique to
// each stream. Use GetStreamByKeyAndTime instead
func (d streamsDao) GetStreamByKey(ctx context.Context, key string) (stream model.Stream, err error) {
var res model.Stream
err = DB.First(&res, "stream_key = ?", key).Error
return res, err
}

// GetStreamByKeyAndTime finds stream by key and time t.
// t must be between stream start and end
func (d streamsDao) GetStreamByKeyAndTime(ctx context.Context, key string, t time.Time) (stream model.Stream, err error) {
// first get all streams by key
var streams []model.Stream
result := DB.Where("stream_key = ?", key).Find(&streams)
if result.Error != nil {
return model.Stream{}, err
}
// find stream that encompasses the given time with 30 minutes of padding before and after
for _, s := range streams {
if t.After(stream.Start.Add(time.Minute*-30)) && t.Before(stream.End.Add(time.Minute*30)) {
return s, err
}
}
return model.Stream{}, fmt.Errorf("no stream at %s", t.String())
}

func (d streamsDao) GetUnitByID(id string) (model.StreamUnit, error) {
var unit model.StreamUnit
err := DB.First(&unit, "id = ?", id).Error
Expand Down Expand Up @@ -323,6 +344,7 @@ func (d streamsDao) UpdateStream(stream model.Stream) error {
"start": stream.Start,
"end": stream.End,
"chat_enabled": stream.ChatEnabled,
"stream_key": stream.StreamKey,
}).Error
return err
}
Expand Down
3 changes: 2 additions & 1 deletion docs/static/tum-live-starter.sql
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ CREATE TABLE `courses` (
`vod_chat_enabled` tinyint(1) DEFAULT NULL,
`visibility` varchar(191) DEFAULT 'loggedin',
`token` longtext DEFAULT NULL,
`streamKey` longtext,
`user_created_by_token` tinyint(1) DEFAULT 0,
`camera_preset_preferences` longtext DEFAULT NULL,
PRIMARY KEY (`id`),
Expand All @@ -257,7 +258,7 @@ CREATE TABLE `courses` (

LOCK TABLES `courses` WRITE;
/*!40000 ALTER TABLE `courses` DISABLE KEYS */;
INSERT INTO `courses` VALUES (1,'2022-04-18 13:40:05.843','2022-04-18 13:46:46.546',NULL,1,'Einführung Brauereiwesen','brauereiwesen',2022,'S','',1,1,1,0,1,0,0,'public','',0,''),(2,'2022-04-18 13:40:54.686','2022-04-18 13:40:54.698',NULL,1,'Spieleentwicklung für Dummies','games101',2022,'S','',1,1,1,0,1,0,0,'loggedin','',0,''),(3,'2022-04-18 13:41:55.741','2022-04-18 13:41:55.754',NULL,1,'Praktikum: Golang','godev',2021,'W','',1,1,1,0,1,0,0,'public','',0,'');
INSERT INTO `courses` VALUES (1,'2022-04-18 13:40:05.843','2022-04-18 13:46:46.546',NULL,1,'Einführung Brauereiwesen','brauereiwesen',2022,'S','',1,1,1,0,1,0,0,'public','', 'ba09dd459e50476da90864fecfa7ae14',0,''),(2,'2022-04-18 13:40:54.686','2022-04-18 13:40:54.698',NULL,1,'Spieleentwicklung für Dummies','games101',2022,'S','',1,1,1,0,1,0,0,'loggedin','','6fe65fe1be4946b68983db45beb7d28f',0,''),(3,'2022-04-18 13:41:55.741','2022-04-18 13:41:55.754',NULL,1,'Praktikum: Golang','godev',2021,'W','',1,1,1,0,1,0,0,'public','','48011344a82249baad57f1a7b17f28ec',0,'');
/*!40000 ALTER TABLE `courses` ENABLE KEYS */;
UNLOCK TABLES;

Expand Down
Loading
Loading