Skip to content

Commit

Permalink
adjusted the hls access from edge server and made the proxy work. adj…
Browse files Browse the repository at this point in the history
…usted the cmd.yaml for less clutter during hls stream
  • Loading branch information
DawinYurtseven committed Nov 12, 2024
1 parent aeaafdb commit 7e66a67
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 73 deletions.
16 changes: 16 additions & 0 deletions api/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func configActionRouter(r *gin.Engine, wrapper dao.DaoWrapper) {
routes := actionRoutes{dao: wrapper.ActionDao}

g.GET("/failed", routes.getFailedActions)
g.GET("/:id", routes.getActionById)
}

type actionRoutes struct {
Expand All @@ -37,3 +38,18 @@ func (a actionRoutes) getFailedActions(c *gin.Context) {
res := make([]gin.H, len(models))
c.JSON(http.StatusOK, res)
}

func (a actionRoutes) getActionById(c *gin.Context) {
log.Info("Getting action by id: ", c.Param("id"))
ctx := context.Background()
model, err := a.dao.GetActionByID(ctx, c.Param("id"))
if err != nil {
_ = c.Error(tools.RequestError{
Status: http.StatusNotFound,
CustomMessage: "Action not found",
Err: err,
})
return
}
c.JSON(http.StatusOK, model)
}
148 changes: 96 additions & 52 deletions api/runner_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,21 @@ type GrpcRunnerServer struct {
//

func (g GrpcRunnerServer) Register(ctx context.Context, request *protobuf.RegisterRequest) (*protobuf.RegisterResponse, error) {
runner := model.Runner{
Hostname: request.Hostname,
Port: int(request.Port),
LastSeen: time.Now(),
Status: "Alive",
Workload: 0,
runner, err := g.RunnerDao.Get(ctx, request.Hostname)
if runner == nil || runner.Hostname == "" {
runner = &model.Runner{
Hostname: request.Hostname,
Port: int(request.Port),
LastSeen: time.Now(),
Status: "Alive",
Workload: 0,
}
}
err := g.RunnerDao.Create(ctx, &runner)
err = g.RunnerDao.Create(ctx, runner)
if err != nil {
return nil, err
}
return &protobuf.RegisterResponse{}, nil
return &protobuf.RegisterResponse{ID: runner.Hostname}, nil
}

func (g GrpcRunnerServer) Heartbeat(ctx context.Context, request *protobuf.HeartbeatRequest) (*protobuf.HeartbeatResponse, error) {
Expand All @@ -71,6 +74,7 @@ func (g GrpcRunnerServer) Heartbeat(ctx context.Context, request *protobuf.Heart
Disk: request.Disk,
Uptime: request.Uptime,
Version: request.Version,
Actions: request.CurrentAction,
}
ctx = context.WithValue(ctx, "newStats", newStats)
log.Info("Updating runner stats ", "runner", r)
Expand Down Expand Up @@ -103,26 +107,32 @@ func StreamRequest(ctx context.Context, dao dao.DaoWrapper, runner model.Runner)
logger.Error("No source", "source", source)
return
}
/*server, err := dao.IngestServerDao.GetBestIngestServer()
if err != nil {
logger.Error("can't find ingest server", "err", err)
return
}*/

//var slot model.StreamName
/*if version == "COMB" { //try to find a transcoding slot for comb view:
slot, err = dao.IngestServerDao.GetTranscodedStreamSlot(server.ID)
}
if version != "COMB" || err != nil {
slot, err = dao.IngestServerDao.GetStreamSlot(server.ID)
//TODO: Implement environment variable for ingest
ingest := false
if ingest {
server, err := dao.IngestServerDao.GetBestIngestServer()
if err != nil {
logger.Error("No free stream slot", "err", err)
logger.Error("can't find ingest server", "err", err)
return
}
}*/

var slot model.StreamName
if version == "COMB" { //try to find a transcoding slot for comb view:
slot, err = dao.IngestServerDao.GetTranscodedStreamSlot(server.ID)
}
if version != "COMB" || err != nil {
slot, err = dao.IngestServerDao.GetStreamSlot(server.ID)
if err != nil {
logger.Error("No free stream slot", "err", err)
return
}
}
slot.StreamID = stream.ID
dao.IngestServerDao.SaveSlot(slot)
}

src := "rtsp://" + source
//slot.StreamID = stream.ID
//dao.IngestServerDao.SaveSlot(slot)
req := protobuf.StreamRequest{
ActionID: actionID,
Stream: uint64(stream.ID),
Expand Down Expand Up @@ -259,18 +269,17 @@ func (g GrpcRunnerServer) RequestSelfStream(ctx context.Context, request *protob
}, nil
}

func (g GrpcRunnerServer) NotifyStreamEnd(ctx context.Context, request *protobuf.StreamEndRequest) (*protobuf.StreamEndResponse, error) {
func (g GrpcRunnerServer) NotifyStreamEnded(ctx context.Context, request *protobuf.StreamEnded) (*protobuf.Status, error) {
//TODO Test me
stream, err := g.StreamsDao.GetStreamByID(ctx, fmt.Sprintf("%v", request.ActionID))
stream, err := g.StreamsDao.GetStreamByID(ctx, fmt.Sprintf("%v", request.StreamID))
if err != nil {
return nil, err
return &protobuf.Status{Ok: false}, err
}
err = g.StreamsDao.SaveEndedState(stream.ID, true)
if err != nil {
return nil, err
return &protobuf.Status{Ok: false}, err
}
return &protobuf.StreamEndResponse{}, nil

return &protobuf.Status{Ok: true}, nil
}

func (g GrpcRunnerServer) NotifyStreamStarted(ctx context.Context, request *protobuf.StreamStarted) (*protobuf.Status, error) {
Expand Down Expand Up @@ -331,6 +340,8 @@ func (g GrpcRunnerServer) NotifyStreamStarted(ctx context.Context, request *prot
request.HLSUrl = strings.ReplaceAll(request.HLSUrl, "?dvr", "")
}

logger.Info("hls url", "url", request.HLSUrl)

switch request.Version {
case "CAM":
g.StreamsDao.SaveCAMURL(&stream, request.HLSUrl)
Expand Down Expand Up @@ -409,10 +420,9 @@ func SetTranscodeFinished(ctx context.Context, req *protobuf.ActionFinished) (*p
func NotifyForStreams(dao dao.DaoWrapper) func() {
return func() {

logger.Info("Notifying runners")
logger.Info("Collecting due streams")

streams := dao.StreamsDao.GetDueStreamsForWorkers()
logger.Info("incoming stream count", "count", len(streams))
for i := range streams {
err := dao.StreamsDao.SaveEndedState(streams[i].ID, false)
if err != nil {
Expand Down Expand Up @@ -445,7 +455,7 @@ func NotifyForStreams(dao dao.DaoWrapper) func() {
values["source"] = lectureHallForStream.PresIP
err = CreateJob(dao, ctx, values) //presentation
if err != nil {
logger.Error("Can't create job", err)
log.Error("Can't create job", err)
}
break
case 2: //camera
Expand Down Expand Up @@ -489,33 +499,33 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() {
logger.Info("Assigning runners to action")
ctx := context.Background()

//Running normal jobs with the idea that they are working as they should
jobs, err := dao.JobDao.GetAllOpenJobs(ctx)
//checking for each running action if the runner is still doing the job or if it is dead
activeAction, err := dao.ActionDao.GetRunningActions(ctx)
if err != nil {
logger.Error("Can't get jobs", err)
return
logger.Error("Can't get running actions", err)
}
for _, job := range jobs {
action, err := job.GetNextAction()
if err != nil {
logger.Error("Can't get next action", err)
for _, action := range activeAction {
if action.End.Before(time.Now().Add(5 * time.Minute)) {
action.SetToIgnored()
log.Info("Action ignored, check for progress manually", "action", action.ID)
continue
}
err = AssignRunnerAction(dao, action)
runner, err := action.GetCurrentRunner()
if err != nil {
logger.Error("Can't assign runner to action", err)
logger.Error("Can't get current runner", err)
action.SetToFailed()
err = dao.ActionDao.UpdateAction(ctx, &action)
if err != nil {
return
}
continue
}
}
//checking for each running action if the runner is still doing the job or if it is dead
activeAction, err := dao.ActionDao.GetRunningActions(ctx)
if err != nil {
logger.Error("Can't get running actions", err)
}
for _, action := range activeAction {
runner := action.GetCurrentRunner()
if !runner.IsAlive() && !action.IsCompleted() {
action.SetToFailed()
err = dao.ActionDao.UpdateAction(ctx, &action)
if err != nil {
return
}
}
}

Expand All @@ -525,12 +535,45 @@ func NotifyRunnerAssignments(dao dao.DaoWrapper) func() {
return
}
for _, failedAction := range failedActions {
failedAction.SetToRestarted()
failedAction.SetToRunning()
err = AssignRunnerAction(dao, &failedAction)
err = dao.ActionDao.UpdateAction(ctx, &failedAction)
if err != nil {
return
}
if err != nil {
logger.Error("Can't assign runner to action", err)
}
}

//Running normal jobs with the idea that they are working as they should
jobs, err := dao.JobDao.GetAllOpenJobs(ctx)
if err != nil {
logger.Error("Can't get jobs", err)
return
}
for _, job := range jobs {
action, err := job.GetNextAction()
if err != nil {
logger.Error("Can't get next action", err)
continue
}
if dao.JobDao.UpdateJob(ctx, job) != nil {
logger.Error("Can't update job", err)
continue
}
err = AssignRunnerAction(dao, action)
if err != nil {
logger.Error("Can't assign runner to action", err)
continue
}
action.SetToRunning()
err = dao.ActionDao.UpdateAction(ctx, action)
if err != nil {
return
}
}

}
}

Expand Down Expand Up @@ -569,11 +612,12 @@ func AssignRunnerAction(dao dao.DaoWrapper, action *model.Action) error {
//TranscodingRequest(ctx, dao, runner)
break
}

action.SetToRunning()
return nil
}

func CreateJob(dao dao.DaoWrapper, ctx context.Context, values map[string]interface{}) error {
logger.Info("Creating Job", "values", values)
job := model.Job{
Start: time.Now(),
Completed: false,
Expand Down
5 changes: 5 additions & 0 deletions dao/Action.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type ActionDao interface {
GetRunningActions(ctx context.Context) ([]model.Action, error)
GetAll(ctx context.Context) ([]model.Action, error)
GetAllFailedActions(ctx context.Context) ([]model.Action, error)
UpdateAction(ctx context.Context, action *model.Action) error
}

type actionDao struct {
Expand Down Expand Up @@ -70,3 +71,7 @@ func (d actionDao) GetAllFailedActions(ctx context.Context) ([]model.Action, err
err := d.db.WithContext(ctx).Find(&actions, "status = ?", 2).Error
return actions, err
}

func (d actionDao) UpdateAction(ctx context.Context, action *model.Action) error {
return d.db.WithContext(ctx).Model(&model.Action{}).Where("id = ?", action.ID).Updates(action).Error
}
5 changes: 5 additions & 0 deletions dao/Jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type JobDao interface {
GetRunners(ctx context.Context, job model.Job) ([]*model.Runner, error)
RemoveAction(ctx context.Context, job model.Job, actionID uint32) error
GetAllOpenJobs(ctx context.Context) ([]model.Job, error)
UpdateJob(ctx context.Context, job model.Job) error
}

type jobDao struct {
Expand Down Expand Up @@ -53,3 +54,7 @@ func (j jobDao) GetAllOpenJobs(ctx context.Context) ([]model.Job, error) {
err := j.db.WithContext(ctx).Model(&model.Job{}).Preload("Actions").Find(&jobs).Where("completed = ?", false).Error
return jobs, err
}

func (j jobDao) UpdateJob(ctx context.Context, job model.Job) error {
return j.db.WithContext(ctx).Model(&job).Updates(&job).Error
}
4 changes: 2 additions & 2 deletions dao/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type RunnerDao interface {
// Get Runner by hostname
Get(context.Context, string) (model.Runner, error)
Get(context.Context, string) (*model.Runner, error)

// Get all Runners in an array
GetAll(context.Context) ([]model.Runner, error)
Expand All @@ -33,7 +33,7 @@ func NewRunnerDao() RunnerDao {
}

// Get a Runner by id.
func (d runnerDao) Get(c context.Context, hostname string) (res model.Runner, err error) {
func (d runnerDao) Get(c context.Context, hostname string) (res *model.Runner, err error) {
return res, DB.WithContext(c).First(&res, "hostname = ?", hostname).Error
}

Expand Down
13 changes: 8 additions & 5 deletions model/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const (
running
failed
awaiting
restarted
ignored
)

type Action struct {
Expand Down Expand Up @@ -55,16 +55,19 @@ func (a *Action) SetToAwaiting() {
a.Status = awaiting
}

func (a *Action) SetToRestarted() {
a.Status = restarted
func (a *Action) SetToIgnored() {
a.Status = ignored
}

func (a *Action) IsCompleted() bool {
return a.Status == completed
}

func (a *Action) GetCurrentRunner() Runner {
return a.AllRunners[len(a.AllRunners)-1]
func (a *Action) GetCurrentRunner() (*Runner, error) {
if len(a.AllRunners) == 0 {
return nil, errors.New("no runner assigned")
}
return &a.AllRunners[len(a.AllRunners)-1], nil
}

func (a *Action) AssignRunner(runner Runner) {
Expand Down
5 changes: 3 additions & 2 deletions model/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ func (j *Job) GetAllActions() ([]Action, error) {
func (j *Job) GetNextAction() (*Action, error) {
if j.Actions == nil {
return nil, errors.New("no actions found")
} else if j.Actions[0].Status == completed {
return nil, errors.New("action already completed, not pushed")
} else if j.Actions[0].Status != awaiting {
j.Actions = j.Actions[1:]
return nil, errors.New("action not in awaiting, not pushed")
}
if len(j.Actions) == 0 {
j.Completed = true
Expand Down
8 changes: 7 additions & 1 deletion model/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Runner struct {
Disk string
Uptime string
Version string
Actions string
}

// BeforeCreate returns errors if hostnames and ports of workers are invalid.
Expand All @@ -47,6 +48,11 @@ func (r *Runner) UpdateStats(tx *gorm.DB, ctx context.Context) (bool, error) {
}

func (r *Runner) IsAlive() bool {
r.Alive = r.LastSeen.After(time.Now().Add(time.Minute * -1))
r.Alive = r.LastSeen.After(time.Now().Add(time.Minute * -2))
if r.Alive {
r.Status = "Alive"
} else {
r.Status = "Dead"
}
return r.Alive
}
Loading

0 comments on commit 7e66a67

Please sign in to comment.