From 31820493fb1739e20e5a18fecf85273433b913fd Mon Sep 17 00:00:00 2001 From: Florent Poinsard <35779988+frouioui@users.noreply.github.com> Date: Tue, 17 Sep 2024 17:28:14 -0600 Subject: [PATCH] Migrate the S3 SDK from v1 to v2 (#16664) Signed-off-by: Florent Poinsard --- go.mod | 21 ++- go.sum | 44 ++++- go/vt/mysqlctl/s3backupstorage/retryer.go | 84 +++++---- .../mysqlctl/s3backupstorage/retryer_test.go | 86 ++++------ go/vt/mysqlctl/s3backupstorage/s3.go | 160 ++++++++++-------- go/vt/mysqlctl/s3backupstorage/s3_test.go | 96 +++++++---- go/vt/mysqlctl/xtrabackupengine.go | 2 +- 7 files changed, 298 insertions(+), 195 deletions(-) diff --git a/go.mod b/go.mod index ec3eef4fe32..9b08e9a0054 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,6 @@ require ( github.com/HdrHistogram/hdrhistogram-go v0.9.0 // indirect github.com/aquarapid/vaultlib v0.5.1 github.com/armon/go-metrics v0.4.1 // indirect - github.com/aws/aws-sdk-go v1.55.5 github.com/buger/jsonparser v1.1.1 github.com/cespare/xxhash/v2 v2.3.0 github.com/corpix/uarand v0.1.1 // indirect @@ -32,7 +31,6 @@ require ( github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/serf v0.10.1 // indirect github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428 - github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.17.9 github.com/klauspost/pgzip v1.2.6 github.com/krishicks/yaml-patch v0.0.10 @@ -92,6 +90,11 @@ require ( require ( github.com/DataDog/datadog-go/v5 v5.5.0 github.com/Shopify/toxiproxy/v2 v2.9.0 + github.com/aws/aws-sdk-go-v2 v1.30.4 + github.com/aws/aws-sdk-go-v2/config v1.27.31 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.15 + github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1 + github.com/aws/smithy-go v1.20.4 github.com/bndr/gotabulate v1.1.2 github.com/gammazero/deque v0.2.1 github.com/google/safehtml v0.1.0 @@ -124,6 +127,20 @@ require ( github.com/DataDog/go-tuf v1.1.0-0.5.2 // indirect github.com/DataDog/sketches-go v1.4.6 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.30 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect diff --git a/go.sum b/go.sum index be825d9ab8a..8e49c959a93 100644 --- a/go.sum +++ b/go.sum @@ -73,8 +73,44 @@ github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= -github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= -github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8= +github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 h1:70PVAiL15/aBMh5LThwgXdSQorVr91L127ttckI9QQU= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4/go.mod h1:/MQxMqci8tlqDH+pjmoLu1i0tbWCUP1hhyMRuFxpQCw= +github.com/aws/aws-sdk-go-v2/config v1.27.31 h1:kxBoRsjhT3pq0cKthgj6RU6bXTm/2SgdoUMyrVw0rAI= +github.com/aws/aws-sdk-go-v2/config v1.27.31/go.mod h1:z04nZdSWFPaDwK3DdJOG2r+scLQzMYuJeW0CujEm9FM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.30 h1:aau/oYFtibVovr2rDt8FHlU17BTicFEMAi29V1U+L5Q= +github.com/aws/aws-sdk-go-v2/credentials v1.17.30/go.mod h1:BPJ/yXV92ZVq6G8uYvbU0gSl8q94UB63nMT5ctNO38g= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 h1:yjwoSyDZF8Jth+mUk5lSPJCkMC0lMy6FaCD51jm6ayE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12/go.mod h1:fuR57fAgMk7ot3WcNQfb6rSEn+SUffl7ri+aa8uKysI= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.15 h1:ijB7hr56MngOiELJe0C5aQRaBQ11LveNgWFyG02AUto= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.15/go.mod h1:0QEmQSSWMVfiAk93l1/ayR9DQ9+jwni7gHS2NARZXB0= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMjKfa3bNTiZbUP7DeCxfbTROESwY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16 h1:mimdLQkIX1zr8GIPY1ZtALdBQGxcASiBd2MOp8m/dMc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16/go.mod h1:YHk6owoSwrIsok+cAH9PENCOGoH5PU2EllX4vLtSrsY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4/go.mod h1:Vz1JQXliGcQktFTN/LN6uGppAIRoLBR2bMvIMP0gOjc= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18 h1:GckUnpm4EJOAio1c8o25a+b3lVfwVzC9gnSBqiiNmZM= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18/go.mod h1:Br6+bxfG33Dk3ynmkhsW2Z/t9D4+lRqdLDNCKi85w0U= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 h1:tJ5RnkHCiSH0jyd6gROjlJtNwov0eGYNz8s8nFcR0jQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18/go.mod h1:++NHzT+nAF7ZPrHPsA+ENvsXkOO8wEu+C6RXltAG4/c= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 h1:jg16PhLPUiHIj8zYIW6bqzeQSuHVEiWnGA0Brz5Xv2I= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16/go.mod h1:Uyk1zE1VVdsHSU7096h/rwnXDzOzYQVl+FNPhPw7ShY= +github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1 h1:mx2ucgtv+MWzJesJY9Ig/8AFHgoE5FwLXwUVgW/FGdI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1/go.mod h1:BSPI0EfnYUuNHPS0uqIo5VrRwzie+Fp+YhQOUs16sKI= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 h1:zCsFCKvbj25i7p1u94imVoO447I/sFv8qq+lGJhRN0c= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.5/go.mod h1:ZeDX1SnKsVlejeuz41GiajjZpRSWR7/42q/EyA/QEiM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 h1:SKvPgvdvmiTWoi0GAJ7AsJfOz3ngVkD/ERbs5pUnHNI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5/go.mod h1:20sz31hv/WsPa3HhU3hfrIet2kxM4Pe0r20eBZ20Tac= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 h1:OMsEmCyz2i89XwRwPouAJvhj81wINh+4UK+k/0Yo/q8= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.5/go.mod h1:vmSqFK+BVIwVpDAGZB3CoCXHzurt4qBE8lf+I/kRTh0= +github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= +github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -288,10 +324,6 @@ github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428 h1:Mo9W14pwbO9VfRe+y github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428/go.mod h1:uhpZMVGznybq1itEKXj6RYw9I71qK4kH+OGMjRC4KEo= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= -github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= -github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= -github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= diff --git a/go/vt/mysqlctl/s3backupstorage/retryer.go b/go/vt/mysqlctl/s3backupstorage/retryer.go index 052b1ef26d1..9e4e87da702 100644 --- a/go/vt/mysqlctl/s3backupstorage/retryer.go +++ b/go/vt/mysqlctl/s3backupstorage/retryer.go @@ -1,51 +1,75 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package s3backupstorage import ( + "context" "strings" "time" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go-v2/aws" ) -// ClosedConnectionRetryer implements the aws request.Retryer interface +// ClosedConnectionRetryer implements the aws.Retryer interface // and is used to retry closed connection errors during MultipartUpload -// operations. +// operations. It is a simplified version of the RetryableConnectionError +// implementation, which always retry on any type of connection error. type ClosedConnectionRetryer struct { - awsRetryer request.Retryer + awsRetryer aws.Retryer } -// RetryRules is part of the Retryer interface. It defers to the underlying -// aws Retryer to compute backoff rules. -func (retryer *ClosedConnectionRetryer) RetryRules(r *request.Request) time.Duration { - return retryer.awsRetryer.RetryRules(r) -} - -// ShouldRetry is part of the Retryer interface. It retries on errors that occur -// due to a closed network connection, and then falls back to the underlying aws -// Retryer for checking additional retry conditions. -func (retryer *ClosedConnectionRetryer) ShouldRetry(r *request.Request) bool { - if retryer.MaxRetries() == 0 { +// IsErrorRetryable returns true if the error should be retried. We first try +// to see if the error is due to the use of a closed connection, if it is, +// we retry, and if not, we default to what the aws.Retryer would do. +func (retryer *ClosedConnectionRetryer) IsErrorRetryable(err error) bool { + if retryer.MaxAttempts() == 0 { return false } - if r.Retryable != nil { - return *r.Retryable - } - - if r.Error != nil { - if awsErr, ok := r.Error.(awserr.Error); ok { - if strings.Contains(awsErr.Error(), "use of closed network connection") { - return true - } + if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { + return true } } - return retryer.awsRetryer.ShouldRetry(r) + return retryer.awsRetryer.IsErrorRetryable(err) +} + +// MaxAttempts returns the maximum number of attempts that can be made for +// an attempt before failing. A value of 0 implies that the attempt should +// be retried until it succeeds if the errors are retryable. +func (retryer *ClosedConnectionRetryer) MaxAttempts() int { + return retryer.awsRetryer.MaxAttempts() +} + +// RetryDelay returns the delay that should be used before retrying the +// attempt. Will return error if the delay could not be determined. +func (retryer *ClosedConnectionRetryer) RetryDelay(attempt int, opErr error) (time.Duration, error) { + return retryer.awsRetryer.RetryDelay(attempt, opErr) +} + +// GetRetryToken attempts to deduct the retry cost from the retry token pool. +// Returning the token release function, or error. +func (retryer *ClosedConnectionRetryer) GetRetryToken(ctx context.Context, opErr error) (releaseToken func(error) error, err error) { + return retryer.awsRetryer.GetRetryToken(ctx, opErr) } -// MaxRetries is part of the Retryer interface. It defers to the -// underlying aws Retryer for the max number of retries. -func (retryer *ClosedConnectionRetryer) MaxRetries() int { - return retryer.awsRetryer.MaxRetries() +// GetInitialToken returns the initial attempt token that can increment the +// retry token pool if the attempt is successful. +func (retryer *ClosedConnectionRetryer) GetInitialToken() (releaseToken func(error) error) { + return retryer.awsRetryer.GetInitialToken() } diff --git a/go/vt/mysqlctl/s3backupstorage/retryer_test.go b/go/vt/mysqlctl/s3backupstorage/retryer_test.go index 9fe3004e6ff..666f565e7e4 100644 --- a/go/vt/mysqlctl/s3backupstorage/retryer_test.go +++ b/go/vt/mysqlctl/s3backupstorage/retryer_test.go @@ -1,79 +1,69 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package s3backupstorage import ( + "context" "errors" "testing" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/request" "github.com/stretchr/testify/assert" ) type testRetryer struct{ retry bool } -func (r *testRetryer) MaxRetries() int { return 5 } -func (r *testRetryer) RetryRules(req *request.Request) time.Duration { return time.Second } -func (r *testRetryer) ShouldRetry(req *request.Request) bool { return r.retry } +func (r *testRetryer) GetInitialToken() (releaseToken func(error) error) { panic("implement me") } +func (r *testRetryer) GetRetryToken(ctx context.Context, opErr error) (releaseToken func(error) error, err error) { + panic("implement me") +} +func (r *testRetryer) IsErrorRetryable(err error) bool { return r.retry } +func (r *testRetryer) MaxAttempts() int { return 5 } +func (r *testRetryer) RetryDelay(attempt int, opErr error) (time.Duration, error) { + return time.Second, nil +} func TestShouldRetry(t *testing.T) { tests := []struct { name string - r *request.Request + err error fallbackPolicy bool expected bool }{ - { - name: "non retryable request", - r: &request.Request{ - Retryable: aws.Bool(false), - }, + name: "no error", fallbackPolicy: false, expected: false, }, { - name: "retryable request", - r: &request.Request{ - Retryable: aws.Bool(true), - }, - fallbackPolicy: false, - expected: true, - }, - { - name: "non aws error", - r: &request.Request{ - Retryable: nil, - Error: errors.New("some error"), - }, + name: "non aws error", + err: errors.New("some error"), fallbackPolicy: false, expected: false, }, { - name: "closed connection error", - r: &request.Request{ - Retryable: nil, - Error: awserr.New("5xx", "use of closed network connection", nil), - }, - fallbackPolicy: false, - expected: true, - }, - { - name: "closed connection error (non nil origError)", - r: &request.Request{ - Retryable: nil, - Error: awserr.New("5xx", "use of closed network connection", errors.New("some error")), - }, + name: "closed connection error", + err: errors.New("use of closed network connection"), fallbackPolicy: false, expected: true, }, { - name: "other aws error hits fallback policy", - r: &request.Request{ - Retryable: nil, - Error: awserr.New("code", "not a closed network connectionn", errors.New("some error")), - }, + name: "other aws error hits fallback policy", + err: errors.New("not a closed network connection"), fallbackPolicy: true, expected: true, }, @@ -82,13 +72,7 @@ func TestShouldRetry(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { retryer := &ClosedConnectionRetryer{&testRetryer{test.fallbackPolicy}} - msg := "" - if test.r.Error != nil { - if awsErr, ok := test.r.Error.(awserr.Error); ok { - msg = awsErr.Error() - } - } - assert.Equal(t, test.expected, retryer.ShouldRetry(test.r), msg) + assert.Equal(t, test.expected, retryer.IsErrorRetryable(test.err)) }) } } diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go index 9686b4a3978..1af1362ae30 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3.go +++ b/go/vt/mysqlctl/s3backupstorage/s3.go @@ -39,13 +39,12 @@ import ( "sync" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/client" - "github.com/aws/aws-sdk-go/aws/request" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3iface" - "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/smithy-go/middleware" "github.com/spf13/pflag" "vitess.io/vitess/go/vt/concurrency" @@ -105,15 +104,24 @@ func init() { servenv.OnParseFor("vttablet", registerFlags) } -type logNameToLogLevel map[string]aws.LogLevelType +type logNameToLogLevel map[string]aws.ClientLogMode var logNameMap logNameToLogLevel const sseCustomerPrefix = "sse_c:" +type iClient interface { + manager.UploadAPIClient + manager.DownloadAPIClient +} + +type clientWrapper struct { + *s3.Client +} + // S3BackupHandle implements the backupstorage.BackupHandle interface. type S3BackupHandle struct { - client s3iface.S3API + client iClient bs *S3BackupStorage dir string name string @@ -154,9 +162,9 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize } // Calculate s3 upload part size using the source filesize - partSizeBytes := s3manager.DefaultUploadPartSize + partSizeBytes := manager.DefaultUploadPartSize if filesize > 0 { - minimumPartSize := float64(filesize) / float64(s3manager.MaxUploadParts) + minimumPartSize := float64(filesize) / float64(manager.MaxUploadParts) // Round up to ensure large enough partsize calculatedPartSizeBytes := int64(math.Ceil(minimumPartSize)) if calculatedPartSizeBytes > partSizeBytes { @@ -169,25 +177,32 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize go func() { defer bh.waitGroup.Done() - uploader := s3manager.NewUploaderWithClient(bh.client, func(u *s3manager.Uploader) { + uploader := manager.NewUploader(bh.client, func(u *manager.Uploader) { u.PartSize = partSizeBytes }) object := objName(bh.dir, bh.name, filename) sendStats := bh.bs.params.Stats.Scope(stats.Operation("AWS:Request:Send")) // Using UploadWithContext breaks uploading to Minio and Ceph https://github.com/vitessio/vitess/issues/14188 - _, err := uploader.Upload(&s3manager.UploadInput{ + _, err := uploader.Upload(context.Background(), &s3.PutObjectInput{ Bucket: &bucket, - Key: object, + Key: &object, Body: reader, ServerSideEncryption: bh.bs.s3SSE.awsAlg, SSECustomerAlgorithm: bh.bs.s3SSE.customerAlg, SSECustomerKey: bh.bs.s3SSE.customerKey, SSECustomerKeyMD5: bh.bs.s3SSE.customerMd5, - }, s3manager.WithUploaderRequestOptions(func(r *request.Request) { - r.Handlers.CompleteAttempt.PushBack(func(r *request.Request) { - sendStats.TimedIncrement(time.Since(r.AttemptTime)) + }, func(u *manager.Uploader) { + u.ClientOptions = append(u.ClientOptions, func(o *s3.Options) { + o.APIOptions = append(o.APIOptions, func(stack *middleware.Stack) error { + return stack.Finalize.Add(middleware.FinalizeMiddlewareFunc("CompleteAttemptMiddleware", func(ctx context.Context, input middleware.FinalizeInput, next middleware.FinalizeHandler) (middleware.FinalizeOutput, middleware.Metadata, error) { + start := time.Now() + output, metadata, err := next.HandleFinalize(ctx, input) + sendStats.TimedIncrement(time.Since(start)) + return output, metadata, err + }), middleware.Before) + }) }) - })) + }) if err != nil { reader.CloseWithError(err) bh.RecordError(err) @@ -221,15 +236,20 @@ func (bh *S3BackupHandle) ReadFile(ctx context.Context, filename string) (io.Rea } object := objName(bh.dir, bh.name, filename) sendStats := bh.bs.params.Stats.Scope(stats.Operation("AWS:Request:Send")) - out, err := bh.client.GetObjectWithContext(ctx, &s3.GetObjectInput{ + out, err := bh.client.GetObject(ctx, &s3.GetObjectInput{ Bucket: &bucket, - Key: object, + Key: &object, SSECustomerAlgorithm: bh.bs.s3SSE.customerAlg, SSECustomerKey: bh.bs.s3SSE.customerKey, SSECustomerKeyMD5: bh.bs.s3SSE.customerMd5, - }, func(r *request.Request) { - r.Handlers.CompleteAttempt.PushBack(func(r *request.Request) { - sendStats.TimedIncrement(time.Since(r.AttemptTime)) + }, func(o *s3.Options) { + o.APIOptions = append(o.APIOptions, func(stack *middleware.Stack) error { + return stack.Finalize.Add(middleware.FinalizeMiddlewareFunc("CompleteAttemptMiddleware", func(ctx context.Context, input middleware.FinalizeInput, next middleware.FinalizeHandler) (middleware.FinalizeOutput, middleware.Metadata, error) { + start := time.Now() + output, metadata, err := next.HandleFinalize(ctx, input) + sendStats.TimedIncrement(time.Since(start)) + return output, metadata, err + }), middleware.Before) }) }) if err != nil { @@ -241,7 +261,7 @@ func (bh *S3BackupHandle) ReadFile(ctx context.Context, filename string) (io.Rea var _ backupstorage.BackupHandle = (*S3BackupHandle)(nil) type S3ServerSideEncryption struct { - awsAlg *string + awsAlg types.ServerSideEncryption customerAlg *string customerKey *string customerMd5 *string @@ -268,13 +288,13 @@ func (s3ServerSideEncryption *S3ServerSideEncryption) init() error { s3ServerSideEncryption.customerKey = aws.String(string(decodedKey)) s3ServerSideEncryption.customerMd5 = aws.String(base64.StdEncoding.EncodeToString(md5Hash[:])) } else if sse != "" { - s3ServerSideEncryption.awsAlg = &sse + s3ServerSideEncryption.awsAlg = types.ServerSideEncryption(sse) } return nil } func (s3ServerSideEncryption *S3ServerSideEncryption) reset() { - s3ServerSideEncryption.awsAlg = nil + s3ServerSideEncryption.awsAlg = "" s3ServerSideEncryption.customerAlg = nil s3ServerSideEncryption.customerKey = nil s3ServerSideEncryption.customerMd5 = nil @@ -282,7 +302,7 @@ func (s3ServerSideEncryption *S3ServerSideEncryption) reset() { // S3BackupStorage implements the backupstorage.BackupStorage interface. type S3BackupStorage struct { - _client *s3.S3 + _client *s3.Client mu sync.Mutex s3SSE S3ServerSideEncryption params backupstorage.Params @@ -307,28 +327,28 @@ func (bs *S3BackupStorage) ListBackups(ctx context.Context, dir string) ([]backu return nil, err } - var searchPrefix *string + var searchPrefix string if dir == "/" { searchPrefix = objName("") } else { searchPrefix = objName(dir, "") } - log.Infof("objName: %v", *searchPrefix) + log.Infof("objName: %s", searchPrefix) query := &s3.ListObjectsV2Input{ Bucket: &bucket, Delimiter: &delimiter, - Prefix: searchPrefix, + Prefix: &searchPrefix, } var subdirs []string for { - objs, err := c.ListObjectsV2(query) + objs, err := c.ListObjectsV2(ctx, query) if err != nil { return nil, err } for _, prefix := range objs.CommonPrefixes { - subdir := strings.TrimPrefix(*prefix.Prefix, *searchPrefix) + subdir := strings.TrimPrefix(*prefix.Prefix, searchPrefix) subdir = strings.TrimSuffix(subdir, delimiter) subdirs = append(subdirs, subdir) } @@ -345,7 +365,7 @@ func (bs *S3BackupStorage) ListBackups(ctx context.Context, dir string) ([]backu result := make([]backupstorage.BackupHandle, 0, len(subdirs)) for _, subdir := range subdirs { result = append(result, &S3BackupHandle{ - client: c, + client: &clientWrapper{Client: c}, bs: bs, dir: dir, name: subdir, @@ -364,7 +384,7 @@ func (bs *S3BackupStorage) StartBackup(ctx context.Context, dir, name string) (b } return &S3BackupHandle{ - client: c, + client: &clientWrapper{Client: c}, bs: bs, dir: dir, name: name, @@ -381,28 +401,29 @@ func (bs *S3BackupStorage) RemoveBackup(ctx context.Context, dir, name string) e return err } + path := objName(dir, name) query := &s3.ListObjectsV2Input{ Bucket: &bucket, - Prefix: objName(dir, name), + Prefix: &path, } for { - objs, err := c.ListObjectsV2(query) + objs, err := c.ListObjectsV2(ctx, query) if err != nil { return err } - objIds := make([]*s3.ObjectIdentifier, 0, len(objs.Contents)) + objIds := make([]types.ObjectIdentifier, 0, len(objs.Contents)) for _, obj := range objs.Contents { - objIds = append(objIds, &s3.ObjectIdentifier{ + objIds = append(objIds, types.ObjectIdentifier{ Key: obj.Key, }) } quiet := true // return less in the Delete response - out, err := c.DeleteObjects(&s3.DeleteObjectsInput{ + out, err := c.DeleteObjects(ctx, &s3.DeleteObjectsInput{ Bucket: &bucket, - Delete: &s3.Delete{ + Delete: &types.Delete{ Objects: objIds, Quiet: &quiet, }, @@ -413,7 +434,7 @@ func (bs *S3BackupStorage) RemoveBackup(ctx context.Context, dir, name string) e } for _, objError := range out.Errors { - return errors.New(objError.String()) + return errors.New(*objError.Message) } if objs.NextContinuationToken == nil { @@ -442,16 +463,15 @@ func (bs *S3BackupStorage) WithParams(params backupstorage.Params) backupstorage var _ backupstorage.BackupStorage = (*S3BackupStorage)(nil) // getLogLevel converts the string loglevel to an aws.LogLevelType -func getLogLevel() *aws.LogLevelType { - l := new(aws.LogLevelType) - *l = aws.LogOff // default setting +func getLogLevel() aws.ClientLogMode { + var l aws.ClientLogMode if level, found := logNameMap[requiredLogLevel]; found { - *l = level // adjust as required + l = level // adjust as required } return l } -func (bs *S3BackupStorage) client() (*s3.S3, error) { +func (bs *S3BackupStorage) client() (*s3.Client, error) { bs.mu.Lock() defer bs.mu.Unlock() if bs._client == nil { @@ -459,34 +479,28 @@ func (bs *S3BackupStorage) client() (*s3.S3, error) { httpClient := &http.Client{Transport: bs.transport} - session, err := session.NewSession() + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(region), + config.WithClientLogMode(logLevel), + config.WithHTTPClient(httpClient), + ) if err != nil { return nil, err } - awsConfig := aws.Config{ - HTTPClient: httpClient, - LogLevel: logLevel, - Endpoint: aws.String(endpoint), - Region: aws.String(region), - S3ForcePathStyle: aws.Bool(forcePath), - } - - if retryCount >= 0 { - awsConfig = *request.WithRetryer(&awsConfig, &ClosedConnectionRetryer{ - awsRetryer: &client.DefaultRetryer{ - NumMaxRetries: retryCount, - }, - }) - } - - bs._client = s3.New(session, &awsConfig) + bs._client = s3.NewFromConfig(cfg, func(o *s3.Options) { + o.UsePathStyle = forcePath + if retryCount >= 0 { + o.RetryMaxAttempts = retryCount + o.Retryer = &ClosedConnectionRetryer{} + } + }) if len(bucket) == 0 { return nil, fmt.Errorf("--s3_backup_storage_bucket required") } - if _, err := bs._client.HeadBucket(&s3.HeadBucketInput{Bucket: &bucket}); err != nil { + if _, err := bs._client.HeadBucket(context.Background(), &s3.HeadBucketInput{Bucket: &bucket}); err != nil { return nil, err } @@ -497,24 +511,24 @@ func (bs *S3BackupStorage) client() (*s3.S3, error) { return bs._client, nil } -func objName(parts ...string) *string { +func objName(parts ...string) string { res := "" if root != "" { res += root + delimiter } res += strings.Join(parts, delimiter) - return &res + return res } func init() { backupstorage.BackupStorageMap["s3"] = newS3BackupStorage() logNameMap = logNameToLogLevel{ - "LogOff": aws.LogOff, - "LogDebug": aws.LogDebug, - "LogDebugWithSigning": aws.LogDebugWithSigning, - "LogDebugWithHTTPBody": aws.LogDebugWithHTTPBody, - "LogDebugWithRequestRetries": aws.LogDebugWithRequestRetries, - "LogDebugWithRequestErrors": aws.LogDebugWithRequestErrors, + "LogOff": 0, + "LogDebug": aws.LogRequest, + "LogDebugWithSigning": aws.LogSigning, + "LogDebugWithHTTPBody": aws.LogRequestWithBody, + "LogDebugWithRequestRetries": aws.LogRetries, + "LogDebugWithRequestErrors": aws.LogRequest | aws.LogRetries, } } diff --git a/go/vt/mysqlctl/s3backupstorage/s3_test.go b/go/vt/mysqlctl/s3backupstorage/s3_test.go index 6f4207a645f..1acfddcce1e 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3_test.go +++ b/go/vt/mysqlctl/s3backupstorage/s3_test.go @@ -1,22 +1,22 @@ package s3backupstorage import ( + "context" "crypto/md5" "crypto/rand" "encoding/base64" "errors" "fmt" "net/http" - "net/url" "os" "testing" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/client" - "github.com/aws/aws-sdk-go/aws/request" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/smithy-go/middleware" + smithyhttp "github.com/aws/smithy-go/transport/http" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -26,29 +26,46 @@ import ( ) type s3FakeClient struct { - s3iface.S3API + *s3.Client err error delay time.Duration } -func (sfc *s3FakeClient) PutObjectRequest(in *s3.PutObjectInput) (*request.Request, *s3.PutObjectOutput) { - u, _ := url.Parse("http://localhost:1234") - req := request.Request{ - HTTPRequest: &http.Request{ // without this we segfault \_(ツ)_/¯ (see https://github.com/aws/aws-sdk-go/blob/v1.28.8/aws/request/request_context.go#L13) - Header: make(http.Header), - URL: u, - }, - Retryer: client.DefaultRetryer{}, +type fakeClientDo struct { + delay time.Duration +} + +func (fcd fakeClientDo) Do(request *http.Request) (*http.Response, error) { + if fcd.delay > 0 { + time.Sleep(fcd.delay) + } + return nil, nil +} + +func (sfc *s3FakeClient) PutObject(ctx context.Context, in *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + var o s3.Options + for _, fn := range optFns { + fn(&o) } - req.Handlers.Send.PushBack(func(r *request.Request) { - r.Error = sfc.err - if sfc.delay > 0 { - time.Sleep(sfc.delay) - } - }) + stack := middleware.NewStack("PutObject", smithyhttp.NewStackRequest) + for _, apiOption := range o.APIOptions { + _ = apiOption(stack) + } - return &req, &s3.PutObjectOutput{} + handler := middleware.DecorateHandler(smithyhttp.NewClientHandler(&fakeClientDo{delay: sfc.delay}), stack) + _, _, err := handler.Handle(ctx, in) + if err != nil { + return nil, err + } + + if sfc.err != nil { + return nil, sfc.err + } + + return &s3.PutObjectOutput{ + ETag: aws.String("fake-etag"), + }, nil } func TestAddFileError(t *testing.T) { @@ -56,11 +73,16 @@ func TestAddFileError(t *testing.T) { client: &s3FakeClient{err: errors.New("some error")}, bs: &S3BackupStorage{ params: backupstorage.NoParams(), + s3SSE: S3ServerSideEncryption{ + customerAlg: new(string), + customerKey: new(string), + customerMd5: new(string), + }, }, readOnly: false, } - wc, err := bh.AddFile(aws.BackgroundContext(), "somefile", 100000) + wc, err := bh.AddFile(context.Background(), "somefile", 100000) require.NoErrorf(t, err, "AddFile() expected no error, got %s", err) assert.NotNil(t, wc, "AddFile() expected non-nil WriteCloser") @@ -87,12 +109,17 @@ func TestAddFileStats(t *testing.T) { Logger: logutil.NewMemoryLogger(), Stats: fakeStats, }, + s3SSE: S3ServerSideEncryption{ + customerAlg: new(string), + customerKey: new(string), + customerMd5: new(string), + }, }, readOnly: false, } for i := 0; i < 4; i++ { - wc, err := bh.AddFile(aws.BackgroundContext(), fmt.Sprintf("somefile-%d", i), 100000) + wc, err := bh.AddFile(context.Background(), fmt.Sprintf("somefile-%d", i), 100000) require.NoErrorf(t, err, "AddFile() expected no error, got %s", err) assert.NotNil(t, wc, "AddFile() expected non-nil WriteCloser") @@ -131,11 +158,16 @@ func TestAddFileErrorStats(t *testing.T) { Logger: logutil.NewMemoryLogger(), Stats: fakeStats, }, + s3SSE: S3ServerSideEncryption{ + customerAlg: new(string), + customerKey: new(string), + customerMd5: new(string), + }, }, readOnly: false, } - wc, err := bh.AddFile(aws.BackgroundContext(), "somefile", 100000) + wc, err := bh.AddFile(context.Background(), "somefile", 100000) require.NoErrorf(t, err, "AddFile() expected no error, got %s", err) assert.NotNil(t, wc, "AddFile() expected non-nil WriteCloser") @@ -163,7 +195,7 @@ func TestNoSSE(t *testing.T) { err := sseData.init() require.NoErrorf(t, err, "init() expected to succeed") - assert.Nil(t, sseData.awsAlg, "awsAlg expected to be nil") + assert.Empty(t, sseData.awsAlg, "awsAlg expected to be empty") assert.Nil(t, sseData.customerAlg, "customerAlg expected to be nil") assert.Nil(t, sseData.customerKey, "customerKey expected to be nil") assert.Nil(t, sseData.customerMd5, "customerMd5 expected to be nil") @@ -178,7 +210,7 @@ func TestSSEAws(t *testing.T) { err := sseData.init() require.NoErrorf(t, err, "init() expected to succeed") - assert.Equal(t, aws.String("aws:kms"), sseData.awsAlg, "awsAlg expected to be aws:kms") + assert.Equal(t, types.ServerSideEncryption("aws:kms"), sseData.awsAlg, "awsAlg expected to be aws:kms") assert.Nil(t, sseData.customerAlg, "customerAlg expected to be nil") assert.Nil(t, sseData.customerKey, "customerKey expected to be nil") assert.Nil(t, sseData.customerMd5, "customerMd5 expected to be nil") @@ -186,7 +218,7 @@ func TestSSEAws(t *testing.T) { sseData.reset() require.NoErrorf(t, err, "reset() expected to succeed") - assert.Nil(t, sseData.awsAlg, "awsAlg expected to be nil") + assert.Empty(t, sseData.awsAlg, "awsAlg expected to be empty") assert.Nil(t, sseData.customerAlg, "customerAlg expected to be nil") assert.Nil(t, sseData.customerKey, "customerKey expected to be nil") assert.Nil(t, sseData.customerMd5, "customerMd5 expected to be nil") @@ -227,7 +259,7 @@ func TestSSECustomerFileBinaryKey(t *testing.T) { err = sseData.init() require.NoErrorf(t, err, "init() expected to succeed") - assert.Nil(t, sseData.awsAlg, "awsAlg expected to be nil") + assert.Empty(t, sseData.awsAlg, "awsAlg expected to be empty") assert.Equal(t, aws.String("AES256"), sseData.customerAlg, "customerAlg expected to be AES256") assert.Equal(t, aws.String(string(randomKey)), sseData.customerKey, "customerKey expected to be equal to the generated randomKey") md5Hash := md5.Sum(randomKey) @@ -236,7 +268,7 @@ func TestSSECustomerFileBinaryKey(t *testing.T) { sseData.reset() require.NoErrorf(t, err, "reset() expected to succeed") - assert.Nil(t, sseData.awsAlg, "awsAlg expected to be nil") + assert.Empty(t, sseData.awsAlg, "awsAlg expected to be empty") assert.Nil(t, sseData.customerAlg, "customerAlg expected to be nil") assert.Nil(t, sseData.customerKey, "customerKey expected to be nil") assert.Nil(t, sseData.customerMd5, "customerMd5 expected to be nil") @@ -262,7 +294,7 @@ func TestSSECustomerFileBase64Key(t *testing.T) { err = sseData.init() require.NoErrorf(t, err, "init() expected to succeed") - assert.Nil(t, sseData.awsAlg, "awsAlg expected to be nil") + assert.Empty(t, sseData.awsAlg, "awsAlg expected to be empty") assert.Equal(t, aws.String("AES256"), sseData.customerAlg, "customerAlg expected to be AES256") assert.Equal(t, aws.String(string(randomKey)), sseData.customerKey, "customerKey expected to be equal to the generated randomKey") md5Hash := md5.Sum(randomKey) @@ -271,7 +303,7 @@ func TestSSECustomerFileBase64Key(t *testing.T) { sseData.reset() require.NoErrorf(t, err, "reset() expected to succeed") - assert.Nil(t, sseData.awsAlg, "awsAlg expected to be nil") + assert.Empty(t, sseData.awsAlg, "awsAlg expected to be empty") assert.Nil(t, sseData.customerAlg, "customerAlg expected to be nil") assert.Nil(t, sseData.customerKey, "customerKey expected to be nil") assert.Nil(t, sseData.customerMd5, "customerMd5 expected to be nil") diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index e6d02eedc1d..784d718af26 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -277,7 +277,7 @@ func (be *XtrabackupEngine) executeFullBackup(ctx context.Context, params Backup if err != nil { return BackupUnusable, vterrors.Wrapf(err, "cannot JSON encode %v", backupManifestFileName) } - if _, err := mwc.Write([]byte(data)); err != nil { + if _, err := mwc.Write(data); err != nil { return BackupUnusable, vterrors.Wrapf(err, "cannot write %v", backupManifestFileName) }