DEVPROD-12040: Add conditional writes to s3.put #8464

Status: Open — wants to merge 1 commit into base: main
60 changes: 33 additions & 27 deletions agent/command/s3_put.go
@@ -11,6 +11,7 @@ import (
"time"

s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go"
"github.com/evergreen-ci/evergreen"
"github.com/evergreen-ci/evergreen/agent/internal"
"github.com/evergreen-ci/evergreen/agent/internal/client"
@@ -96,7 +97,9 @@ type s3put struct {
// PatchOnly defaults to false. If set to true, this command will noop without error for non-patch tasks.
PatchOnly string `mapstructure:"patch_only" plugin:"patch_only"`

// SkipExisting, when set to true, will not upload files if they already exist in s3.
// SkipExisting, when set to 'true', will not upload files if they already
// exist in s3. This will not cause the s3.put command to fail. This
// behavior respects s3's strong read-after-write consistency model.
SkipExisting string `mapstructure:"skip_existing" plugin:"expand"`
Review comment (Contributor):
I think skip_existing: error doesn't actually make much logical sense -- we aren't skipping an existing file (i.e., succeeding), we're choosing to fail. Parameters are free, so a new parameter like error_on_existing: true is clearer. This also maintains our behavior of parsing booleans.

Reply (Contributor Author):
After thinking this over, I agree, and I also think this doesn't make sense as a feature at all. I reverted to the old behavior, where this is toggled as a plain boolean and there is no option to error out.


// workDir sets the working directory relative to which s3put should look for files to upload.
@@ -349,6 +352,7 @@ func (s3pc *s3put) putWithRetry(ctx context.Context, comm client.Communicator, l
err error
uploadedFiles []string
filesList []string
skippedFiles []string
)

timer := time.NewTimer(0)
@@ -390,6 +394,7 @@ retryLoop:

// reset to avoid duplicated uploaded references
uploadedFiles = []string{}
skippedFiles = make([]string, 0)

uploadLoop:
for _, fpath := range filesList {
@@ -410,16 +415,6 @@

fpath = filepath.Join(filepath.Join(s3pc.workDir, s3pc.LocalFilesIncludeFilterPrefix), fpath)

if s3pc.skipExistingBool {
exists, err := s3pc.remoteFileExists(ctx, remoteName)
if err != nil {
return errors.Wrapf(err, "checking if file '%s' exists", remoteName)
}
if exists {
logger.Task().Infof("Not uploading file '%s' because remote file '%s' already exists. Continuing to upload other files.", fpath, remoteName)
Review comment (Contributor):
Can we keep this log in the new implementation? It's useful for users debugging their tasks.

Reply (Contributor Author):
Yes, agreed!

continue uploadLoop
}
}
err = s3pc.bucket.Upload(ctx, remoteName, fpath)
if err != nil {
// retry errors other than "file doesn't exist", which we handle differently based on what
@@ -440,6 +435,22 @@
}
}

if s3pc.skipExistingBool {
if s3pc.preservePath {
skippedFiles = append(skippedFiles, remoteName)
} else {
skippedFiles = append(skippedFiles, fpath)
}

var ae smithy.APIError

if errors.As(err, &ae) {
if ae.ErrorCode() == "PreconditionFailed" {
Review comment (Contributor):
Can we add a comment clarifying what "PreconditionFailed" is here and its significance in S3?

Reply (Contributor Author):
Sure! This is the error that setting IfNoneExists triggers, and it means that the file already exists.

Review comment (Contributor, @Kimchelly, Nov 20, 2024):
Can we add a comment to the code as well saying this, so that future readers can understand the significance of this error?

Reply (Contributor Author):
Yes, absolutely.

continue uploadLoop
}
}
}

// in all other cases, log an error and retry after an interval.
logger.Task().Error(errors.WithMessage(err, "putting S3 file"))
timer.Reset(backoffCounter.Duration())
@@ -469,9 +480,11 @@

logger.Task().WarningWhen(strings.Contains(s3pc.Bucket, "."), "Bucket names containing dots that are created after Sept. 30, 2020 are not guaranteed to have valid attached URLs.")

if len(uploadedFiles) != len(filesList) && !s3pc.skipMissing {
logger.Task().Infof("Attempted to upload %d files, %d successfully uploaded.", len(filesList), len(uploadedFiles))
return errors.Errorf("uploaded %d files of %d requested", len(uploadedFiles), len(filesList))
processedCount := len(skippedFiles) + len(uploadedFiles)

if processedCount != len(filesList) && !s3pc.skipMissing {
logger.Task().Infof("Attempted to upload %d files, %d successfully uploaded.", len(filesList), processedCount)
return errors.Errorf("uploaded %d files of %d requested", processedCount, len(filesList))
}

return nil
@@ -534,13 +547,19 @@ func (s3pc *s3put) createPailBucket(ctx context.Context, httpClient *http.Client
if s3pc.bucket != nil {
return nil
}

opts := pail.S3Options{
Credentials: pail.CreateAWSCredentials(s3pc.AwsKey, s3pc.AwsSecret, s3pc.AwsSessionToken),
Region: s3pc.Region,
Name: s3pc.Bucket,
Permissions: pail.S3Permissions(s3pc.Permissions),
ContentType: s3pc.ContentType,
}

if s3pc.skipExistingBool {
opts.IfNotExists = true
}

bucket, err := pail.NewS3MultiPartBucketWithHTTPClient(ctx, httpClient, opts)
s3pc.bucket = bucket
return err
@@ -557,16 +576,3 @@ func (s3pc *s3put) isPublic() bool {
return (s3pc.Visibility == "" || s3pc.Visibility == artifact.Public) &&
(s3pc.Permissions == string(s3Types.BucketCannedACLPublicRead) || s3pc.Permissions == string(s3Types.BucketCannedACLPublicReadWrite))
}

func (s3pc *s3put) remoteFileExists(ctx context.Context, remoteName string) (bool, error) {
opts := pail.S3Options{
Name: s3pc.Bucket,
Credentials: pail.CreateAWSCredentials(s3pc.AwsKey, s3pc.AwsSecret, s3pc.AwsSessionToken),
Region: s3pc.Region,
}
bucket, err := pail.NewS3Bucket(ctx, opts)
if err != nil {
return false, errors.Wrap(err, "creating S3 bucket")
}
return bucket.Exists(ctx, remoteName)
}
86 changes: 86 additions & 0 deletions agent/command/s3_put_test.go
@@ -2,18 +2,25 @@ package command

import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"testing"

"github.com/aws/aws-sdk-go-v2/credentials"
s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/evergreen-ci/evergreen/agent/internal"
"github.com/evergreen-ci/evergreen/agent/internal/client"
"github.com/evergreen-ci/evergreen/model"
"github.com/evergreen-ci/evergreen/model/artifact"
"github.com/evergreen-ci/evergreen/model/task"
"github.com/evergreen-ci/evergreen/testutil"
"github.com/evergreen-ci/evergreen/util"
"github.com/evergreen-ci/pail"
"github.com/evergreen-ci/utility"
"github.com/mongodb/grip/send"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -639,3 +646,82 @@ func TestPreservePath(t *testing.T) {
}

}

func TestS3PutConditionalWrite(t *testing.T) {
if skip, _ := strconv.ParseBool(os.Getenv("SKIP_INTEGRATION_TESTS")); skip {
t.Skip("SKIP_INTEGRATION_TESTS is set, skipping integration test")
}

temproot := t.TempDir()

settings := testutil.GetIntegrationFile(t)

firstFilePath := filepath.Join(temproot, "first-file.txt")
secondFilePath := filepath.Join(temproot, "second-file.txt")

payload := []byte("hello world")
require.NoError(t, os.WriteFile(firstFilePath, payload, 0755))
require.NoError(t, os.WriteFile(secondFilePath, []byte("second file"), 0755))

accessKeyID := settings.Expansions["aws_key"]
secretAccessKey := settings.Expansions["aws_secret"]
bucketName := settings.Expansions["bucket"]
region := "us-east-1"

id := utility.RandomString()

remoteFile := fmt.Sprintf("tests/%s/%s", t.Name(), id)

cmd := s3PutFactory()
params := map[string]any{
"aws_key": accessKeyID,
"aws_secret": secretAccessKey,
"local_file": firstFilePath,
"remote_file": remoteFile,
"bucket": bucketName,
"region": region,
"skip_existing": "true",
"content_type": "text/plain",
"permissions": "private",
}

require.NoError(t, cmd.ParseParams(params))

tconf := &internal.TaskConfig{
Task: task.Task{},
WorkDir: temproot,
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sender := send.MakeInternalLogger()
logger := client.NewSingleChannelLogHarness("test", sender)
comm := client.NewMock("")

require.NoError(t, cmd.Execute(ctx, comm, logger, tconf))

params["local_file"] = secondFilePath
require.NoError(t, cmd.ParseParams(params))

require.NoError(t, cmd.Execute(ctx, comm, logger, tconf))

creds := credentials.NewStaticCredentialsProvider(accessKeyID, secretAccessKey, "")

opts := pail.S3Options{
Region: region,
Name: bucketName,
Credentials: creds,
}

bucket, err := pail.NewS3Bucket(ctx, opts)
require.NoError(t, err)

got, err := bucket.Get(ctx, remoteFile)
require.NoError(t, err)

content, err := io.ReadAll(got)
require.NoError(t, err)

assert.Equal(t, payload, content)
}
2 changes: 1 addition & 1 deletion config.go
@@ -37,7 +37,7 @@ var (

// Agent version to control agent rollover. The format is the calendar date
// (YYYY-MM-DD).
AgentVersion = "2024-11-01"
AgentVersion = "2024-11-08"
)

const (
28 changes: 14 additions & 14 deletions go.mod
@@ -1,19 +1,19 @@
module github.com/evergreen-ci/evergreen

go 1.20
go 1.21

require (
github.com/99designs/gqlgen v0.17.49
github.com/PuerkitoBio/rehttp v1.4.0
github.com/aws/aws-sdk-go v1.55.5 // indirect
github.com/aws/aws-sdk-go-v2 v1.30.3
github.com/aws/aws-sdk-go-v2 v1.32.2
github.com/aws/aws-sdk-go-v2/config v1.27.27
github.com/aws/aws-sdk-go-v2/credentials v1.17.27 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.27
github.com/aws/aws-sdk-go-v2/service/ec2 v1.168.0
github.com/aws/aws-sdk-go-v2/service/ecs v1.44.1
github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi v1.23.1
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.32.1
github.com/aws/smithy-go v1.20.3
github.com/aws/smithy-go v1.22.0
github.com/cheynewallace/tabby v1.1.1
github.com/docker/docker v24.0.9+incompatible
github.com/docker/go-connections v0.5.0
@@ -23,7 +23,7 @@ require (
github.com/evergreen-ci/cocoa v0.0.0-20240523192623-2e730fcd1784
github.com/evergreen-ci/gimlet v0.0.0-20241003144629-4e8f8a178646
github.com/evergreen-ci/juniper v0.0.0-20230901183147-c805ea7351aa
github.com/evergreen-ci/pail v0.0.0-20240812165850-4ccf32c50e99
github.com/evergreen-ci/pail v0.0.0-20241105190255-500f37029da3
github.com/evergreen-ci/poplar v0.0.0-20241014191612-891426af27cb
github.com/evergreen-ci/shrub v0.0.0-20231121224157-600e066f9de6
github.com/evergreen-ci/timber v0.0.0-20240509150854-9d66df03b40e
@@ -85,13 +85,13 @@ require (
github.com/andybalholm/brotli v1.0.3 // indirect
github.com/andygrunwald/go-jira v1.16.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.27.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.8.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ses v1.19.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sqs v1.29.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 // indirect
@@ -194,7 +194,7 @@

require (
github.com/aws/aws-sdk-go-v2/service/route53 v1.42.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3
github.com/aws/aws-sdk-go-v2/service/s3 v1.65.3
github.com/aws/aws-sdk-go-v2/service/ssm v1.52.3
github.com/bradleyfalzon/ghinstallation v1.1.1
github.com/evergreen-ci/evg-lint v0.0.0-20211115144425-3b19c8e83a57
@@ -208,11 +208,11 @@
)

require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2 // indirect
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/go-test/deep v1.1.0 // indirect