-
Notifications
You must be signed in to change notification settings - Fork 130
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DEVPROD-12040: Add conditional writes to s3.put #8464
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ import ( | |
"time" | ||
|
||
s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types" | ||
"github.com/aws/smithy-go" | ||
"github.com/evergreen-ci/evergreen" | ||
"github.com/evergreen-ci/evergreen/agent/internal" | ||
"github.com/evergreen-ci/evergreen/agent/internal/client" | ||
|
@@ -96,7 +97,9 @@ type s3put struct { | |
// PatchOnly defaults to false. If set to true, this command will noop without error for non-patch tasks. | ||
PatchOnly string `mapstructure:"patch_only" plugin:"patch_only"` | ||
|
||
// SkipExisting, when set to true, will not upload files if they already exist in s3. | ||
// SkipExisting, when set to 'true', will not upload files if they already | ||
// exist in s3. This will not cause the s3.put command to fail. This | ||
// behavior respects s3's strong read-after-write consistency model. | ||
SkipExisting string `mapstructure:"skip_existing" plugin:"expand"` | ||
|
||
// workDir sets the working directory relative to which s3put should look for files to upload. | ||
|
@@ -349,6 +352,7 @@ func (s3pc *s3put) putWithRetry(ctx context.Context, comm client.Communicator, l | |
err error | ||
uploadedFiles []string | ||
filesList []string | ||
skippedFiles []string | ||
) | ||
|
||
timer := time.NewTimer(0) | ||
|
@@ -390,6 +394,7 @@ retryLoop: | |
|
||
// reset to avoid duplicated uploaded references | ||
uploadedFiles = []string{} | ||
skippedFiles = make([]string, 0) | ||
|
||
uploadLoop: | ||
for _, fpath := range filesList { | ||
|
@@ -410,16 +415,6 @@ retryLoop: | |
|
||
fpath = filepath.Join(filepath.Join(s3pc.workDir, s3pc.LocalFilesIncludeFilterPrefix), fpath) | ||
|
||
if s3pc.skipExistingBool { | ||
exists, err := s3pc.remoteFileExists(ctx, remoteName) | ||
if err != nil { | ||
return errors.Wrapf(err, "checking if file '%s' exists", remoteName) | ||
} | ||
if exists { | ||
logger.Task().Infof("Not uploading file '%s' because remote file '%s' already exists. Continuing to upload other files.", fpath, remoteName) | ||
logger.Task().Infof("Not uploading file '%s' because remote file '%s' already exists. Continuing to upload other files.", fpath, remoteName) | ||

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Can we keep this log in the new implementation? It's useful for users debugging their tasks.

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Yes, agreed! |
||
continue uploadLoop | ||
} | ||
} | ||
err = s3pc.bucket.Upload(ctx, remoteName, fpath) | ||
if err != nil { | ||
// retry errors other than "file doesn't exist", which we handle differently based on what | ||
|
@@ -440,6 +435,22 @@ retryLoop: | |
} | ||
} | ||
|
||
if s3pc.skipExistingBool { | ||
if s3pc.preservePath { | ||
skippedFiles = append(skippedFiles, remoteName) | ||
} else { | ||
skippedFiles = append(skippedFiles, fpath) | ||
} | ||
|
||
var ae smithy.APIError | ||
|
||
if errors.As(err, &ae) { | ||
if ae.ErrorCode() == "PreconditionFailed" { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment clarifying what "PreconditionFailed" is here/the significance in S3?

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Sure! This is the error that setting IfNoneExists will trigger when set, and it means that the file already exists.

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment to the code as well saying this so that future readers can understand what the significance of this error is?

There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Yes, absolutely |
||
continue uploadLoop | ||
} | ||
} | ||
} | ||
|
||
// in all other cases, log an error and retry after an interval. | ||
logger.Task().Error(errors.WithMessage(err, "putting S3 file")) | ||
timer.Reset(backoffCounter.Duration()) | ||
|
@@ -469,9 +480,11 @@ retryLoop: | |
|
||
logger.Task().WarningWhen(strings.Contains(s3pc.Bucket, "."), "Bucket names containing dots that are created after Sept. 30, 2020 are not guaranteed to have valid attached URLs.") | ||
|
||
if len(uploadedFiles) != len(filesList) && !s3pc.skipMissing { | ||
logger.Task().Infof("Attempted to upload %d files, %d successfully uploaded.", len(filesList), len(uploadedFiles)) | ||
return errors.Errorf("uploaded %d files of %d requested", len(uploadedFiles), len(filesList)) | ||
processedCount := len(skippedFiles) + len(uploadedFiles) | ||
|
||
if processedCount != len(filesList) && !s3pc.skipMissing { | ||
logger.Task().Infof("Attempted to upload %d files, %d successfully uploaded.", len(filesList), processedCount) | ||
return errors.Errorf("uploaded %d files of %d requested", processedCount, len(filesList)) | ||
} | ||
|
||
return nil | ||
|
@@ -534,13 +547,19 @@ func (s3pc *s3put) createPailBucket(ctx context.Context, httpClient *http.Client | |
if s3pc.bucket != nil { | ||
return nil | ||
} | ||
|
||
opts := pail.S3Options{ | ||
Credentials: pail.CreateAWSCredentials(s3pc.AwsKey, s3pc.AwsSecret, s3pc.AwsSessionToken), | ||
Region: s3pc.Region, | ||
Name: s3pc.Bucket, | ||
Permissions: pail.S3Permissions(s3pc.Permissions), | ||
ContentType: s3pc.ContentType, | ||
} | ||
|
||
if s3pc.skipExistingBool { | ||
opts.IfNotExists = true | ||
} | ||
|
||
bucket, err := pail.NewS3MultiPartBucketWithHTTPClient(ctx, httpClient, opts) | ||
s3pc.bucket = bucket | ||
return err | ||
|
@@ -557,16 +576,3 @@ func (s3pc *s3put) isPublic() bool { | |
return (s3pc.Visibility == "" || s3pc.Visibility == artifact.Public) && | ||
(s3pc.Permissions == string(s3Types.BucketCannedACLPublicRead) || s3pc.Permissions == string(s3Types.BucketCannedACLPublicReadWrite)) | ||
} | ||
|
||
func (s3pc *s3put) remoteFileExists(ctx context.Context, remoteName string) (bool, error) { | ||
opts := pail.S3Options{ | ||
Name: s3pc.Bucket, | ||
Credentials: pail.CreateAWSCredentials(s3pc.AwsKey, s3pc.AwsSecret, s3pc.AwsSessionToken), | ||
Region: s3pc.Region, | ||
} | ||
bucket, err := pail.NewS3Bucket(ctx, opts) | ||
if err != nil { | ||
return false, errors.Wrap(err, "creating S3 bucket") | ||
} | ||
return bucket.Exists(ctx, remoteName) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think skip_existing: error doesn't actually make a ton of logical sense -- we aren't skipping an existing file (i.e. succeeding), we're choosing to fail. Parameters are free so I think a new parameter like
error_on_existing: true
is more clear. This also maintains our behavior of parsing booleans.

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After thinking this over I agree with you here and also think this doesn't make sense as a feature at all. I reverted back to the old behavior where this is just toggled as a plain boolean and there is no option to error out.