Skip to content

Commit

Permalink
DEVPROD-12040: Add conditional writes to s3.put
Browse files Browse the repository at this point in the history
This commit updates s3.put to support S3's new conditional writes
feature. The conditional writes feature guarantees that a write will not
succeed if the remote key already exists. This check respects S3's
strong read-after-write consistency model.

To enable this feature, I updated to the latest version of pail.
This required bumping the minimum Go version of this project to 1.21.

While adding this feature, I realized that the existing skip_existing
implementation is buggy and has not been working correctly: the final
check compares len(uploadedFiles) against len(filesList), and files that
were skipped are not counted as uploaded. This commit also fixes that bug.
  • Loading branch information
drichmdb committed Nov 18, 2024
1 parent 6ccb2df commit b29d749
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 73 deletions.
60 changes: 33 additions & 27 deletions agent/command/s3_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go"
"github.com/evergreen-ci/evergreen"
"github.com/evergreen-ci/evergreen/agent/internal"
"github.com/evergreen-ci/evergreen/agent/internal/client"
Expand Down Expand Up @@ -96,7 +97,9 @@ type s3put struct {
// PatchOnly defaults to false. If set to true, this command will noop without error for non-patch tasks.
PatchOnly string `mapstructure:"patch_only" plugin:"patch_only"`

// SkipExisting, when set to true, will not upload files if they already exist in s3.
// SkipExisting, when set to 'true', will not upload files if they already
// exist in s3. This will not cause the s3.put command to fail. This
// behavior respects s3's strong read-after-write consistency model.
SkipExisting string `mapstructure:"skip_existing" plugin:"expand"`

// workDir sets the working directory relative to which s3put should look for files to upload.
Expand Down Expand Up @@ -349,6 +352,7 @@ func (s3pc *s3put) putWithRetry(ctx context.Context, comm client.Communicator, l
err error
uploadedFiles []string
filesList []string
skippedFiles []string
)

timer := time.NewTimer(0)
Expand Down Expand Up @@ -390,6 +394,7 @@ retryLoop:

// reset to avoid duplicated uploaded references
uploadedFiles = []string{}
skippedFiles = make([]string, 0)

uploadLoop:
for _, fpath := range filesList {
Expand All @@ -410,16 +415,6 @@ retryLoop:

fpath = filepath.Join(filepath.Join(s3pc.workDir, s3pc.LocalFilesIncludeFilterPrefix), fpath)

if s3pc.skipExistingBool {
exists, err := s3pc.remoteFileExists(ctx, remoteName)
if err != nil {
return errors.Wrapf(err, "checking if file '%s' exists", remoteName)
}
if exists {
logger.Task().Infof("Not uploading file '%s' because remote file '%s' already exists. Continuing to upload other files.", fpath, remoteName)
continue uploadLoop
}
}
err = s3pc.bucket.Upload(ctx, remoteName, fpath)
if err != nil {
// retry errors other than "file doesn't exist", which we handle differently based on what
Expand All @@ -440,6 +435,22 @@ retryLoop:
}
}

if s3pc.skipExistingBool {
if s3pc.preservePath {
skippedFiles = append(skippedFiles, remoteName)
} else {
skippedFiles = append(skippedFiles, fpath)
}

var ae smithy.APIError

if errors.As(err, &ae) {
if ae.ErrorCode() == "PreconditionFailed" {
continue uploadLoop
}
}
}

// in all other cases, log an error and retry after an interval.
logger.Task().Error(errors.WithMessage(err, "putting S3 file"))
timer.Reset(backoffCounter.Duration())
Expand Down Expand Up @@ -469,9 +480,11 @@ retryLoop:

logger.Task().WarningWhen(strings.Contains(s3pc.Bucket, "."), "Bucket names containing dots that are created after Sept. 30, 2020 are not guaranteed to have valid attached URLs.")

if len(uploadedFiles) != len(filesList) && !s3pc.skipMissing {
logger.Task().Infof("Attempted to upload %d files, %d successfully uploaded.", len(filesList), len(uploadedFiles))
return errors.Errorf("uploaded %d files of %d requested", len(uploadedFiles), len(filesList))
processedCount := len(skippedFiles) + len(uploadedFiles)

if processedCount != len(filesList) && !s3pc.skipMissing {
logger.Task().Infof("Attempted to upload %d files, %d successfully uploaded.", len(filesList), processedCount)
return errors.Errorf("uploaded %d files of %d requested", processedCount, len(filesList))
}

return nil
Expand Down Expand Up @@ -534,13 +547,19 @@ func (s3pc *s3put) createPailBucket(ctx context.Context, httpClient *http.Client
if s3pc.bucket != nil {
return nil
}

opts := pail.S3Options{
Credentials: pail.CreateAWSCredentials(s3pc.AwsKey, s3pc.AwsSecret, s3pc.AwsSessionToken),
Region: s3pc.Region,
Name: s3pc.Bucket,
Permissions: pail.S3Permissions(s3pc.Permissions),
ContentType: s3pc.ContentType,
}

if s3pc.skipExistingBool {
opts.IfNotExists = true
}

bucket, err := pail.NewS3MultiPartBucketWithHTTPClient(ctx, httpClient, opts)
s3pc.bucket = bucket
return err
Expand All @@ -557,16 +576,3 @@ func (s3pc *s3put) isPublic() bool {
return (s3pc.Visibility == "" || s3pc.Visibility == artifact.Public) &&
(s3pc.Permissions == string(s3Types.BucketCannedACLPublicRead) || s3pc.Permissions == string(s3Types.BucketCannedACLPublicReadWrite))
}

// remoteFileExists reports whether remoteName is present in the bucket named
// by s3pc.Bucket. It constructs a new pail S3 bucket from the command's AWS
// key, secret, session token, and region on each call, and returns a wrapped
// error if that construction fails.
func (s3pc *s3put) remoteFileExists(ctx context.Context, remoteName string) (bool, error) {
	awsCreds := pail.CreateAWSCredentials(s3pc.AwsKey, s3pc.AwsSecret, s3pc.AwsSessionToken)

	lookupBucket, err := pail.NewS3Bucket(ctx, pail.S3Options{
		Credentials: awsCreds,
		Region:      s3pc.Region,
		Name:        s3pc.Bucket,
	})
	if err != nil {
		return false, errors.Wrap(err, "creating S3 bucket")
	}

	return lookupBucket.Exists(ctx, remoteName)
}
86 changes: 86 additions & 0 deletions agent/command/s3_put_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@ package command

import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"testing"

"github.com/aws/aws-sdk-go-v2/credentials"
s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/evergreen-ci/evergreen/agent/internal"
"github.com/evergreen-ci/evergreen/agent/internal/client"
"github.com/evergreen-ci/evergreen/model"
"github.com/evergreen-ci/evergreen/model/artifact"
"github.com/evergreen-ci/evergreen/model/task"
"github.com/evergreen-ci/evergreen/testutil"
"github.com/evergreen-ci/evergreen/util"
"github.com/evergreen-ci/pail"
"github.com/evergreen-ci/utility"
"github.com/mongodb/grip/send"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -639,3 +646,82 @@ func TestPreservePath(t *testing.T) {
}

}

// TestS3PutConditionalWrite is an integration test for s3.put with
// skip_existing set to "true". It uploads one local file to a randomly named
// remote key, runs the command a second time with a different local file
// aimed at the same key, and then verifies that neither run returned an error
// and that the remote object still holds the first file's contents.
//
// The test is skipped when SKIP_INTEGRATION_TESTS parses as true. Otherwise it
// reads the AWS key, secret, and bucket name from the integration settings'
// expansions.
func TestS3PutConditionalWrite(t *testing.T) {
	// A parse error leaves skip false, so an unset or malformed value runs the test.
	if skip, _ := strconv.ParseBool(os.Getenv("SKIP_INTEGRATION_TESTS")); skip {
		t.Skip("SKIP_INTEGRATION_TESTS is set, skipping integration test")
	}

	temproot := t.TempDir()

	settings := testutil.GetIntegrationFile(t)

	firstFilePath := filepath.Join(temproot, "first-file.txt")
	secondFilePath := filepath.Join(temproot, "second-file.txt")

	// The two files have different contents so the final assertion can tell
	// which one ended up in the remote object.
	payload := []byte("hello world")
	require.NoError(t, os.WriteFile(firstFilePath, payload, 0755))
	require.NoError(t, os.WriteFile(secondFilePath, []byte("second file"), 0755))

	accessKeyID := settings.Expansions["aws_key"]
	secretAccessKey := settings.Expansions["aws_secret"]
	bucketName := settings.Expansions["bucket"]
	region := "us-east-1"

	id := utility.RandomString()

	// A random suffix gives each run its own remote key.
	remoteFile := fmt.Sprintf("tests/%s/%s", t.Name(), id)

	cmd := s3PutFactory()
	params := map[string]any{
		"aws_key":       accessKeyID,
		"aws_secret":    secretAccessKey,
		"local_file":    firstFilePath,
		"remote_file":   remoteFile,
		"bucket":        bucketName,
		"region":        region,
		"skip_existing": "true",
		"content_type":  "text/plain",
		"permissions":   "private",
	}

	require.NoError(t, cmd.ParseParams(params))

	tconf := &internal.TaskConfig{
		Task:    task.Task{},
		WorkDir: temproot,
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sender := send.MakeInternalLogger()
	logger := client.NewSingleChannelLogHarness("test", sender)
	comm := client.NewMock("")

	// First run: the remote key does not exist yet.
	require.NoError(t, cmd.Execute(ctx, comm, logger, tconf))

	// Second run: same remote key, different local file. With skip_existing
	// set this must still succeed.
	params["local_file"] = secondFilePath
	require.NoError(t, cmd.ParseParams(params))

	require.NoError(t, cmd.Execute(ctx, comm, logger, tconf))

	creds := credentials.NewStaticCredentialsProvider(accessKeyID, secretAccessKey, "")

	opts := pail.S3Options{
		Region:      region,
		Name:        bucketName,
		Credentials: creds,
	}

	bucket, err := pail.NewS3Bucket(ctx, opts)
	require.NoError(t, err)

	got, err := bucket.Get(ctx, remoteFile)
	require.NoError(t, err)
	// Close the object reader so the test does not leak it.
	defer func() { assert.NoError(t, got.Close()) }()

	content, err := io.ReadAll(got)
	require.NoError(t, err)

	// The remote object must still contain the first file's bytes.
	assert.Equal(t, payload, content)
}
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var (

// Agent version to control agent rollover. The format is the calendar date
// (YYYY-MM-DD).
AgentVersion = "2024-11-01"
AgentVersion = "2024-11-08"
)

const (
Expand Down
28 changes: 14 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
module github.com/evergreen-ci/evergreen

go 1.20
go 1.21

require (
github.com/99designs/gqlgen v0.17.49
github.com/PuerkitoBio/rehttp v1.4.0
github.com/aws/aws-sdk-go v1.55.5 // indirect
github.com/aws/aws-sdk-go-v2 v1.30.3
github.com/aws/aws-sdk-go-v2 v1.32.2
github.com/aws/aws-sdk-go-v2/config v1.27.27
github.com/aws/aws-sdk-go-v2/credentials v1.17.27 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.27
github.com/aws/aws-sdk-go-v2/service/ec2 v1.168.0
github.com/aws/aws-sdk-go-v2/service/ecs v1.44.1
github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi v1.23.1
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.32.1
github.com/aws/smithy-go v1.20.3
github.com/aws/smithy-go v1.22.0
github.com/cheynewallace/tabby v1.1.1
github.com/docker/docker v24.0.9+incompatible
github.com/docker/go-connections v0.5.0
Expand All @@ -23,7 +23,7 @@ require (
github.com/evergreen-ci/cocoa v0.0.0-20240523192623-2e730fcd1784
github.com/evergreen-ci/gimlet v0.0.0-20241003144629-4e8f8a178646
github.com/evergreen-ci/juniper v0.0.0-20230901183147-c805ea7351aa
github.com/evergreen-ci/pail v0.0.0-20240812165850-4ccf32c50e99
github.com/evergreen-ci/pail v0.0.0-20241105190255-500f37029da3
github.com/evergreen-ci/poplar v0.0.0-20241014191612-891426af27cb
github.com/evergreen-ci/shrub v0.0.0-20231121224157-600e066f9de6
github.com/evergreen-ci/timber v0.0.0-20240509150854-9d66df03b40e
Expand Down Expand Up @@ -85,13 +85,13 @@ require (
github.com/andybalholm/brotli v1.0.3 // indirect
github.com/andygrunwald/go-jira v1.16.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.27.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.8.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ses v1.19.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sqs v1.29.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 // indirect
Expand Down Expand Up @@ -194,7 +194,7 @@ require (

require (
github.com/aws/aws-sdk-go-v2/service/route53 v1.42.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3
github.com/aws/aws-sdk-go-v2/service/s3 v1.65.3
github.com/aws/aws-sdk-go-v2/service/ssm v1.52.3
github.com/bradleyfalzon/ghinstallation v1.1.1
github.com/evergreen-ci/evg-lint v0.0.0-20211115144425-3b19c8e83a57
Expand All @@ -208,11 +208,11 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2 // indirect
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/go-test/deep v1.1.0 // indirect
Expand Down
Loading

0 comments on commit b29d749

Please sign in to comment.