Skip to content

Commit

Permalink
working on the upload part for the stream. plan is to use this upload…
Browse files Browse the repository at this point in the history
… action to push everything after a hls stream is done or if the runner sees not posted hls files on start
  • Loading branch information
DawinYurtseven committed Dec 3, 2024
1 parent a7b9013 commit 01380c0
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 69 deletions.
26 changes: 18 additions & 8 deletions api/runner_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,15 +640,25 @@ func CreateJob(dao dao.DaoWrapper, ctx context.Context, values map[string]interf
Status: 3,
Type: "stream",
Values: string(value),
},
/*
model.Action{
Status: 3,
Type: "upload",
Values: string(value),
}
*/)
})
job.Actions = append(job.Actions, actions...)
break
case "transcode":
actions = append(actions, model.Action{
Status: 3,
Type: "transcode",
Values: string(value),
})
job.Actions = append(job.Actions, actions...)
break
case "upload":
actions = append(actions, model.Action{
Status: 3,
Type: "upload",
Values: string(value),
})
job.Actions = append(job.Actions, actions...)
break
}
err = dao.CreateJob(ctx, job)
if err != nil {
Expand Down
90 changes: 29 additions & 61 deletions runner/actions/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package actions

import (
"context"
"fmt"
"github.com/TUM-Dev/gocast/worker/cfg"
"github.com/tum-dev/gocast/runner/protobuf"
"io"
"log/slog"
"mime/multipart"
Expand All @@ -18,66 +18,36 @@ func (a *ActionProvider) UploadAction() *Action {
Type: UploadAction,
ActionFn: func(ctx context.Context, log *slog.Logger) (context.Context, error) {

streamID, ok := ctx.Value("stream").(uint64)
if !ok {
return ctx, fmt.Errorf("%w: context doesn't contain stream", ErrRequiredContextValNotFound)
}
courseID, ok := ctx.Value("course").(uint64)
if !ok {
return ctx, fmt.Errorf("%w: context doesn't contain courseID", ErrRequiredContextValNotFound)
}
version, ok := ctx.Value("version").(string)
if !ok {
return ctx, fmt.Errorf("%w: context doesn't contain version", ErrRequiredContextValNotFound)
}
//course := ctx.Value("course").(uint32)
stream := ctx.Value("stream").(uint32)
url := ctx.Value("url").(string)

URLstring := ctx.Value("URL").(string)

fileName := fmt.Sprintf("%s/%s/%s/%s.mp4", a.MassDir, courseID, streamID, version)
file := ctx.Value("uploadFile").(string)

//this is the part that from worker/upload.go
client := &http.Client{
// 5 minutes timeout, some large files can take a while.
Timeout: time.Minute * 5,
Timeout: time.Minute * 15,
}

r, w := io.Pipe()
writer := multipart.NewWriter(w)

//the same function as in the worker but without function calling
//so analyzing it and changing it later won't give much to look through

go func() {
defer func(w *io.PipeWriter) {
err := w.Close()
if err != nil {

}
}(w)
defer func(writer *multipart.Writer) {
err := writer.Close()
if err != nil {

}
}(writer)
formFileWriter, err := writer.CreateFormFile("filename", fileName)
defer w.Close()
defer writer.Close()
formFileWriter, err := writer.CreateFormFile("filename", file)
if err != nil {
log.Error("Cannot create form file: ", err)
log.Error("cannot create form file: ", err)
return
}
FileReader, err := os.Open(fileName)
fileReader, err := os.Open(file)
if err != nil {
log.Error("Cannot create form file: ", err)
log.Error("cannot create form file: ", err)
return
}
defer func(FileReader *os.File) {
err := FileReader.Close()
if err != nil {

}
}(FileReader)
_, err = io.Copy(formFileWriter, FileReader)
defer fileReader.Close()
_, err = io.Copy(formFileWriter, fileReader)
if err != nil {
log.Error("Cannot create form file: ", err)
log.Error("cannot create form file: ", err)
return
}

Expand All @@ -91,39 +61,37 @@ func (a *ActionProvider) UploadAction() *Action {
}

for name, value := range fields {
formFileWriter, err := writer.CreateFormField(name)
if err != nil {
log.Error("Cannot create form field: ", err)
return
}
_, err = io.Copy(formFileWriter, strings.NewReader(value))
formFieldWriter, err := writer.CreateFormField(name)
if err != nil {
log.Error("Cannot create form field: ", err)
return
}
_, err = io.Copy(formFieldWriter, strings.NewReader(value))
if err != nil {
log.Error("Cannot create form field: ", err)
return
}
}
}()
rsp, err := client.Post(URLstring, writer.FormDataContentType(), r)
rsp, err := client.Post(cfg.UploadUrl, writer.FormDataContentType(), r)
if err == nil && rsp.StatusCode != http.StatusOK {
log.Error("Request failed with response code: ", rsp.StatusCode)
}
if err == nil && rsp != nil {
all, err := io.ReadAll(rsp.Body)
if err == nil {
log.Debug(string(all), "fileUploaded", fileName)
log.Info("File got uploaded", "Uploaded file", file)
log.Debug("file", all)
}
}
if err != nil {
log.Error("Failed to post video to TUMLive", "error", err)
return ctx, err
}
log.Info("Successfully posted video to TUMLive", "stream", fileName)

return ctx, err
a.Server.NotifyVoDUploadFinished(ctx, &protobuf.VoDUploadFinished{
HLSUrl: url,
StreamID: stream,
RunnerID: "",
SourceType: "",
ThumbnailUrl: "",
})
return ctx, nil
},
}
}
2 changes: 2 additions & 0 deletions worker/cfg/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var (
LrzSubDir string
MainBase string
LrzUploadUrl string
UploadUrl string
VodURLTemplate string
LogDir string
Hostname string
Expand All @@ -45,6 +46,7 @@ func SetConfig() {
LrzPhone = os.Getenv("LrzPhone")
LrzSubDir = os.Getenv("LrzSubDir")
LrzUploadUrl = os.Getenv("LrzUploadUrl")
UploadUrl = os.Getenv("UploadUrl")
MainBase = os.Getenv("MainBase") // eg. live.mm.rbg.tum.de
VodURLTemplate = os.Getenv("VodURLTemplate") // eg. https://stream.lrz.de/vod/_definst_/mp4:tum/RBG/%s.mp4/playlist.m3u8

Expand Down

0 comments on commit 01380c0

Please sign in to comment.