From f32c77b4bb08abef603e5e8b95e4adb4c3591a34 Mon Sep 17 00:00:00 2001 From: pierre-emmanuelJ Date: Mon, 3 Dec 2018 12:46:17 +0100 Subject: [PATCH] Add recursivity on sos upload (#70) Signed-off-by: Pierre-Emmanuel Jacquier --- cmd/sos_upload.go | 233 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 176 insertions(+), 57 deletions(-) diff --git a/cmd/sos_upload.go b/cmd/sos_upload.go index bcd18ae4..589175f1 100644 --- a/cmd/sos_upload.go +++ b/cmd/sos_upload.go @@ -1,24 +1,35 @@ package cmd import ( + "io/ioutil" "log" "net/http" "os" "path/filepath" "strings" + "sync" "time" "github.com/vbauerster/mpb" "github.com/vbauerster/mpb/decor" - humanize "github.com/dustin/go-humanize" minio "github.com/minio/minio-go" "github.com/spf13/cobra" ) +const ( + parallelSosUpload = 10 +) + +type fileToUpload struct { + localPath string + remotePath string + contentType string +} + // uploadCmd represents the upload command var sosUploadCmd = &cobra.Command{ - Use: "upload [remote file path]", + Use: "upload +", Short: "Upload an object into a bucket", Aliases: gUploadAlias, RunE: func(cmd *cobra.Command, args []string) error { @@ -26,11 +37,9 @@ var sosUploadCmd = &cobra.Command{ return cmd.Usage() } - args[1] = filepath.ToSlash(args[1]) - - var remoteFilePath string - if len(args) > 2 { - remoteFilePath = strings.TrimLeft(filepath.ToSlash(args[2]), "/") + remoteFilePath, err := cmd.Flags().GetString("remote-path") + if err != nil { + return err } minioClient, err := newMinioClient(sosZone) @@ -47,85 +56,195 @@ var sosUploadCmd = &cobra.Command{ if err != nil { return err } - - // Upload the file - bucketName := args[0] - objectName := filepath.Base(args[1]) - filePath := args[1] - - if strings.HasSuffix(remoteFilePath, "/") { - remoteFilePath = remoteFilePath + objectName - } - - file, err := os.Open(filePath) + lo, err := os.OpenFile("./log", os.O_RDWR|os.O_CREATE, os.ModePerm) if err != nil { return err } + minioClient.TraceOn(lo) - // Only the first 512 bytes are used to sniff the content type. - buffer := make([]byte, 512) - _, err = file.Read(buffer) + recursive, err := cmd.Flags().GetBool("recursive") if err != nil { return err } - if err = file.Close(); err != nil { - return err - } - - if remoteFilePath == "" { - remoteFilePath = objectName - } + // Upload the file + filesToUpload := []fileToUpload{} + bucketName := args[0] - contentType := http.DetectContentType(buffer) + for _, arg := range args[1:] { + + arg = filepath.ToSlash(arg) + objectName := filepath.Base(arg) + filePath := arg + + remote := strings.TrimLeft(filepath.ToSlash(remoteFilePath), "/") + + if (len(args[1:]) > 1 && remote != "") || (len(args[1:]) == 1 && strings.HasSuffix(remote, "/")) { + remote = filepath.Join(remoteFilePath, objectName) + } + + if remote == "" { + remote = objectName + } + + file, err := os.Open(filePath) + if err != nil { + return err + } + + fileStat, err := file.Stat() + if err != nil { + return err + } + + if recursive && fileStat.IsDir() { + filesToUpload, err = getFiles(filePath, strings.TrimRight(remote, "/"), filesToUpload) + } else { + var contentType string + if fileStat.Size() >= 512 { + // Only the first 512 bytes are used to sniff the content type. + buffer := make([]byte, 512) + _, err = file.Read(buffer) + + contentType = http.DetectContentType(buffer) + + } + filesToUpload = append(filesToUpload, fileToUpload{ + localPath: filePath, + remotePath: remote, + contentType: contentType, + }) + } + if err != nil { + return err + } + + if err = file.Close(); err != nil { + return err + } - fileInfo, err := os.Stat(filePath) - if err != nil { - return err } - progress := mpb.New( + lenFileToUpload := len(filesToUpload) + + var taskWG sync.WaitGroup + p := mpb.New( + mpb.WithWaitGroup(&taskWG), mpb.WithContext(gContext), // override default (80) width mpb.WithWidth(64), // override default 120ms refresh rate mpb.WithRefreshRate(180*time.Millisecond), ) + taskWG.Add(lenFileToUpload) + + workerSem := make(chan int, parallelSosUpload) + + for _, fToUpload := range filesToUpload { + + workerSem <- 1 + + go func(fileToUP fileToUpload, sem chan int, wg *sync.WaitGroup) { + fileInfo, err := os.Stat(fileToUP.localPath) + if err != nil { + log.Fatal(err) + } + + base := filepath.Base(fileToUP.localPath) + bar := p.AddBar(fileInfo.Size(), + mpb.AppendDecorators( + // simple name decorator + decor.Name(base, decor.WC{W: len(base) + 1, C: decor.DidentRight}), + ), + mpb.PrependDecorators( + decor.OnComplete(decor.AverageETA(decor.ET_STYLE_GO), "done!"), + // decor.DSyncWidth bit enables column width synchronization + decor.Percentage(decor.WCSyncSpace), + ), + ) + + f, err := os.Open(fileToUP.localPath) + if err != nil { + log.Fatal(err) + } + defer f.Close() //nolint: errcheck + defer wg.Done() + + reader := bar.ProxyReader(f) + // Upload object with FPutObject + _, upErr := minioClient.PutObjectWithContext( + gContext, + bucketName, + fileToUP.remotePath, + f, + fileInfo.Size(), + minio.PutObjectOptions{ + ContentType: fileToUP.contentType, + Progress: reader, + }, + ) + if upErr != nil { + log.Fatal(upErr) + } + + <-sem + }(fToUpload, workerSem, &taskWG) + } + taskWG.Wait() + p.Wait() - bar := progress.AddBar(fileInfo.Size(), - mpb.PrependDecorators( - // simple name decorator - decor.Name(objectName, decor.WC{W: len(objectName) + 1, C: decor.DidentRight}), - // decor.DSyncWidth bit enables column width synchronization - decor.Percentage(decor.WCSyncSpace), - ), - mpb.AppendDecorators( - decor.AverageETA(decor.ET_STYLE_GO), - ), - ) + return nil + }, +} - f, err := os.Open(filePath) - if err != nil { - return err +func getFiles(folderName, remoteFilePath string, resFiles []fileToUpload) ([]fileToUpload, error) { + files, err := ioutil.ReadDir(folderName) + if err != nil { + return nil, err + } + + for _, f := range files { + localPath := filepath.Join(folderName, f.Name()) + if f.IsDir() { + resFiles, err = getFiles(localPath, filepath.Join(remoteFilePath, f.Name()), resFiles) + if err != nil { + return nil, err + } + continue } - defer f.Close() // nolint: errcheck - - reader := bar.ProxyReader(f) - // Upload object with FPutObject - n, err := minioClient.PutObjectWithContext(gContext, bucketName, remoteFilePath, f, fileInfo.Size(), minio.PutObjectOptions{ContentType: contentType, Progress: reader}) + file, err := os.Open(localPath) if err != nil { - return err + return nil, err } - progress.Wait() + var contentType string + if f.Size() >= 512 { + // Only the first 512 bytes are used to sniff the content type. + buffer := make([]byte, 512) + _, err = file.Read(buffer) + if err != nil { + return nil, err + } - log.Printf("Successfully uploaded %s of size %s\n", objectName, humanize.IBytes(uint64(n))) + contentType = http.DetectContentType(buffer) + } - return nil - }, + resFiles = append(resFiles, fileToUpload{ + localPath: localPath, + remotePath: filepath.Join(remoteFilePath, f.Name()), + contentType: contentType, + }) + + if err := file.Close(); err != nil { + return nil, err + } + } + return resFiles, nil } func init() { sosCmd.AddCommand(sosUploadCmd) + sosUploadCmd.Flags().BoolP("recursive", "r", false, "Upload a folder recursively") + sosUploadCmd.Flags().StringP("remote-path", "p", "", "Set a remote path for local file(s)") }