diff --git a/api/runner_grpc.go b/api/runner_grpc.go index 51ecb223d..8a2830ced 100644 --- a/api/runner_grpc.go +++ b/api/runner_grpc.go @@ -640,15 +640,25 @@ func CreateJob(dao dao.DaoWrapper, ctx context.Context, values map[string]interf Status: 3, Type: "stream", Values: string(value), - }, - /* - model.Action{ - Status: 3, - Type: "upload", - Values: string(value), - } - */) + }) + job.Actions = append(job.Actions, actions...) + break + case "transcode": + actions = append(actions, model.Action{ + Status: 3, + Type: "transcode", + Values: string(value), + }) job.Actions = append(job.Actions, actions...) + break + case "upload": + actions = append(actions, model.Action{ + Status: 3, + Type: "upload", + Values: string(value), + }) + job.Actions = append(job.Actions, actions...) + break } err = dao.CreateJob(ctx, job) if err != nil { diff --git a/runner/actions/upload.go b/runner/actions/upload.go index 569e67e84..721f46a7b 100644 --- a/runner/actions/upload.go +++ b/runner/actions/upload.go @@ -2,8 +2,8 @@ package actions import ( "context" - "fmt" "github.com/TUM-Dev/gocast/worker/cfg" + "github.com/tum-dev/gocast/runner/protobuf" "io" "log/slog" "mime/multipart" @@ -18,66 +18,36 @@ func (a *ActionProvider) UploadAction() *Action { Type: UploadAction, ActionFn: func(ctx context.Context, log *slog.Logger) (context.Context, error) { - streamID, ok := ctx.Value("stream").(uint64) - if !ok { - return ctx, fmt.Errorf("%w: context doesn't contain stream", ErrRequiredContextValNotFound) - } - courseID, ok := ctx.Value("course").(uint64) - if !ok { - return ctx, fmt.Errorf("%w: context doesn't contain courseID", ErrRequiredContextValNotFound) - } - version, ok := ctx.Value("version").(string) - if !ok { - return ctx, fmt.Errorf("%w: context doesn't contain version", ErrRequiredContextValNotFound) - } + //course := ctx.Value("course").(uint32) + stream := ctx.Value("stream").(uint32) + url := ctx.Value("url").(string) - URLstring := ctx.Value("URL").(string) - - fileName := fmt.Sprintf("%s/%s/%s/%s.mp4", a.MassDir, courseID, streamID, version) + file := ctx.Value("uploadFile").(string) + //this is the part that from worker/upload.go client := &http.Client{ - // 5 minutes timeout, some large files can take a while. - Timeout: time.Minute * 5, + Timeout: time.Minute * 15, } - r, w := io.Pipe() writer := multipart.NewWriter(w) - //the same function as in the worker but without function calling - //so analyzing it and changing it later won't give much to look through - go func() { - defer func(w *io.PipeWriter) { - err := w.Close() - if err != nil { - - } - }(w) - defer func(writer *multipart.Writer) { - err := writer.Close() - if err != nil { - - } - }(writer) - formFileWriter, err := writer.CreateFormFile("filename", fileName) + defer w.Close() + defer writer.Close() + formFileWriter, err := writer.CreateFormFile("filename", file) if err != nil { - log.Error("Cannot create form file: ", err) + log.Error("cannot create form file: ", err) return } - FileReader, err := os.Open(fileName) + fileReader, err := os.Open(file) if err != nil { - log.Error("Cannot create form file: ", err) + log.Error("cannot create form file: ", err) return } - defer func(FileReader *os.File) { - err := FileReader.Close() - if err != nil { - - } - }(FileReader) - _, err = io.Copy(formFileWriter, FileReader) + defer fileReader.Close() + _, err = io.Copy(formFileWriter, fileReader) if err != nil { - log.Error("Cannot create form file: ", err) + log.Error("cannot create form file: ", err) return } @@ -91,39 +61,37 @@ func (a *ActionProvider) UploadAction() *Action { } for name, value := range fields { - formFileWriter, err := writer.CreateFormField(name) - if err != nil { - log.Error("Cannot create form field: ", err) - return - } - _, err = io.Copy(formFileWriter, strings.NewReader(value)) + formFieldWriter, err := writer.CreateFormField(name) if err != nil { log.Error("Cannot create form field: ", err) return } + _, err = io.Copy(formFieldWriter, strings.NewReader(value)) if err != nil { log.Error("Cannot create form field: ", err) return } } }() - rsp, err := client.Post(URLstring, writer.FormDataContentType(), r) + rsp, err := client.Post(cfg.UploadUrl, writer.FormDataContentType(), r) if err == nil && rsp.StatusCode != http.StatusOK { log.Error("Request failed with response code: ", rsp.StatusCode) } if err == nil && rsp != nil { all, err := io.ReadAll(rsp.Body) if err == nil { - log.Debug(string(all), "fileUploaded", fileName) + log.Info("File got uploaded", "Uploaded file", file) + log.Debug("file", all) } } - if err != nil { - log.Error("Failed to post video to TUMLive", "error", err) - return ctx, err - } - log.Info("Successfully posted video to TUMLive", "stream", fileName) - - return ctx, err + a.Server.NotifyVoDUploadFinished(ctx, &protobuf.VoDUploadFinished{ + HLSUrl: url, + StreamID: stream, + RunnerID: "", + SourceType: "", + ThumbnailUrl: "", + }) + return ctx, nil }, } } diff --git a/worker/cfg/cfg.go b/worker/cfg/cfg.go index 858639d02..ad54a45fe 100644 --- a/worker/cfg/cfg.go +++ b/worker/cfg/cfg.go @@ -19,6 +19,7 @@ var ( LrzSubDir string MainBase string LrzUploadUrl string + UploadUrl string VodURLTemplate string LogDir string Hostname string @@ -45,6 +46,7 @@ func SetConfig() { LrzPhone = os.Getenv("LrzPhone") LrzSubDir = os.Getenv("LrzSubDir") LrzUploadUrl = os.Getenv("LrzUploadUrl") + UploadUrl = os.Getenv("UploadUrl") MainBase = os.Getenv("MainBase") // eg. live.mm.rbg.tum.de VodURLTemplate = os.Getenv("VodURLTemplate") // eg. https://stream.lrz.de/vod/_definst_/mp4:tum/RBG/%s.mp4/playlist.m3u8