diff --git a/drivers/halalcloud/driver.go b/drivers/halalcloud/driver.go index 20f9ae69ba3..28b2f9be37e 100644 --- a/drivers/halalcloud/driver.go +++ b/drivers/halalcloud/driver.go @@ -4,24 +4,25 @@ import ( "context" "crypto/sha1" "fmt" + "github.com/alist-org/alist/v3/drivers/base" "github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/op" "github.com/alist-org/alist/v3/pkg/http_range" "github.com/alist-org/alist/v3/pkg/utils" - bauth "github.com/baidubce/bce-sdk-go/auth" - "github.com/baidubce/bce-sdk-go/bce" - "github.com/baidubce/bce-sdk-go/services/bos" - "github.com/baidubce/bce-sdk-go/services/bos/api" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/city404/v6-public-rpc-proto/go/v6/common" pbPublicUser "github.com/city404/v6-public-rpc-proto/go/v6/user" pubUserFile "github.com/city404/v6-public-rpc-proto/go/v6/userfile" - "github.com/jinzhu/copier" "github.com/rclone/rclone/lib/readers" "github.com/zzzhr1990/go-common-entity/userfile" "io" + "net/url" + "path" "strconv" - "strings" "time" ) @@ -31,7 +32,6 @@ type HalalCloud struct { Addition uploadThread int - fileStatus int // 文件状态 类型,0小文件(1M)、1中型文件(16M)、2大型文件(32M) } func (d *HalalCloud) Config() driver.Config { @@ -48,10 +48,6 @@ func (d *HalalCloud) Init(ctx context.Context) error { d.uploadThread, d.UploadThread = 3, "3" } - if d.CustomUploadPartSize == "" { - d.CustomUploadPartSize = "0" - } - if d.HalalCommon == nil { d.HalalCommon = &HalalCommon{ Common: &Common{}, @@ -186,10 +182,6 @@ func (d *HalalCloud) getFiles(ctx context.Context, dir model.Obj) ([]model.Obj, client := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection()) opDir := d.GetCurrentDir(dir) - //if len(args) > 0 { - // opDir = d.GetCurrentOpDir(dir, args, 0) - //} - filesList := FilesList{} for { result, err := client.List(ctx, &pubUserFile.FileListRequest{ Parent: &pubUserFile.File{Path: opDir}, @@ -201,14 +193,9 @@ func (d *HalalCloud) getFiles(ctx context.Context, dir model.Obj) ([]model.Obj, if err != nil { return nil, err } - err = copier.Copy(&filesList, result) - if err != nil { - return nil, err - } - if filesList.Files != nil && len(filesList.Files) > 0 { - for i := 0; i < len(filesList.Files); i++ { - files = append(files, filesList.Files[i]) - } + + for i := 0; len(result.Files) > i; i++ { + files = append(files, (*Files)(result.Files[i])) } if result.ListInfo == nil || result.ListInfo.Token == "" { @@ -222,28 +209,11 @@ func (d *HalalCloud) getFiles(ctx context.Context, dir model.Obj) ([]model.Obj, func (d *HalalCloud) getLink(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) { - id := file.GetID() - - newPath := userfile.NewFormattedPath(d.GetCurrentDir(file)).GetPath() client := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection()) - //if len(id) > 0 { - // newPath = "" - //} ctx1, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() - result, err := client.ParseFileSlice(ctx1, &pubUserFile.File{ - Identity: id, - Parent: file.(*Files).Parent, - Name: file.GetName(), - Path: newPath, - MimeType: file.(*Files).MimeType, - Size: file.(*Files).Size, - Type: file.(*Files).Type, - CreateTs: file.(*Files).CreateTs, - UpdateTs: file.(*Files).UpdateTs, - DeleteTs: file.(*Files).DeleteTs, - ContentIdentity: file.(*Files).ContentIdentity, - }) + + result, err := client.ParseFileSlice(ctx1, (*pubUserFile.File)(file.(*Files))) if err != nil { return nil, err } @@ -320,7 +290,6 @@ func (d *HalalCloud) getLink(ctx context.Context, file model.Obj, args model.Lin func (d *HalalCloud) makeDir(ctx context.Context, dir model.Obj, name string) (model.Obj, error) { newDir := userfile.NewFormattedPath(d.GetCurrentOpDir(dir, []string{name}, 0)).GetPath() _, err := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection()).Create(ctx, &pubUserFile.File{ - // Parent: &pubUserFile.File{Path: currentDir}, Path: newDir, }) return nil, err @@ -347,9 +316,6 @@ func (d *HalalCloud) move(ctx context.Context, obj model.Obj, dir model.Obj) (mo func (d *HalalCloud) rename(ctx context.Context, obj model.Obj, name string) (model.Obj, error) { id := obj.GetID() newPath := userfile.NewFormattedPath(d.GetCurrentOpDir(obj, []string{name}, 0)).GetPath() - //if len(id) > 0 { - // newPath = "" - //} _, err := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection()).Rename(ctx, &pubUserFile.File{ Path: newPath, Identity: id, @@ -399,101 +365,39 @@ func (d *HalalCloud) remove(ctx context.Context, obj model.Obj) error { func (d *HalalCloud) put(ctx context.Context, dstDir model.Obj, fileStream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { - tempFile, err := fileStream.CacheFullInTempFile() - if err != nil { - return nil, err - } - - newDir := userfile.NewFormattedPath(d.GetCurrentDir(dstDir)).GetPath() - newDir = strings.TrimSuffix(newDir, "/") + "/" + fileStream.GetName() + newDir := path.Join(dstDir.GetPath(), fileStream.GetName()) result, err := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection()).CreateUploadToken(ctx, &pubUserFile.File{ Path: newDir, }) if err != nil { return nil, err } - clientConfig := bos.BosClientConfiguration{ - Ak: result.AccessKey, - Sk: result.SecretKey, - Endpoint: result.Endpoint, - RedirectDisabled: false, - //SessionToken: result.SessionToken, - } - - // 初始化一个BosClient - bosClient, err := bos.NewClientWithConfig(&clientConfig) + u, _ := url.Parse(result.Endpoint) + u.Host = "s3." + u.Host + result.Endpoint = u.String() + s, err := session.NewSession(&aws.Config{ + HTTPClient: base.HttpClient, + Credentials: credentials.NewStaticCredentials(result.AccessKey, result.SecretKey, result.Token), + Region: aws.String(result.Region), + Endpoint: aws.String(result.Endpoint), + S3ForcePathStyle: aws.Bool(true), + }) if err != nil { - return nil, fmt.Errorf("failed to create bos client: %w", err) + return nil, err } - stsCredential, err := bauth.NewSessionBceCredentials( - result.AccessKey, - result.SecretKey, - result.Token) - if err != nil { - return nil, fmt.Errorf("failed to create sts credential: %w", err) + uploader := s3manager.NewUploader(s, func(u *s3manager.Uploader) { + u.Concurrency = d.uploadThread + }) + if fileStream.GetSize() > s3manager.MaxUploadParts*s3manager.DefaultUploadPartSize { + uploader.PartSize = fileStream.GetSize() / (s3manager.MaxUploadParts - 1) } - bosClient.Config.Credentials = stsCredential - bosClient.MaxParallel = int64(d.uploadThread) - - d.setFileStatus(fileStream.GetSize()) // 设置文件状态 - - bosClient.MultipartSize = d.getSliceSize() - - if fileStream.GetSize() < 1*utils.MB { - partBody, _ := bce.NewBodyFromSizedReader(tempFile, fileStream.GetSize()) - _, err := bosClient.PutObject(result.Bucket, result.Key, partBody, nil) - //_, err = bosClient.PutObjectFromStream(result.GetBucket(), fileStream.GetName(), tempFile, nil) - if err != nil { - return nil, fmt.Errorf("failed to upload file: %v ===> %s/%s", err, clientConfig.Ak, clientConfig.Sk) - } - up(100) - } else { - res, err := bosClient.BasicInitiateMultipartUpload(result.Bucket, result.Key) - // 分块大小按MULTIPART_ALIGN=1MB对齐 - partSize := (bosClient.MultipartSize + - bos.MULTIPART_ALIGN - 1) / bos.MULTIPART_ALIGN * bos.MULTIPART_ALIGN - - // 获取文件大小,并计算分块数目,最大分块数MAX_PART_NUMBER=10000 - fileSize := fileStream.GetSize() - partNum := (fileSize + partSize - 1) / partSize - if partNum > bos.MAX_PART_NUMBER { // 超过最大分块数,需调整分块大小 - partSize = (fileSize + bos.MAX_PART_NUMBER + 1) / bos.MAX_PART_NUMBER - partSize = (partSize + bos.MULTIPART_ALIGN - 1) / bos.MULTIPART_ALIGN * bos.MULTIPART_ALIGN - partNum = (fileSize + partSize - 1) / partSize - } - // 创建保存每个分块上传后的ETag和PartNumber信息的列表 - partEtags := make([]api.UploadInfoType, 0) - - // 逐个分块上传 - for i := int64(1); i <= partNum; i++ { - // 计算偏移offset和本次上传的大小uploadSize - uploadSize := partSize - offset := partSize * (i - 1) - left := fileSize - offset - if left < partSize { - uploadSize = left - } - - // 创建指定大小的文件流 - partBody, _ := bce.NewBodyFromSizedReader(tempFile, uploadSize) - - // 上传当前分块 - etag, _ := bosClient.BasicUploadPart(result.Bucket, result.Key, res.UploadId, int(i), partBody) - - // 保存当前分块上传成功后返回的序号和ETag - partEtags = append(partEtags, api.UploadInfoType{int(i), etag}) - - up(float64(i) / float64(partNum) * 100) - } + _, err = uploader.UploadWithContext(ctx, &s3manager.UploadInput{ + Bucket: aws.String(result.Bucket), + Key: aws.String(result.Key), + Body: io.TeeReader(fileStream, driver.NewProgress(fileStream.GetSize(), up)), + }) + return nil, err - completeArgs := &api.CompleteMultipartUploadArgs{Parts: partEtags} - _, err = bosClient.CompleteMultipartUploadFromStruct( - result.Bucket, result.Key, res.UploadId, completeArgs) - if err != nil { - return nil, err - } - } - return nil, nil } var _ driver.Driver = (*HalalCloud)(nil) diff --git a/drivers/halalcloud/meta.go b/drivers/halalcloud/meta.go index e184cc1281f..d4040323eb0 100644 --- a/drivers/halalcloud/meta.go +++ b/drivers/halalcloud/meta.go @@ -9,9 +9,8 @@ type Addition struct { // Usually one of two driver.RootPath // define other - RefreshToken string `json:"refresh_token" required:"true" help:"login type is refresh_token,this is required"` - UploadThread string `json:"upload_thread" default:"3" help:"1 <= thread <= 32"` - CustomUploadPartSize string `json:"custom_upload_part_size" default:"0" help:"0 for auto"` + RefreshToken string `json:"refresh_token" required:"true" help:"login type is refresh_token,this is required"` + UploadThread string `json:"upload_thread" default:"3" help:"1 <= thread <= 32"` AppID string `json:"app_id" required:"true" default:"alist/10001"` AppVersion string `json:"app_version" required:"true" default:"1.0.0"` diff --git a/drivers/halalcloud/types.go b/drivers/halalcloud/types.go index 9bb7e2c5b03..9772421264b 100644 --- a/drivers/halalcloud/types.go +++ b/drivers/halalcloud/types.go @@ -4,6 +4,7 @@ import ( "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/pkg/utils" "github.com/city404/v6-public-rpc-proto/go/v6/common" + pubUserFile "github.com/city404/v6-public-rpc-proto/go/v6/userfile" "google.golang.org/grpc" "time" ) @@ -50,34 +51,7 @@ type FilesList struct { var _ model.Obj = (*Files)(nil) -type Files struct { - Identity string `json:"identity,omitempty"` - Parent string `json:"parent,omitempty"` - Name string `json:"name,omitempty"` - Path string `json:"path,omitempty"` - MimeType string `json:"mime_type,omitempty"` - Size int64 `json:"size,omitempty"` - Type int64 `json:"type,omitempty"` - CreateTs int64 `json:"create_ts,omitempty"` - UpdateTs int64 `json:"update_ts,omitempty"` - DeleteTs int64 `json:"delete_ts,omitempty"` - Deleted bool `json:"deleted,omitempty"` - Dir bool `json:"dir,omitempty"` - Hidden bool `json:"hidden,omitempty"` - Locked bool `json:"locked,omitempty"` - Shared bool `json:"shared,omitempty"` - Starred bool `json:"starred,omitempty"` - Trashed bool `json:"trashed,omitempty"` - LockedAt int64 `json:"locked_at,omitempty"` - LockedBy string `json:"locked_by,omitempty"` - SharedAt int64 `json:"shared_at,omitempty"` - Flag int64 `json:"flag,omitempty"` - Unique string `json:"unique,omitempty"` - ContentIdentity string `json:"content_identity,omitempty"` - Label int64 `json:"label,omitempty"` - StoreType int64 `json:"store_type,omitempty"` - Version int64 `json:"version,omitempty"` -} +type Files pubUserFile.File func (f *Files) GetSize() int64 { return f.Size @@ -114,20 +88,6 @@ func (f *Files) GetPath() string { return f.Path } -func fileToObj(f Files) *model.ObjThumb { - return &model.ObjThumb{ - Object: model.Object{ - ID: f.Identity, - Path: f.Path, - Name: f.Name, - Size: f.Size, - Modified: f.ModTime(), - Ctime: f.CreateTime(), - IsFolder: f.IsDir(), - }, - } -} - type SteamFile struct { file model.File } diff --git a/drivers/halalcloud/util.go b/drivers/halalcloud/util.go index 0607d5bcf4d..e85c8e59b01 100644 --- a/drivers/halalcloud/util.go +++ b/drivers/halalcloud/util.go @@ -79,17 +79,6 @@ func (d *HalalCloud) NewAuthServiceWithOauth(options ...HalalOption) (*AuthServi } if oauthToken.Url != "" { - /* resultChan := make(chan *AuthService, 1) - errorChan := make(chan error, 1) - - go func() { - aService, err := d.GetRefreshToken(svc, &userClient, oauthToken) - if err != nil { - errorChan <- err - } else { - resultChan <- aService - } - }()*/ return nil, fmt.Errorf(`need verify: Click Here`, oauthToken.Url) } @@ -97,49 +86,6 @@ func (d *HalalCloud) NewAuthServiceWithOauth(options ...HalalOption) (*AuthServi } -/* - func (d *HalalCloud) GetRefreshToken(svc *AuthService, userClient *pbPublicUser.PubUserClient, oauthToken *pbPublicUser.OauthTokenResponse) (*AuthService, error) { - checkTimer := time.NewTicker(5 * time.Second) - defer checkTimer.Stop() - for { - select { - case <-checkTimer.C: - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - checkLoginResponse, err := (*userClient).VerifyAuthToken(ctx, &pbPublicUser.LoginRequest{ - State: oauthToken.State, - Callback: oauthToken.Callback, - ReturnType: 2, - }) - if err != nil { - return nil, err - } - if checkLoginResponse.Status == 6 { - login := checkLoginResponse.Login - if login == nil { - return nil, fmt.Errorf("login is nil") - } - if login.User != nil && len(login.Token.RefreshToken) > 0 { - // checkLoginResponse = checkLoginResponse - _ = d.refreshTokenFunc(login.Token.RefreshToken) - svc.OnAccessTokenRefreshed(login.Token.AccessToken, login.Token.AccessTokenExpireTs, login.Token.RefreshToken, login.Token.RefreshTokenExpireTs) - newAuthService, err := d.NewAuthService(login.Token.RefreshToken) - if err != nil { - return nil, err - } - return newAuthService, nil - // break - } - } - - // reset timer - checkTimer.Reset(5 * time.Second) - - } - } - } -*/ func (d *HalalCloud) NewAuthService(refreshToken string, options ...HalalOption) (*AuthService, error) { svc := d.HalalCommon.AuthService @@ -170,8 +116,6 @@ func (d *HalalCloud) NewAuthService(refreshToken string, options ...HalalOption) err := invoker(ctxx, method, req, reply, cc, opts...) // invoking RPC method if err != nil { grpcStatus, ok := status.FromError(err) - // if error is grpc error and error code is unauthenticated and error message contains "invalid accesstoken" and refresh token is not empty - // then refresh access token and retry if ok && grpcStatus.Code() == codes.Unauthenticated && strings.Contains(grpcStatus.Err().Error(), "invalid accesstoken") && len(refreshToken) > 0 { // refresh token refreshResponse, err := pbPublicUser.NewPubUserClient(cc).Refresh(ctx, &pbPublicUser.Token{ @@ -195,7 +139,6 @@ func (d *HalalCloud) NewAuthService(refreshToken string, options ...HalalOption) } } } - // post-processing return err })) grpcConnection, err := grpc.NewClient(grpcServer, grpcOptions...) @@ -225,7 +168,7 @@ func (s *AuthService) GetGrpcConnection() *grpc.ClientConn { } func (s *AuthService) Close() { - s.grpcConnection.Close() + _ = s.grpcConnection.Close() } func (s *AuthService) signContext(method string, ctx context.Context) context.Context { @@ -272,21 +215,6 @@ func (d *HalalCloud) GetCurrentDir(dir model.Obj) string { type Common struct { } -func tryAndGetRawFiles(addr *pubUserFile.SliceDownloadInfo) ([]byte, error) { - tryTimes := 0 - for { - tryTimes++ - dataBytes, err := getRawFiles(addr) - if err != nil { - if tryTimes > 3 { - return nil, err - } - continue - } - return dataBytes, nil - } -} - func getRawFiles(addr *pubUserFile.SliceDownloadInfo) ([]byte, error) { if addr == nil { @@ -336,8 +264,6 @@ func getRawFiles(addr *pubUserFile.SliceDownloadInfo) ([]byte, error) { } -// do others that not defined in Driver interface -// openObject represents a download in progress type openObject struct { ctx context.Context mu sync.Mutex @@ -414,9 +340,6 @@ func (oo *openObject) Close() (err error) { if oo.closed { return nil } - //err = utils.Retry(3, 500*time.Millisecond, func() (err error) { - // return oo.d.Finish() - //}) // 校验Sha1 if string(oo.shaTemp.Sum(nil)) != oo.sha { return fmt.Errorf("failed to finish download: %w", err) @@ -431,39 +354,6 @@ func GetMD5Hash(text string) string { return hex.EncodeToString(tHash[:]) } -const ( - SmallSliceSize int64 = 1 * utils.MB - MediumSliceSize = 16 * utils.MB - LargeSliceSize = 32 * utils.MB -) - -func (d *HalalCloud) getSliceSize() int64 { - customUploadPartSize, _ := strconv.ParseInt(d.CustomUploadPartSize, 10, 64) - if customUploadPartSize != 0 { - return customUploadPartSize * utils.MB - } - switch d.fileStatus { - case 0: - return SmallSliceSize - case 1: - return MediumSliceSize - case 2: - return LargeSliceSize - default: - return SmallSliceSize - } -} - -func (d *HalalCloud) setFileStatus(fileSize int64) { - if fileSize <= 32*utils.MB { - d.fileStatus = 0 - } else if fileSize <= 512*utils.MB { - d.fileStatus = 1 - } else { - d.fileStatus = 2 - } -} - // chunkSize describes a size and position of chunk type chunkSize struct { position int64