Skip to content

Commit

Permalink
Add recursivity on sos upload (#70)
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre-Emmanuel Jacquier <[email protected]>
  • Loading branch information
pierre-emmanuelJ authored Dec 3, 2018
1 parent 6c8d119 commit f32c77b
Showing 1 changed file with 176 additions and 57 deletions.
233 changes: 176 additions & 57 deletions cmd/sos_upload.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,45 @@
package cmd

import (
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/vbauerster/mpb"
"github.com/vbauerster/mpb/decor"

humanize "github.com/dustin/go-humanize"
minio "github.com/minio/minio-go"
"github.com/spf13/cobra"
)

const (
parallelSosUpload = 10
)

type fileToUpload struct {
localPath string
remotePath string
contentType string
}

// uploadCmd represents the upload command
var sosUploadCmd = &cobra.Command{
Use: "upload <bucket name> <local file path> [remote file path]",
Use: "upload <bucket name> <local file path>+",
Short: "Upload an object into a bucket",
Aliases: gUploadAlias,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) < 2 {
return cmd.Usage()
}

args[1] = filepath.ToSlash(args[1])

var remoteFilePath string
if len(args) > 2 {
remoteFilePath = strings.TrimLeft(filepath.ToSlash(args[2]), "/")
remoteFilePath, err := cmd.Flags().GetString("remote-path")
if err != nil {
return err
}

minioClient, err := newMinioClient(sosZone)
Expand All @@ -47,85 +56,195 @@ var sosUploadCmd = &cobra.Command{
if err != nil {
return err
}

// Upload the file
bucketName := args[0]
objectName := filepath.Base(args[1])
filePath := args[1]

if strings.HasSuffix(remoteFilePath, "/") {
remoteFilePath = remoteFilePath + objectName
}

file, err := os.Open(filePath)
lo, err := os.OpenFile("./log", os.O_RDWR|os.O_CREATE, os.ModePerm)
if err != nil {
return err
}
minioClient.TraceOn(lo)

// Only the first 512 bytes are used to sniff the content type.
buffer := make([]byte, 512)
_, err = file.Read(buffer)
recursive, err := cmd.Flags().GetBool("recursive")
if err != nil {
return err
}

if err = file.Close(); err != nil {
return err
}

if remoteFilePath == "" {
remoteFilePath = objectName
}
// Upload the file
filesToUpload := []fileToUpload{}
bucketName := args[0]

contentType := http.DetectContentType(buffer)
for _, arg := range args[1:] {

arg = filepath.ToSlash(arg)
objectName := filepath.Base(arg)
filePath := arg

remote := strings.TrimLeft(filepath.ToSlash(remoteFilePath), "/")

if (len(args[1:]) > 1 && remote != "") || (len(args[1:]) == 1 && strings.HasSuffix(remote, "/")) {
remote = filepath.Join(remoteFilePath, objectName)
}

if remote == "" {
remote = objectName
}

file, err := os.Open(filePath)
if err != nil {
return err
}

fileStat, err := file.Stat()
if err != nil {
return err
}

if recursive && fileStat.IsDir() {
filesToUpload, err = getFiles(filePath, strings.TrimRight(remote, "/"), filesToUpload)
} else {
var contentType string
if fileStat.Size() >= 512 {
// Only the first 512 bytes are used to sniff the content type.
buffer := make([]byte, 512)
_, err = file.Read(buffer)

contentType = http.DetectContentType(buffer)

}
filesToUpload = append(filesToUpload, fileToUpload{
localPath: filePath,
remotePath: remote,
contentType: contentType,
})
}
if err != nil {
return err
}

if err = file.Close(); err != nil {
return err
}

fileInfo, err := os.Stat(filePath)
if err != nil {
return err
}

progress := mpb.New(
lenFileToUpload := len(filesToUpload)

var taskWG sync.WaitGroup
p := mpb.New(
mpb.WithWaitGroup(&taskWG),
mpb.WithContext(gContext),
// override default (80) width
mpb.WithWidth(64),
// override default 120ms refresh rate
mpb.WithRefreshRate(180*time.Millisecond),
)
taskWG.Add(lenFileToUpload)

workerSem := make(chan int, parallelSosUpload)

for _, fToUpload := range filesToUpload {

workerSem <- 1

go func(fileToUP fileToUpload, sem chan int, wg *sync.WaitGroup) {
fileInfo, err := os.Stat(fileToUP.localPath)
if err != nil {
log.Fatal(err)
}

base := filepath.Base(fileToUP.localPath)
bar := p.AddBar(fileInfo.Size(),
mpb.AppendDecorators(
// simple name decorator
decor.Name(base, decor.WC{W: len(base) + 1, C: decor.DidentRight}),
),
mpb.PrependDecorators(
decor.OnComplete(decor.AverageETA(decor.ET_STYLE_GO), "done!"),
// decor.DSyncWidth bit enables column width synchronization
decor.Percentage(decor.WCSyncSpace),
),
)

f, err := os.Open(fileToUP.localPath)
if err != nil {
log.Fatal(err)
}
defer f.Close() //nolint: errcheck
defer wg.Done()

reader := bar.ProxyReader(f)
// Upload object with FPutObject
_, upErr := minioClient.PutObjectWithContext(
gContext,
bucketName,
fileToUP.remotePath,
f,
fileInfo.Size(),
minio.PutObjectOptions{
ContentType: fileToUP.contentType,
Progress: reader,
},
)
if upErr != nil {
log.Fatal(upErr)
}

<-sem
}(fToUpload, workerSem, &taskWG)
}
taskWG.Wait()
p.Wait()

bar := progress.AddBar(fileInfo.Size(),
mpb.PrependDecorators(
// simple name decorator
decor.Name(objectName, decor.WC{W: len(objectName) + 1, C: decor.DidentRight}),
// decor.DSyncWidth bit enables column width synchronization
decor.Percentage(decor.WCSyncSpace),
),
mpb.AppendDecorators(
decor.AverageETA(decor.ET_STYLE_GO),
),
)
return nil
},
}

f, err := os.Open(filePath)
if err != nil {
return err
func getFiles(folderName, remoteFilePath string, resFiles []fileToUpload) ([]fileToUpload, error) {
files, err := ioutil.ReadDir(folderName)
if err != nil {
return nil, err
}

for _, f := range files {
localPath := filepath.Join(folderName, f.Name())
if f.IsDir() {
resFiles, err = getFiles(localPath, filepath.Join(remoteFilePath, f.Name()), resFiles)
if err != nil {
return nil, err
}
continue
}
defer f.Close() // nolint: errcheck

reader := bar.ProxyReader(f)

// Upload object with FPutObject
n, err := minioClient.PutObjectWithContext(gContext, bucketName, remoteFilePath, f, fileInfo.Size(), minio.PutObjectOptions{ContentType: contentType, Progress: reader})
file, err := os.Open(localPath)
if err != nil {
return err
return nil, err
}

progress.Wait()
var contentType string
if f.Size() >= 512 {
// Only the first 512 bytes are used to sniff the content type.
buffer := make([]byte, 512)
_, err = file.Read(buffer)
if err != nil {
return nil, err
}

log.Printf("Successfully uploaded %s of size %s\n", objectName, humanize.IBytes(uint64(n)))
contentType = http.DetectContentType(buffer)
}

return nil
},
resFiles = append(resFiles, fileToUpload{
localPath: localPath,
remotePath: filepath.Join(remoteFilePath, f.Name()),
contentType: contentType,
})

if err := file.Close(); err != nil {
return nil, err
}
}
return resFiles, nil
}

func init() {
sosCmd.AddCommand(sosUploadCmd)
sosUploadCmd.Flags().BoolP("recursive", "r", false, "Upload a folder recursively")
sosUploadCmd.Flags().StringP("remote-path", "p", "", "Set a remote path for local file(s)")
}

0 comments on commit f32c77b

Please sign in to comment.