Skip to content

Commit

Permalink
Achieve capacity limits
Browse files Browse the repository at this point in the history
  • Loading branch information
duanhongyi committed Jul 22, 2022
1 parent b287c34 commit 160f329
Show file tree
Hide file tree
Showing 13 changed files with 478 additions and 145 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ goofys
goofys.test
xout
s3proxy.jar
.vscode
10 changes: 7 additions & 3 deletions api/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

type PartSizeConfig struct {
PartSize uint64
PartSize uint64
PartCount uint64
}

Expand All @@ -45,8 +45,12 @@ type FlagStorage struct {

// Common Backend Config
UseContentType bool
Endpoint string
Backend interface{}

Provider string
Capacity uint64
DiskUsageInterval uint64
Endpoint string
Backend interface{}

// Tuning
MemoryLimit uint64
Expand Down
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,21 @@ require (
github.com/aws/aws-sdk-go v1.38.7
github.com/google/btree v1.0.0
github.com/google/uuid v1.1.2
github.com/gopherjs/gopherjs v0.0.0-20210202160940-bed99a852dfe // indirect
github.com/jacobsa/fuse v0.0.0-20210818065549-10d864429bf7
github.com/jtolds/gls v4.2.0+incompatible // indirect
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
github.com/kr/pretty v0.1.1-0.20190720101428-71e7e4993750 // indirect
github.com/mattn/go-ieproxy v0.0.0-20190805055040-f9202b1cfdeb // indirect
github.com/minio/madmin-go v1.4.6
github.com/mitchellh/go-homedir v1.1.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/sevlyar/go-daemon v0.1.5
github.com/shirou/gopsutil v0.0.0-20190731134726-d80c43f9c984
github.com/sirupsen/logrus v1.8.1
github.com/smartystreets/assertions v0.0.0-20160201214316-443d812296a8 // indirect
github.com/smartystreets/goconvey v1.6.1-0.20160119221636-995f5b2e021c // indirect
github.com/urfave/cli v1.21.1-0.20190807111034-521735b7608a
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a
google.golang.org/api v0.49.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127
gopkg.in/ini.v1 v1.46.0
gopkg.in/ini.v1 v1.62.0
)

replace github.com/aws/aws-sdk-go => ./s3ext
Expand Down
163 changes: 163 additions & 0 deletions go.sum

Large diffs are not rendered by default.

26 changes: 22 additions & 4 deletions internal/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

type Capabilities struct {
MaxMultipartSize uint64
MaxMultipartSize uint64
// indicates that the blob store has native support for directories
DirBlob bool
Name string
Expand All @@ -44,7 +44,7 @@ type BlobItemOutput struct {
Size uint64
StorageClass *string
// may be nil in list responses for backends that don't return metadata in listings
Metadata map[string]*string
Metadata map[string]*string
}

type HeadBlobOutput struct {
Expand Down Expand Up @@ -179,7 +179,7 @@ type MultipartBlobAddInput struct {

type MultipartBlobAddOutput struct {
RequestId string
PartId *string
PartId *string
}

type MultipartBlobCopyInput struct {
Expand All @@ -192,7 +192,7 @@ type MultipartBlobCopyInput struct {

type MultipartBlobCopyOutput struct {
RequestId string
PartId *string
PartId *string
}

type MultipartBlobCommitOutput struct {
Expand Down Expand Up @@ -228,6 +228,14 @@ type MakeBucketOutput struct {
RequestId string
}

type GetBucketUsageInput struct {
}

type GetBucketUsageOutput struct {
Size uint64
RequestId string
}

/// Implementations of all the functions here are expected to be
/// concurrency-safe, except for
///
Expand Down Expand Up @@ -256,6 +264,7 @@ type StorageBackend interface {
MultipartExpire(param *MultipartExpireInput) (*MultipartExpireOutput, error)
RemoveBucket(param *RemoveBucketInput) (*RemoveBucketOutput, error)
MakeBucket(param *MakeBucketInput) (*MakeBucketOutput, error)
GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error)
Delegate() interface{}
}

Expand Down Expand Up @@ -424,6 +433,11 @@ func (s *StorageBackendInitWrapper) MakeBucket(param *MakeBucketInput) (*MakeBuc
return s.StorageBackend.MakeBucket(param)
}

func (s *StorageBackendInitWrapper) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
s.Init("")
return s.StorageBackend.GetBucketUsage(param)
}

type StorageBackendInitError struct {
error
cap Capabilities
Expand Down Expand Up @@ -547,3 +561,7 @@ func (e StorageBackendInitError) RemoveBucket(param *RemoveBucketInput) (*Remove
func (e StorageBackendInitError) MakeBucket(param *MakeBucketInput) (*MakeBucketOutput, error) {
return nil, e
}

func (e StorageBackendInitError) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
return nil, e
}
8 changes: 6 additions & 2 deletions internal/backend_adlv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ func NewADLv1(bucket string, flags *FlagStorage, config *ADLv1Config) (*ADLv1, e
bucket: bucket,
cap: Capabilities{
//NoParallelMultipart: true,
DirBlob: true,
Name: "adl",
DirBlob: true,
Name: "adl",
// ADLv1 fails with 404 if we upload data
// larger than 30000000 bytes (28.6MB) (28MB
// also failed in at one point, but as of
Expand Down Expand Up @@ -701,3 +701,7 @@ func (b *ADLv1) mkdir(dir string) error {
}
return nil
}

func (b *ADLv1) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
return &GetBucketUsageOutput{Size: 0}, nil
}
4 changes: 4 additions & 0 deletions internal/backend_adlv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ func (b *ADLv2) Capabilities() *Capabilities {
return &b.cap
}

func (b *ADLv2) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
return &GetBucketUsageOutput{Size: 0}, nil
}

type ADL2Error struct {
adl2.DataLakeStorageError
}
Expand Down
10 changes: 7 additions & 3 deletions internal/backend_azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (b *AZBlob) refreshToken() (*azblob.ContainerURL, error) {
// our token totally expired, renew inline before using it
b.mu.Unlock()
b.tokenRenewGate <- 1
defer func() { <- b.tokenRenewGate } ()
defer func() { <-b.tokenRenewGate }()

b.mu.Lock()
// check again, because in the mean time maybe it's renewed
Expand All @@ -248,7 +248,7 @@ func (b *AZBlob) refreshToken() (*azblob.ContainerURL, error) {
if err != nil {
azbLog.Errorf("Unable to refresh token: %v", err)
}
<- b.tokenRenewGate
<-b.tokenRenewGate
}()

// if we cannot renew token, treat it as a
Expand Down Expand Up @@ -665,7 +665,7 @@ func (b *AZBlob) DeleteBlobs(param *DeleteBlobsInput) (ret *DeleteBlobsOutput, d

go func(key string) {
defer func() {
<- SmallActionsGate
<-SmallActionsGate
wg.Done()
}()

Expand Down Expand Up @@ -947,3 +947,7 @@ func (b *AZBlob) MakeBucket(param *MakeBucketInput) (*MakeBucketOutput, error) {
}
return &MakeBucketOutput{}, nil
}

func (b *AZBlob) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
return &GetBucketUsageOutput{Size: 0}, nil
}
16 changes: 10 additions & 6 deletions internal/backend_gcs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ import (
"sync"
"syscall"

"github.com/jacobsa/fuse"
"cloud.google.com/go/storage"
"github.com/jacobsa/fuse"
"google.golang.org/api/iterator"
)

// GCS variant of S3
type GCS3 struct {
*S3Backend
gcs *storage.Client
gcs *storage.Client
jsonCredFile string
}

Expand Down Expand Up @@ -103,12 +103,12 @@ func (s *GCS3) ListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) {
})
} else {
items = append(items, BlobItemOutput{
Key: &attrs.Name,
ETag: &attrs.Etag,
Key: &attrs.Name,
ETag: &attrs.Etag,
LastModified: &attrs.Updated,
Size: uint64(attrs.Size),
Size: uint64(attrs.Size),
StorageClass: &attrs.StorageClass,
Metadata: PMetadata(attrs.Metadata),
Metadata: PMetadata(attrs.Metadata),
})
}
n++
Expand Down Expand Up @@ -156,3 +156,7 @@ func (s *GCS3) DeleteBlobs(param *DeleteBlobsInput) (*DeleteBlobsOutput, error)
func (s *GCS3) MultipartBlobCopy(param *MultipartBlobCopyInput) (*MultipartBlobCopyOutput, error) {
return nil, syscall.ENOSYS
}

func (s *GCS3) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
return &GetBucketUsageOutput{Size: 0}, nil
}
68 changes: 68 additions & 0 deletions internal/backend_minio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2019 Ka-Hing Cheung
// Copyright 2021 Yandex LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"context"
"encoding/json"
"io/ioutil"
"net/url"

"github.com/minio/madmin-go"
. "github.com/yandex-cloud/geesefs/api/common"
)

type BucketsUsage struct {
BucketsSizes map[string]uint64
}

type MinioBackend struct {
*S3Backend
}

func NewMinio(bucket string, flags *FlagStorage, config *S3Config) (*MinioBackend, error) {
s3Backend, err := NewS3(bucket, flags, config)
if err != nil {
return nil, err
}
s3Backend.Capabilities().Name = "minio"
s := &MinioBackend{S3Backend: s3Backend}
return s, nil
}

func (s *MinioBackend) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
value, err := s.Config.Credentials.Get()
if err != nil {
return nil, err
}

endpointURL, _ := url.Parse(s.Endpoint)
madminClient, err := madmin.New(endpointURL.Host, value.AccessKeyID, value.SecretAccessKey, endpointURL.Scheme == "https")
if err != nil {
return nil, err
}
resp, err := madminClient.ExecuteMethod(context.Background(), "GET", madmin.RequestData{RelPath: "/v3/datausageinfo"})
if err != nil {
return nil, err
}
response, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var result BucketsUsage
json.Unmarshal(response, &result)
return &GetBucketUsageOutput{Size: result.BucketsSizes[s.Bucket()]}, nil
}
26 changes: 15 additions & 11 deletions internal/backend_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ type S3Backend struct {
gcs bool
v2Signer bool

iam bool
iamToken atomic.Value
iam bool
iamToken atomic.Value
iamTokenExpiration time.Time
iamRefreshTimer *time.Timer
iamRefreshTimer *time.Timer
}

