diff --git a/api/action.go b/api/action.go
index 129863879..1468341b9 100644
--- a/api/action.go
+++ b/api/action.go
@@ -16,6 +16,7 @@ func configActionRouter(r *gin.Engine, wrapper dao.DaoWrapper) {
 	routes := actionRoutes{dao: wrapper.ActionDao}
 
 	g.GET("/failed", routes.getFailedActions)
+	g.GET("/:id", routes.getActionById)
 }
 
 type actionRoutes struct {
@@ -37,3 +38,18 @@ func (a actionRoutes) getFailedActions(c *gin.Context) {
 	res := make([]gin.H, len(models))
 	c.JSON(http.StatusOK, res)
 }
+
+func (a actionRoutes) getActionById(c *gin.Context) {
+	log.Info("Getting action by id: ", c.Param("id"))
+	ctx := context.Background()
+	model, err := a.dao.GetActionByID(ctx, c.Param("id"))
+	if err != nil {
+		_ = c.Error(tools.RequestError{
+			Status:        http.StatusNotFound,
+			CustomMessage: "Action not found",
+			Err:           err,
+		})
+		return
+	}
+	c.JSON(http.StatusOK, model)
+}
diff --git a/api/runner_grpc.go b/api/runner_grpc.go
index cec04c75b..e27942572 100644
--- a/api/runner_grpc.go
+++ b/api/runner_grpc.go
@@ -34,18 +34,21 @@ type GrpcRunnerServer struct {
 
 //
 func (g GrpcRunnerServer) Register(ctx context.Context, request *protobuf.RegisterRequest) (*protobuf.RegisterResponse, error) {
-	runner := model.Runner{
-		Hostname: request.Hostname,
-		Port:     int(request.Port),
-		LastSeen: time.Now(),
-		Status:   "Alive",
-		Workload: 0,
+	runner, err := g.RunnerDao.Get(ctx, request.Hostname)
+	if runner == nil || runner.Hostname == "" {
+		runner = &model.Runner{
+			Hostname: request.Hostname,
+			Port:     int(request.Port),
+			LastSeen: time.Now(),
+			Status:   "Alive",
+			Workload: 0,
+		}
 	}
-	err := g.RunnerDao.Create(ctx, &runner)
+	err = g.RunnerDao.Create(ctx, runner)
 	if err != nil {
 		return nil, err
 	}
-	return &protobuf.RegisterResponse{}, nil
+	return &protobuf.RegisterResponse{ID: runner.Hostname}, nil
 }
 
 func (g GrpcRunnerServer) Heartbeat(ctx context.Context, request *protobuf.HeartbeatRequest) (*protobuf.HeartbeatResponse, error) {
@@ -71,6 +74,7 @@ func (g GrpcRunnerServer) Heartbeat(ctx context.Context, request *protobuf.Heart
 		Disk:    request.Disk,
 		Uptime:  request.Uptime,
 		Version: request.Version,
+		Actions: request.CurrentAction,
 	}
 	ctx = context.WithValue(ctx, "newStats", newStats)
 	log.Info("Updating runner stats ", "runner", r)
@@ -103,26 +107,32 @@ func StreamRequest(ctx context.Context, dao dao.DaoWrapper, runner model.Runner)
 		logger.Error("No source", "source", source)
 		return
 	}
-	/*server, err := dao.IngestServerDao.GetBestIngestServer()
-	if err != nil {
-		logger.Error("can't find ingest server", "err", err)
-		return
-	}*/
-	//var slot model.StreamName
-	/*if version == "COMB" { //try to find a transcoding slot for comb view:
-		slot, err = dao.IngestServerDao.GetTranscodedStreamSlot(server.ID)
-	}
-	if version != "COMB" || err != nil {
-		slot, err = dao.IngestServerDao.GetStreamSlot(server.ID)
+	//TODO: Implement environment variable for ingest
+	ingest := false
+	if ingest {
+		server, err := dao.IngestServerDao.GetBestIngestServer()
 		if err != nil {
-			logger.Error("No free stream slot", "err", err)
+			logger.Error("can't find ingest server", "err", err)
 			return
 		}
-	}*/
+
+		var slot model.StreamName
+		if version == "COMB" { //try to find a transcoding slot for comb view:
+			slot, err = dao.IngestServerDao.GetTranscodedStreamSlot(server.ID)
+		}
+		if version != "COMB" || err != nil {
+			slot, err = dao.IngestServerDao.GetStreamSlot(server.ID)
+			if err != nil {
+				logger.Error("No free stream slot", "err", err)
+				return
+			}
+		}
+		slot.StreamID = stream.ID
+		dao.IngestServerDao.SaveSlot(slot)
+	}
+
 	src := "rtsp://" + source
-	//slot.StreamID = stream.ID
-	//dao.IngestServerDao.SaveSlot(slot)
 
 	req := protobuf.StreamRequest{
 		ActionID: actionID,
 		Stream:   uint64(stream.ID),
@@ -259,18 +269,17 @@ func (g GrpcRunnerServer) RequestSelfStream(ctx context.Context, request *protob
 	}, nil
 }
 
-func (g GrpcRunnerServer) NotifyStreamEnd(ctx context.Context, request *protobuf.StreamEndRequest) (*protobuf.StreamEndResponse, error) {
+func (g GrpcRunnerServer) NotifyStreamEnded(ctx context.Context, request *protobuf.StreamEnded) (*protobuf.Status, error) {
 	//TODO Test me
-	stream, err := g.StreamsDao.GetStreamByID(ctx, fmt.Sprintf("%v", request.ActionID))
+	stream, err := g.StreamsDao.GetStreamByID(ctx, fmt.Sprintf("%v", request.StreamID))
 	if err != nil {
-		return nil, err
+		return &protobuf.Status{Ok: false}, err
 	}
 	err = g.StreamsDao.SaveEndedState(stream.ID, true)
 	if err != nil {
-		return nil, err
+		return &protobuf.Status{Ok: false}, err
 	}
-	return &protobuf.StreamEndResponse{}, nil
-
+	return &protobuf.Status{Ok: true}, nil
 }
 
 func (g GrpcRunnerServer) NotifyStreamStarted(ctx context.Context, request *protobuf.StreamStarted) (*protobuf.Status, error) {
@@ -331,6 +340,8 @@ func (g GrpcRunnerServer) NotifyStreamStarted(ctx context.Context, request *prot
 		request.HLSUrl = strings.ReplaceAll(request.HLSUrl, "?dvr", "")
 	}
 
+	logger.Info("hls url", "url", request.HLSUrl)
+
 	switch request.Version {
 	case "CAM":
 		g.StreamsDao.SaveCAMURL(&stream, request.HLSUrl)
@@ -409,10 +420,9 @@ func SetTranscodeFinished(ctx context.Context, req *protobuf.ActionFinished) (*p
 
 func NotifyForStreams(dao dao.DaoWrapper) func() {
 	return func() {
-		logger.Info("Notifying runners")
+		logger.Info("Collecting due streams")
 		streams := dao.StreamsDao.GetDueStreamsForWorkers()
-		logger.Info("incoming stream count", "count", len(streams))
 
 		for i := range streams {
 			err := dao.StreamsDao.SaveEndedState(streams[i].ID, false)
 			if err != nil {
@@ -445,7 +455,7 @@ func NotifyForStreams(dao dao.DaoWrapper) func() {
 				values["source"] = lectureHallForStream.PresIP
 				err = CreateJob(dao, ctx, values) //presentation
 				if err != nil {
-					logger.Error("Can't create job", err)
+					log.Error("Can't create job", err)
 				}
 				break
 			case 2: //camera
@@ -489,33 +499,33 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() {
 		logger.Info("Assigning runners to action")
 		ctx := context.Background()
 
-		//Running normal jobs with the idea that they are working as they should
-		jobs, err := dao.JobDao.GetAllOpenJobs(ctx)
+		//checking for each running action if the runner is still doing the job or if it is dead
+		activeAction, err := dao.ActionDao.GetRunningActions(ctx)
 		if err != nil {
-			logger.Error("Can't get jobs", err)
-			return
+			logger.Error("Can't get running actions", err)
 		}
-		for _, job := range jobs {
-			action, err := job.GetNextAction()
-			if err != nil {
-				logger.Error("Can't get next action", err)
+		for _, action := range activeAction {
+			if action.End.Before(time.Now().Add(5 * time.Minute)) {
+				action.SetToIgnored()
+				log.Info("Action ignored, check for progress manually", "action", action.ID)
 				continue
 			}
-			err = AssignRunnerAction(dao, action)
+			runner, err := action.GetCurrentRunner()
 			if err != nil {
-				logger.Error("Can't assign runner to action", err)
+				logger.Error("Can't get current runner", err)
+				action.SetToFailed()
+				err = dao.ActionDao.UpdateAction(ctx, &action)
+				if err != nil {
+					return
+				}
 				continue
 			}
-		}
-		//checking for each running action if the runner is still doing the job or if it is dead
-		activeAction, err := dao.ActionDao.GetRunningActions(ctx)
-		if err != nil {
-			logger.Error("Can't get running actions", err)
-		}
-		for _, action := range activeAction {
-			runner := action.GetCurrentRunner()
 			if !runner.IsAlive() && !action.IsCompleted() {
 				action.SetToFailed()
+				err = dao.ActionDao.UpdateAction(ctx, &action)
+				if err != nil {
+					return
+				}
 			}
 		}
 
@@ -525,12 +535,45 @@
 			return
 		}
 		for _, failedAction := range failedActions {
-			failedAction.SetToRestarted()
+			failedAction.SetToRunning()
 			err = AssignRunnerAction(dao, &failedAction)
+			err = dao.ActionDao.UpdateAction(ctx, &failedAction)
+			if err != nil {
+				return
+			}
+			if err != nil {
+				logger.Error("Can't assign runner to action", err)
+			}
+		}
+
+		//Running normal jobs with the idea that they are working as they should
+		jobs, err := dao.JobDao.GetAllOpenJobs(ctx)
+		if err != nil {
+			logger.Error("Can't get jobs", err)
+			return
+		}
+		for _, job := range jobs {
+			action, err := job.GetNextAction()
+			if err != nil {
+				logger.Error("Can't get next action", err)
+				continue
+			}
+			if dao.JobDao.UpdateJob(ctx, job) != nil {
+				logger.Error("Can't update job", err)
+				continue
+			}
+			err = AssignRunnerAction(dao, action)
 			if err != nil {
 				logger.Error("Can't assign runner to action", err)
+				continue
+			}
+			action.SetToRunning()
+			err = dao.ActionDao.UpdateAction(ctx, action)
+			if err != nil {
+				return
 			}
 		}
+
 	}
 }
 
@@ -569,11 +612,12 @@ func AssignRunnerAction(dao dao.DaoWrapper, action *model.Action) error {
 		//TranscodingRequest(ctx, dao, runner)
 		break
 	}
-
+	action.SetToRunning()
 	return nil
 }
 
 func CreateJob(dao dao.DaoWrapper, ctx context.Context, values map[string]interface{}) error {
+	logger.Info("Creating Job", "values", values)
 	job := model.Job{
 		Start:     time.Now(),
 		Completed: false,
diff --git a/dao/Action.go b/dao/Action.go
index 4e957df98..27f7e9df3 100644
--- a/dao/Action.go
+++ b/dao/Action.go
@@ -17,6 +17,7 @@ type ActionDao interface {
 	GetRunningActions(ctx context.Context) ([]model.Action, error)
 	GetAll(ctx context.Context) ([]model.Action, error)
 	GetAllFailedActions(ctx context.Context) ([]model.Action, error)
+	UpdateAction(ctx context.Context, action *model.Action) error
 }
 
 type actionDao struct {
@@ -70,3 +71,7 @@ func (d actionDao) GetAllFailedActions(ctx context.Context) ([]model.Action, err
 	err := d.db.WithContext(ctx).Find(&actions, "status = ?", 2).Error
 	return actions, err
 }
+
+func (d actionDao) UpdateAction(ctx context.Context, action *model.Action) error {
+	return d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", action.ID).Updates(action).Error
+}
diff --git a/dao/Jobs.go b/dao/Jobs.go
index 3252970f9..5b8d0fac0 100644
--- a/dao/Jobs.go
+++ b/dao/Jobs.go
@@ -15,6 +15,7 @@ type JobDao interface {
 	GetRunners(ctx context.Context, job model.Job) ([]*model.Runner, error)
 	RemoveAction(ctx context.Context, job model.Job, actionID uint32) error
 	GetAllOpenJobs(ctx context.Context) ([]model.Job, error)
+	UpdateJob(ctx context.Context, job model.Job) error
 }
 
 type jobDao struct {
@@ -53,3 +54,7 @@ func (j jobDao) GetAllOpenJobs(ctx context.Context) ([]model.Job, error) {
 	err := j.db.WithContext(ctx).Model(&model.Job{}).Preload("Actions").Find(&jobs).Where("completed = ?", false).Error
 	return jobs, err
 }
+
+func (j jobDao) UpdateJob(ctx context.Context, job model.Job) error {
+	return j.db.WithContext(ctx).Model(&job).Updates(&job).Error
+}
diff --git a/dao/runner.go b/dao/runner.go
index 529aad78a..bc7baa6a0 100644
--- a/dao/runner.go
+++ b/dao/runner.go
@@ -12,7 +12,7 @@ import (
 
 type RunnerDao interface {
 	// Get Runner by hostname
-	Get(context.Context, string) (model.Runner, error)
+	Get(context.Context, string) (*model.Runner, error)
 
 	// Get all Runners in an array
 	GetAll(context.Context) ([]model.Runner, error)
@@ -33,7 +33,7 @@ func NewRunnerDao() RunnerDao {
 }
 
 // Get a Runner by id.
-func (d runnerDao) Get(c context.Context, hostname string) (res model.Runner, err error) {
+func (d runnerDao) Get(c context.Context, hostname string) (res *model.Runner, err error) {
 	return res, DB.WithContext(c).First(&res, "hostname = ?", hostname).Error
 }
 
diff --git a/model/action.go b/model/action.go
index 25a2aa662..9d065af0b 100644
--- a/model/action.go
+++ b/model/action.go
@@ -11,7 +11,7 @@ const (
 	running
 	failed
 	awaiting
-	restarted
+	ignored
 )
 
 type Action struct {
@@ -55,16 +55,19 @@ func (a *Action) SetToAwaiting() {
 	a.Status = awaiting
 }
 
-func (a *Action) SetToRestarted() {
-	a.Status = restarted
+func (a *Action) SetToIgnored() {
+	a.Status = ignored
 }
 
 func (a *Action) IsCompleted() bool {
 	return a.Status == completed
 }
 
-func (a *Action) GetCurrentRunner() Runner {
-	return a.AllRunners[len(a.AllRunners)-1]
+func (a *Action) GetCurrentRunner() (*Runner, error) {
+	if len(a.AllRunners) == 0 {
+		return nil, errors.New("no runner assigned")
+	}
+	return &a.AllRunners[len(a.AllRunners)-1], nil
 }
 
 func (a *Action) AssignRunner(runner Runner) {
diff --git a/model/job.go b/model/job.go
index d2c05c2be..3e9eccbce 100644
--- a/model/job.go
+++ b/model/job.go
@@ -38,8 +38,9 @@ func (j *Job) GetAllActions() ([]Action, error) {
 func (j *Job) GetNextAction() (*Action, error) {
 	if j.Actions == nil {
 		return nil, errors.New("no actions found")
-	} else if j.Actions[0].Status == completed {
-		return nil, errors.New("action already completed, not pushed")
+	} else if j.Actions[0].Status != awaiting {
+		j.Actions = j.Actions[1:]
+		return nil, errors.New("action not in awaiting, not pushed")
 	}
 	if len(j.Actions) == 0 {
 		j.Completed = true
diff --git a/model/runner.go b/model/runner.go
index e0187453e..ab592e7db 100644
--- a/model/runner.go
+++ b/model/runner.go
@@ -22,6 +22,7 @@ type Runner struct {
 	Disk     string
 	Uptime   string
 	Version  string
+	Actions  string
 }
 
 // BeforeCreate returns errors if hostnames and ports of workers are invalid.
@@ -47,6 +48,11 @@ func (r *Runner) UpdateStats(tx *gorm.DB, ctx context.Context) (bool, error) {
 }
 
 func (r *Runner) IsAlive() bool {
-	r.Alive = r.LastSeen.After(time.Now().Add(time.Minute * -1))
+	r.Alive = r.LastSeen.After(time.Now().Add(time.Minute * -2))
+	if r.Alive {
+		r.Status = "Alive"
+	} else {
+		r.Status = "Dead"
+	}
 	return r.Alive
 }
diff --git a/runner/ServerHandler.go b/runner/ServerHandler.go
index 0b479faf2..2055ca5e5 100644
--- a/runner/ServerHandler.go
+++ b/runner/ServerHandler.go
@@ -2,6 +2,7 @@ package runner
 
 import (
 	"context"
+	"fmt"
 	"github.com/tum-dev/gocast/runner/protobuf"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
@@ -76,6 +77,10 @@ func (r *Runner) ReadDiagnostics(retries int) {
 		r.ReadDiagnostics(retries - 1)
 		return
 	}
+	actions := ""
+	for id, _ := range r.activeActions {
+		actions += fmt.Sprintln(id + ",")
+	}
 
 	_, err = con.Heartbeat(context.Background(), &protobuf.HeartbeatRequest{
 		Hostname:      r.cfg.Hostname,
@@ -88,7 +93,7 @@ func (r *Runner) ReadDiagnostics(retries int) {
 		Disk:          disk,
 		Uptime:        uptime,
 		Version:       r.cfg.Version,
-		CurrentAction: "none",
+		CurrentAction: actions,
 	})
 	if err != nil {
 		r.log.Warn("Error sending the heartbeat", "error", err, "sleeping(s)", 5-retries)
diff --git a/runner/actions/stream.go b/runner/actions/stream.go
index f0a4da1fd..a46d60aaa 100644
--- a/runner/actions/stream.go
+++ b/runner/actions/stream.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"github.com/tum-dev/gocast/runner/protobuf"
 	"log/slog"
+	"net/url"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -34,6 +35,10 @@ func (a *ActionProvider) StreamAction() *Action {
 		if !ok {
 			return ctx, fmt.Errorf("%w: context doesn't contain hostname", ErrRequiredContextValNotFound)
 		}
+		edgeVM, ok := ctx.Value("Edge").(string)
+		if !ok {
+			return ctx, fmt.Errorf("%w: context doesn't contain edge", ErrRequiredContextValNotFound)
+		}
 		streamID, ok := ctx.Value("stream").(uint64)
 		if !ok {
 			return ctx, fmt.Errorf("%w: context doesn't contain stream", ErrRequiredContextValNotFound)
@@ -101,12 +106,24 @@ func (a *ActionProvider) StreamAction() *Action {
 			time.Sleep(5 * time.Second) // little backoff to prevent dossing source
 			continue
 		}
+
+		localhls := fmt.Sprintf("%v/%d/%d/%s/%s/playlist.m3u8", hostname, courseID, streamID, version, end.Format("15-04-05"))
+
+		edgeURL, err := url.Parse(fmt.Sprintf("http://%v:8089/%v",
+			edgeVM, localhls,
+		))
+		if err != nil {
+			return ctx, fmt.Errorf("%w: cannot create urlPath", err)
+		}
+		log.Info("streaming", "edgeURL", edgeURL.String())
+
 		resp := a.Server.NotifyStreamStarted(ctx, &protobuf.StreamStarted{
 			Hostname: hostname,
 			StreamID: uint32(streamID),
 			CourseID: uint32(courseID),
 			Version:  version,
-			HLSUrl:   fmt.Sprintf("http://localhost:8187/%d/%d/%s/%s/playlist.m3u8", courseID, streamID, version, end.Format("15-04-05")),
+			HLSUrl:   edgeURL.String(),
+			// fmt.Sprintf("http://%v:%v/%v:8187/%d/%d/%s/%s/playlist.m3u8", edgeVM, 8089, hostname, courseID, streamID, version, end.Format("15-04-05")), //edgeURL.String(),
 		})
 		if resp.Ok != true {
 			log.Warn("streamAction: NotifyStreamStarted failed")
diff --git a/runner/cmd.yaml b/runner/cmd.yaml
index d4e171cdc..5d2aa34e7 100644
--- a/runner/cmd.yaml
+++ b/runner/cmd.yaml
@@ -1,4 +1,4 @@
-stream: '-y -hide_banner -nostats %v -t %.0f -i %v -c:v copy -c:a copy -f mpegts %v -c:v libx264 -preset veryfast -tune zerolatency -maxrate 2500k -bufsize 3000k -g 60 -r 30 -x264-params keyint=60:scenecut=0 -c:a aac -ar 44100 -b:a 128k -f hls -hls_time 2 -hls_list_size 3600 -hls_playlist_type event -hls_flags append_list -hls_segment_filename %v/%%05d.ts %v'
+stream: '-y -hide_banner -loglevel quiet -nostats %v -t %.0f -i %v -c:v copy -c:a copy -f mpegts %v -c:v libx264 -preset veryfast -tune zerolatency -maxrate 2500k -bufsize 3000k -g 60 -r 30 -x264-params keyint=60:scenecut=0 -c:a aac -ar 44100 -b:a 128k -f hls -hls_time 2 -hls_list_size 3600 -hls_playlist_type event -hls_flags append_list -hls_segment_filename %v/%%05d.ts %v'
 SeparateAudioFast: "-i %v -vn -c:a copy %v"
 SeparateAudio: "-i %v -vn %v"
diff --git a/runner/handlers.go b/runner/handlers.go
index d76ceffb5..dc0833878 100644
--- a/runner/handlers.go
+++ b/runner/handlers.go
@@ -29,6 +29,8 @@ func (r *Runner) RequestStream(ctx context.Context, req *protobuf.StreamRequest)
 	ctx = contextFromStreamReq(req, ctx)
 	ctx = context.WithValue(ctx, "URL", "")
 	ctx = context.WithValue(ctx, "Hostname", r.cfg.Hostname)
+	ctx = context.WithValue(ctx, "Port", r.cfg.Port)
+	ctx = context.WithValue(ctx, "Edge", r.cfg.EdgeServer)
 	ctx = context.WithValue(ctx, "actionID", req.ActionID)
 	r.log.Info("stream request", "jobID", req.ActionID)
 	a := []*actions.Action{
diff --git a/runner/hls.go b/runner/hls.go
index ddb9ef584..bdce778ec 100644
--- a/runner/hls.go
+++ b/runner/hls.go
@@ -17,12 +17,10 @@ func NewHLSServer(LiveDir string, log *slog.Logger) *HLSServer {
 func (h *HLSServer) Start() error {
 	http.Handle("/", h)
 	h.log.Info("starting hls server", "port", 8187)
-	return http.ListenAndServe(":8187", nil)
+	return http.ListenAndServe(":8187", h)
 }
 
 func (h *HLSServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	w.Header().Set("Access-Control-Allow-Origin", "*")
-
-	h.log.Info("serving request", "path", r.URL.Path, "method", r.Method)
+	//w.Header().Set("Access-Control-Allow-Origin", "*")
 	h.fs.ServeHTTP(w, r)
 }
diff --git a/runner/runner.go b/runner/runner.go
index 5f6d6b6ac..a9dbc4f2e 100644
--- a/runner/runner.go
+++ b/runner/runner.go
@@ -28,6 +28,7 @@ type envConfig struct {
 	RecPath      string `env:"REC_PATH" envDefault:"storage/rec"`
 	GocastServer string `env:"GOCAST_SERVER" envDefault:"localhost:50056"`
 	Hostname     string `env:"REALHOST" envDefault:"localhost"`
+	EdgeServer   string `env:"EDGE_SERVER" envDefault:"localhost"`
 	Version      string `env:"VERSION" envDefault:"dev"`
 }
 
diff --git a/worker/edge/edge.go b/worker/edge/edge.go
index a29682d2d..fd1332819 100644
--- a/worker/edge/edge.go
+++ b/worker/edge/edge.go
@@ -34,14 +34,14 @@ var (
 	inflightLock = sync.Mutex{}
 	inflight     = make(map[string]*sync.Mutex)
 
-	allowedRe = regexp.MustCompile(`^/[a-zA-Z0-9]+/([a-zA-Z0-9_]+/)*[a-zA-Z0-9_]+\.(ts|m3u8)$`) // e.g. /vm123/live/stream/1234.ts
-	// allowedRe = regexp.MustCompile("^.*$") // e.g. /vm123/live/strean/1234.ts
+	allowedRe = regexp.MustCompile(`^/[a-zA-Z0-9:]+/([\w.\-_]+/)*[\w.\-_]+\.(ts|m3u8)$`) // e.g. /vm123/live/stream/1234.ts
+	// allowedRe = regexp.MustCompile("^.*$") // e.g. /vm123/live/stream/1234.ts
 )
 
 var port = ":8089"
 
 var (
-	originPort  = "8085"
+	originPort  = "8187"
 	originProto = "http://"
 )
 
@@ -303,8 +303,10 @@ func edgeHandler(writer http.ResponseWriter, request *http.Request) {
 		return
 	}
 
-	urlParts := strings.SplitN(request.URL.Path, "/", 3) // -> ["", "vm123", "live/stream/1234.ts"]
+	writer.Header().Set("Access-Control-Allow-Origin", allowedOrigin)
 
+	urlParts := strings.SplitN(request.URL.Path, "/", 3) // -> ["", "vm123", "live/stream/1234.ts"]
+	log.Println("requested:", urlParts[2])
 	// proxy m3u8 playlist
 	if strings.HasSuffix(request.URL.Path, ".m3u8") {
 		request.Host = urlParts[1]
@@ -328,6 +330,7 @@ func edgeHandler(writer http.ResponseWriter, request *http.Request) {
 		proxy.ServeHTTP(writer, request)
 		return
 	}
+
 	err := fetchFile(urlParts[1], urlParts[2])
 	if err != nil {
 		log.Printf("Could not fetch file: %v", err)