add S3 minimum part size defined by the user (#17171)
Signed-off-by: Renan Rangel <[email protected]>
rvrangel authored Dec 26, 2024
1 parent 837ddd4 commit 30e1e40
Showing 8 changed files with 132 additions and 9 deletions.
20 changes: 20 additions & 0 deletions examples/operator/operator.yaml
@@ -679,6 +679,9 @@ spec:
maxLength: 256
pattern: ^[^\r\n]*$
type: string
minPartSize:
format: int64
type: integer
region:
minLength: 1
type: string
@@ -1995,6 +1998,9 @@ spec:
maxLength: 256
pattern: ^[^\r\n]*$
type: string
minPartSize:
format: int64
type: integer
region:
minLength: 1
type: string
@@ -3510,6 +3516,14 @@ spec:
mysql80Compatible:
type: string
type: object
mysqldExporter:
type: string
vtbackup:
type: string
vtorc:
type: string
vttablet:
type: string
type: object
name:
maxLength: 63
@@ -5241,6 +5255,9 @@ spec:
maxLength: 256
pattern: ^[^\r\n]*$
type: string
minPartSize:
format: int64
type: integer
region:
minLength: 1
type: string
@@ -6688,6 +6705,9 @@ spec:
maxLength: 256
pattern: ^[^\r\n]*$
type: string
minPartSize:
format: int64
type: integer
region:
minLength: 1
type: string
2 changes: 1 addition & 1 deletion go.mod
@@ -96,6 +96,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.66.3
github.com/aws/smithy-go v1.22.0
github.com/bndr/gotabulate v1.1.2
github.com/dustin/go-humanize v1.0.1
github.com/gammazero/deque v0.2.1
github.com/google/safehtml v0.1.0
github.com/hashicorp/go-version v1.7.0
@@ -153,7 +154,6 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/queue/v2 v2.0.0-20230407133247-75960ed334e4 // indirect
github.com/ebitengine/purego v0.8.1 // indirect
github.com/envoyproxy/go-control-plane v0.13.1 // indirect
1 change: 1 addition & 0 deletions go/flags/endtoend/vtbackup.txt
@@ -195,6 +195,7 @@ Flags:
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--restart_before_backup Perform a mysqld clean/full restart after applying binlogs, but before taking the backup. Only makes sense to work around xtrabackup bugs.
--s3_backup_aws_endpoint string endpoint of the S3 backend (region must be provided).
--s3_backup_aws_min_partsize int Minimum part size to use, defaults to 5MiB but can be increased due to the dataset size. (default 5242880)
--s3_backup_aws_region string AWS region to use. (default "us-east-1")
--s3_backup_aws_retries int AWS request retries. (default -1)
--s3_backup_force_path_style force the s3 path style.
1 change: 1 addition & 0 deletions go/flags/endtoend/vtctld.txt
@@ -110,6 +110,7 @@ Flags:
--purge_logs_interval duration how often try to remove old logs (default 1h0m0s)
--remote_operation_timeout duration time to wait for a remote operation (default 15s)
--s3_backup_aws_endpoint string endpoint of the S3 backend (region must be provided).
--s3_backup_aws_min_partsize int Minimum part size to use, defaults to 5MiB but can be increased due to the dataset size. (default 5242880)
--s3_backup_aws_region string AWS region to use. (default "us-east-1")
--s3_backup_aws_retries int AWS request retries. (default -1)
--s3_backup_force_path_style force the s3 path style.
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
@@ -303,6 +303,7 @@ Flags:
--restore_from_backup_ts string (init restore parameter) if set, restore the latest backup taken at or before this timestamp. Example: '2021-04-29.133050'
--retain_online_ddl_tables duration How long should vttablet keep an old migrated table before purging it (default 24h0m0s)
--s3_backup_aws_endpoint string endpoint of the S3 backend (region must be provided).
--s3_backup_aws_min_partsize int Minimum part size to use, defaults to 5MiB but can be increased due to the dataset size. (default 5242880)
--s3_backup_aws_region string AWS region to use. (default "us-east-1")
--s3_backup_aws_retries int AWS request retries. (default -1)
--s3_backup_force_path_style force the s3 path style.
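Note: the new --s3_backup_aws_min_partsize flag takes its value in bytes; the default 5242880 is 5 MiB. As a hedged illustration (not part of this commit), the go-humanize package that this change promotes to a direct dependency can be used to turn a human-readable size into the byte value to pass on the command line; the 200 MiB figure below is only an example.

package main

import (
	"fmt"
	"log"

	"github.com/dustin/go-humanize"
)

// Convert a human-readable size into the raw byte count expected by
// --s3_backup_aws_min_partsize. The "200 MiB" value is illustrative only.
func main() {
	bytes, err := humanize.ParseBytes("200 MiB") // IEC units: 200 * 1024 * 1024
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(bytes) // 209715200, i.e. --s3_backup_aws_min_partsize=209715200
}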
39 changes: 33 additions & 6 deletions go/vt/mysqlctl/s3backupstorage/s3.go
@@ -47,6 +47,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3/types"
transport "github.com/aws/smithy-go/endpoints"
"github.com/aws/smithy-go/middleware"
"github.com/dustin/go-humanize"
"github.com/spf13/pflag"

errorsbackup "vitess.io/vitess/go/vt/mysqlctl/errors"
@@ -57,6 +58,11 @@ import (
"vitess.io/vitess/go/vt/servenv"
)

const (
sseCustomerPrefix = "sse_c:"
MaxPartSize = 1024 * 1024 * 1024 * 5 // 5GiB - limited by AWS https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
)

var (
// AWS API region
region string
@@ -86,6 +92,11 @@ var (

// path component delimiter
delimiter = "/"

// minimum part size
minPartSize int64

ErrPartSize = errors.New("minimum S3 part size must be between 5MiB and 5GiB")
)

func registerFlags(fs *pflag.FlagSet) {
@@ -98,6 +109,7 @@ func registerFlags(fs *pflag.FlagSet) {
fs.BoolVar(&tlsSkipVerifyCert, "s3_backup_tls_skip_verify_cert", false, "skip the 'certificate is valid' check for SSL connections.")
fs.StringVar(&requiredLogLevel, "s3_backup_log_level", "LogOff", "determine the S3 loglevel to use from LogOff, LogDebug, LogDebugWithSigning, LogDebugWithHTTPBody, LogDebugWithRequestRetries, LogDebugWithRequestErrors.")
fs.StringVar(&sse, "s3_backup_server_side_encryption", "", "server-side encryption algorithm (e.g., AES256, aws:kms, sse_c:/path/to/key/file).")
fs.Int64Var(&minPartSize, "s3_backup_aws_min_partsize", manager.MinUploadPartSize, "Minimum part size to use, defaults to 5MiB but can be increased due to the dataset size.")
}

func init() {
@@ -111,8 +123,6 @@ type logNameToLogLevel map[string]aws.ClientLogMode

var logNameMap logNameToLogLevel

const sseCustomerPrefix = "sse_c:"

type endpointResolver struct {
r s3.EndpointResolverV2
endpoint *string
@@ -166,7 +176,12 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}

partSizeBytes := calculateUploadPartSize(filesize)
partSizeBytes, err := calculateUploadPartSize(filesize)
if err != nil {
return nil, err
}

bh.bs.params.Logger.Infof("Using S3 upload part size: %s", humanize.IBytes(uint64(partSizeBytes)))

reader, writer := io.Pipe()
bh.handleAddFile(ctx, filename, partSizeBytes, reader, func(err error) {
@@ -213,9 +228,11 @@ func (bh *S3BackupHandle) handleAddFile(ctx context.Context, filename string, pa
}()
}

func calculateUploadPartSize(filesize int64) int64 {
// calculateUploadPartSize is a helper to calculate the part size, taking into consideration the minimum part size
// passed in by an operator.
func calculateUploadPartSize(filesize int64) (partSizeBytes int64, err error) {
// Calculate s3 upload part size using the source filesize
partSizeBytes := manager.DefaultUploadPartSize
partSizeBytes = manager.DefaultUploadPartSize
if filesize > 0 {
minimumPartSize := float64(filesize) / float64(manager.MaxUploadParts)
// Round up to ensure large enough partsize
@@ -224,7 +241,17 @@ func calculateUploadPartSize(filesize int64) int64 {
partSizeBytes = calculatedPartSizeBytes
}
}
return partSizeBytes

if minPartSize != 0 && partSizeBytes < minPartSize {
if minPartSize > MaxPartSize || minPartSize < manager.MinUploadPartSize { // 5GiB and 5MiB respectively
return 0, fmt.Errorf("%w, currently set to %s",
ErrPartSize, humanize.IBytes(uint64(minPartSize)),
)
}
partSizeBytes = int64(minPartSize)
}

return
}

// EndBackup is part of the backupstorage.BackupHandle interface.
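To make the new selection logic concrete: for a 1 TiB file, the file size divided by S3's 10,000-part limit rounds up to 109,951,163 bytes (roughly 105 MiB), which already exceeds the 5 MiB default, so an operator-configured minimum only takes effect when it is larger than that computed value, and a configured value outside the 5 MiB to 5 GiB S3 bounds is rejected. Below is a minimal standalone sketch of the same logic, not the exact Vitess code: the constants are inlined for illustration, whereas the real implementation uses the aws-sdk-go-v2 manager constants and humanize for the error message.

package main

import (
	"errors"
	"fmt"
	"math"
)

const (
	minUploadPartSize int64 = 5 * 1024 * 1024        // 5 MiB, S3 lower bound (manager.MinUploadPartSize)
	maxPartSize       int64 = 5 * 1024 * 1024 * 1024 // 5 GiB, S3 upper bound
	maxUploadParts    int64 = 10000                  // manager.MaxUploadParts
)

var errPartSize = errors.New("minimum S3 part size must be between 5MiB and 5GiB")

// calculateUploadPartSize mirrors the logic added above: start from the 5 MiB
// default, grow it so the file fits into at most 10,000 parts, then apply the
// operator-configured minimum if it is larger and within S3's limits.
func calculateUploadPartSize(filesize, configuredMin int64) (int64, error) {
	partSize := minUploadPartSize
	if filesize > 0 {
		if computed := int64(math.Ceil(float64(filesize) / float64(maxUploadParts))); computed > partSize {
			partSize = computed
		}
	}
	if configuredMin != 0 && partSize < configuredMin {
		if configuredMin > maxPartSize || configuredMin < minUploadPartSize {
			return 0, fmt.Errorf("%w, currently set to %d bytes", errPartSize, configuredMin)
		}
		partSize = configuredMin
	}
	return partSize, nil
}

func main() {
	// 1 TiB file, default 5 MiB minimum: the computed ~105 MiB part size wins.
	fmt.Println(calculateUploadPartSize(1<<40, 5*1024*1024)) // 109951163 <nil>
	// Same file, 200 MiB minimum: the configured minimum wins.
	fmt.Println(calculateUploadPartSize(1<<40, 200*1024*1024)) // 209715200 <nil>
}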
12 changes: 10 additions & 2 deletions go/vt/mysqlctl/s3backupstorage/s3_mock.go
@@ -162,7 +162,11 @@ func FailFirstWrite(s3bh *S3BackupHandle, ctx context.Context, filename string,
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}

partSizeBytes := calculateUploadPartSize(filesize)
partSizeBytes, err := calculateUploadPartSize(filesize)
if err != nil {
return nil, err
}

reader, writer := io.Pipe()
r := io.Reader(reader)

@@ -181,7 +185,11 @@ func FailAllWrites(s3bh *S3BackupHandle, ctx context.Context, filename string, f
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}

partSizeBytes := calculateUploadPartSize(filesize)
partSizeBytes, err := calculateUploadPartSize(filesize)
if err != nil {
return nil, err
}

reader, writer := io.Pipe()
r := &failReadPipeReader{PipeReader: reader}

65 changes: 65 additions & 0 deletions go/vt/mysqlctl/s3backupstorage/s3_test.go
@@ -328,3 +328,68 @@ func TestWithParams(t *testing.T) {
assert.NotNil(t, s3.transport.DialContext)
assert.NotNil(t, s3.transport.Proxy)
}

func TestCalculateUploadPartSize(t *testing.T) {
originalMinimum := minPartSize
defer func() { minPartSize = originalMinimum }()

tests := []struct {
name string
filesize int64
minimumPartSize int64
want int64
err error
}{
{
name: "minimum - 10 MiB",
filesize: 1024 * 1024 * 10, // 10 MiB
minimumPartSize: 1024 * 1024 * 5, // 5 MiB
want: 1024 * 1024 * 5, // 5 MiB,
err: nil,
},
{
name: "below minimum - 10 MiB",
filesize: 1024 * 1024 * 10, // 10 MiB
minimumPartSize: 1024 * 1024 * 8, // 8 MiB
want: 1024 * 1024 * 8, // 8 MiB,
err: nil,
},
{
name: "above minimum - 1 TiB",
filesize: 1024 * 1024 * 1024 * 1024, // 1 TiB
minimumPartSize: 1024 * 1024 * 5, // 5 MiB
want: 109951163, // ~104 MiB
err: nil,
},
{
name: "below minimum - 1 TiB",
filesize: 1024 * 1024 * 1024 * 1024, // 1 TiB
minimumPartSize: 1024 * 1024 * 200, // 200 MiB
want: 1024 * 1024 * 200, // 200 MiB
err: nil,
},
{
name: "below S3 limits - 5 MiB",
filesize: 1024 * 1024 * 3, // 3 MiB
minimumPartSize: 1024 * 1024 * 4, // 4 MiB
want: 1024 * 1024 * 5, // 5 MiB - should always return the minimum
err: nil,
},
{
name: "above S3 limits - 5 GiB",
filesize: 1024 * 1024 * 1024 * 1024, // 1 TiB
minimumPartSize: 1024 * 1024 * 1024 * 6, // 6 GiB
want: 0,
err: ErrPartSize,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
minPartSize = tt.minimumPartSize
partSize, err := calculateUploadPartSize(tt.filesize)
require.ErrorIs(t, err, tt.err)
require.Equal(t, tt.want, partSize)
})
}
}