func NewS3(bucket string, flags *FlagStorage, config *S3Config) (*S3Backend, error) {
if config.MultipartCopyThreshold == 0 {
config.MultipartCopyThreshold = 128*1024*1024
config.MultipartCopyThreshold = 128 * 1024 * 1024
}
awsConfig, err := config.ToAwsConfig(flags)
if err != nil {
Expand Down Expand Up @@ -100,8 +100,8 @@ func NewS3(bucket string, flags *FlagStorage, config *S3Config) (*S3Backend, err
}

type IAMCredResponse struct {
Code string
Token string
Code string
Token string
Expiration time.Time
}

Expand All @@ -128,11 +128,11 @@ func (s *S3Backend) TryIAM() error {
s.iam = true
s.iamToken.Store(creds.Token)
s.iamTokenExpiration = creds.Expiration
ttl := s.iamTokenExpiration.Sub(time.Now().Add(5*time.Minute))
ttl := s.iamTokenExpiration.Sub(time.Now().Add(5 * time.Minute))
if ttl < 0 {
ttl = s.iamTokenExpiration.Sub(time.Now())
if ttl >= 30*time.Second {
ttl = 30*time.Second
ttl = 30 * time.Second
}
}
s.iamRefreshTimer = time.AfterFunc(ttl, func() {
Expand Down Expand Up @@ -997,7 +997,7 @@ func (s *S3Backend) MultipartBlobAdd(param *MultipartBlobAddInput) (*MultipartBl

return &MultipartBlobAddOutput{
RequestId: s.getRequestId(req),
PartId: resp.ETag,
PartId: resp.ETag,
}, nil
}

Expand All @@ -1006,7 +1006,7 @@ func (s *S3Backend) MultipartBlobCopy(param *MultipartBlobCopyInput) (*Multipart
Bucket: &s.bucket,
Key: param.Commit.Key,
PartNumber: aws.Int64(int64(param.PartNumber)),
CopySource: aws.String(pathEscape(s.bucket+"/"+param.CopySource)),
CopySource: aws.String(pathEscape(s.bucket + "/" + param.CopySource)),
UploadId: param.Commit.UploadId,
}
if param.Size != 0 {
Expand All @@ -1028,7 +1028,7 @@ func (s *S3Backend) MultipartBlobCopy(param *MultipartBlobCopyInput) (*Multipart

return &MultipartBlobCopyOutput{
RequestId: s.getRequestId(req),
PartId: resp.CopyPartResult.ETag,
PartId: resp.CopyPartResult.ETag,
}, nil
}

Expand Down Expand Up @@ -1172,3 +1172,7 @@ func (s *S3Backend) MakeBucket(param *MakeBucketInput) (*MakeBucketOutput, error
func (s *S3Backend) Delegate() interface{} {
return s
}

func (s *S3Backend) GetBucketUsage(param *GetBucketUsageInput) (*GetBucketUsageOutput, error) {
return &GetBucketUsageOutput{Size: 0}, nil
}
Loading

0 comments on commit 160f329

Please sign in to comment.