flow: replace aws-sdk-go with aws-sdk-go-v2 (#1115)
serprex authored Jan 22, 2024
1 parent 785facf commit 6997d5e
Showing 9 changed files with 216 additions and 158 deletions.
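
The pattern repeated across these files: aws-sdk-go-v2 drops the v1 session.Session in favor of constructing a client from an s3.Options value, and every operation takes a context.Context as its first argument plus a typed input struct. A minimal sketch of the new call style (illustrative only, not code from this commit; region and credentials are placeholders):

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
	// v1 (removed): sess := session.Must(session.NewSession(cfg)); svc := s3.New(sess); svc.ListBuckets(nil)
	// v2 (added): build the client from an Options value and pass a context to every call.
	client := s3.New(s3.Options{
		Region: "us-east-1", // placeholder
		Credentials: credentials.NewStaticCredentialsProvider(
			"ACCESS_KEY_ID", "SECRET_ACCESS_KEY", ""), // placeholders
	})

	out, err := client.ListBuckets(context.Background(), &s3.ListBucketsInput{})
	if err != nil {
		log.Fatal(err)
	}
	for _, b := range out.Buckets {
		fmt.Println(aws.ToString(b.Name))
	}
}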
30 changes: 20 additions & 10 deletions flow/connectors/clickhouse/qrep.go
@@ -11,9 +11,8 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"google.golang.org/protobuf/encoding/protojson"
)

@@ -177,16 +176,27 @@ func (c *ClickhouseConnector) dropStage(stagingPath string, job string) error {
}

// Create a list of all objects with the defined prefix in the bucket
iter := s3manager.NewDeleteListIterator(s3svc, &s3.ListObjectsInput{
pages := s3.NewListObjectsV2Paginator(s3svc, &s3.ListObjectsV2Input{
Bucket: aws.String(s3o.Bucket),
Prefix: aws.String(fmt.Sprintf("%s/%s", s3o.Prefix, job)),
})

// Iterate through the objects in the bucket with the prefix and delete them
s3Client := s3manager.NewBatchDeleteWithClient(s3svc)
if err := s3Client.Delete(aws.BackgroundContext(), iter); err != nil {
c.logger.Error("failed to delete objects from bucket", slog.Any("error", err))
return fmt.Errorf("failed to delete objects from bucket: %w", err)
for pages.HasMorePages() {
page, err := pages.NextPage(c.ctx)
if err != nil {
c.logger.Error("failed to list objects from bucket", slog.Any("error", err))
return fmt.Errorf("failed to list objects from bucket: %w", err)
}

for _, object := range page.Contents {
_, err = s3svc.DeleteObject(c.ctx, &s3.DeleteObjectInput{
Bucket: aws.String(s3o.Bucket),
Key: object.Key,
})
if err != nil {
c.logger.Error("failed to delete objects from bucket", slog.Any("error", err))
return fmt.Errorf("failed to delete objects from bucket: %w", err)
}
}
}

c.logger.Info(fmt.Sprintf("Deleted contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job))
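
aws-sdk-go-v2 has no counterpart to s3manager's NewDeleteListIterator/BatchDelete, so dropStage now pages with ListObjectsV2Paginator and issues one DeleteObject call per key. For comparison, the same cleanup can be batched with the DeleteObjects API, up to 1000 keys per request; a hedged sketch of that alternative, not part of this commit:

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// deletePrefix removes every object under bucket/prefix with one batched
// DeleteObjects call per listed page (a page holds at most 1000 keys).
func deletePrefix(ctx context.Context, client *s3.Client, bucket, prefix string) error {
	pages := s3.NewListObjectsV2Paginator(client, &s3.ListObjectsV2Input{
		Bucket: aws.String(bucket),
		Prefix: aws.String(prefix),
	})
	for pages.HasMorePages() {
		page, err := pages.NextPage(ctx)
		if err != nil {
			return fmt.Errorf("failed to list objects: %w", err)
		}
		if len(page.Contents) == 0 {
			continue
		}
		ids := make([]types.ObjectIdentifier, 0, len(page.Contents))
		for _, obj := range page.Contents {
			ids = append(ids, types.ObjectIdentifier{Key: obj.Key})
		}
		if _, err := client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
			Bucket: aws.String(bucket),
			Delete: &types.Delete{Objects: ids},
		}); err != nil {
			return fmt.Errorf("failed to delete objects: %w", err)
		}
	}
	return nil
}

The identical replacement appears again in flow/connectors/snowflake/qrep.go further down.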
18 changes: 9 additions & 9 deletions flow/connectors/s3/s3.go
@@ -12,8 +12,8 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

const (
@@ -24,7 +24,7 @@ type S3Connector struct {
ctx context.Context
url string
pgMetadata *metadataStore.PostgresMetadataStore
client s3.S3
client s3.Client
creds utils.S3PeerCredentials
logger slog.Logger
}
@@ -91,8 +91,8 @@ func (c *S3Connector) Close() error {
return c.pgMetadata.Close()
}

func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error {
_, listErr := s3Client.ListBuckets(nil)
func ValidCheck(ctx context.Context, s3Client *s3.Client, bucketURL string, metadataDB *metadataStore.PostgresMetadataStore) error {
_, listErr := s3Client.ListBuckets(ctx, nil)
if listErr != nil {
return fmt.Errorf("failed to list buckets: %w", listErr)
}
@@ -107,7 +107,7 @@ func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.Pos
// Write an empty file and then delete it
// to check if we have write permissions
bucketName := aws.String(bucketPrefix.Bucket)
_, putErr := s3Client.PutObject(&s3.PutObjectInput{
_, putErr := s3Client.PutObject(ctx, &s3.PutObjectInput{
Bucket: bucketName,
Key: aws.String(_peerDBCheck),
Body: reader,
@@ -116,7 +116,7 @@ func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.Pos
return fmt.Errorf("failed to write to bucket: %w", putErr)
}

_, delErr := s3Client.DeleteObject(&s3.DeleteObjectInput{
_, delErr := s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: bucketName,
Key: aws.String(_peerDBCheck),
})
@@ -134,12 +134,12 @@ func ValidCheck(s3Client *s3.S3, bucketURL string, metadataDB *metadataStore.Pos
}

func (c *S3Connector) ConnectionActive() error {
_, listErr := c.client.ListBuckets(nil)
_, listErr := c.client.ListBuckets(c.ctx, nil)
if listErr != nil {
return listErr
}

validErr := ValidCheck(&c.client, c.url, c.pgMetadata)
validErr := ValidCheck(c.ctx, &c.client, c.url, c.pgMetadata)
if validErr != nil {
c.logger.Error("failed to validate s3 connector:", slog.Any("error", validErr))
return validErr
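
ValidCheck keeps its write-then-delete permission probe; only the call signatures change, with the caller's context threaded through every operation. A condensed, hedged sketch of that probe in the v2 style (the helper name, bucket, and marker key are placeholders, not code from this commit):

import (
	"context"
	"fmt"
	"strings"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// canWrite uploads an empty marker object and deletes it again to confirm
// write access to the bucket.
func canWrite(ctx context.Context, client *s3.Client, bucket, key string) error {
	if _, err := client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
		Body:   strings.NewReader(""),
	}); err != nil {
		return fmt.Errorf("failed to write to bucket: %w", err)
	}
	if _, err := client.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}); err != nil {
		return fmt.Errorf("failed to delete from bucket: %w", err)
	}
	return nil
}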
30 changes: 20 additions & 10 deletions flow/connectors/snowflake/qrep.go
@@ -11,9 +11,8 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/jackc/pgx/v5/pgtype"
"google.golang.org/protobuf/encoding/protojson"
)
@@ -338,16 +337,27 @@ func (c *SnowflakeConnector) dropStage(stagingPath string, job string) error {
}

// Create a list of all objects with the defined prefix in the bucket
iter := s3manager.NewDeleteListIterator(s3svc, &s3.ListObjectsInput{
pages := s3.NewListObjectsV2Paginator(s3svc, &s3.ListObjectsV2Input{
Bucket: aws.String(s3o.Bucket),
Prefix: aws.String(fmt.Sprintf("%s/%s", s3o.Prefix, job)),
})

// Iterate through the objects in the bucket with the prefix and delete them
s3Client := s3manager.NewBatchDeleteWithClient(s3svc)
if err := s3Client.Delete(aws.BackgroundContext(), iter); err != nil {
c.logger.Error("failed to delete objects from bucket", slog.Any("error", err))
return fmt.Errorf("failed to delete objects from bucket: %w", err)
for pages.HasMorePages() {
page, err := pages.NextPage(c.ctx)
if err != nil {
c.logger.Error("failed to list objects from bucket", slog.Any("error", err))
return fmt.Errorf("failed to list objects from bucket: %w", err)
}

for _, object := range page.Contents {
_, err = s3svc.DeleteObject(c.ctx, &s3.DeleteObjectInput{
Bucket: aws.String(s3o.Bucket),
Key: object.Key,
})
if err != nil {
c.logger.Error("failed to delete objects from bucket", slog.Any("error", err))
return fmt.Errorf("failed to delete objects from bucket: %w", err)
}
}
}

c.logger.Info(fmt.Sprintf("Deleted contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job))
13 changes: 5 additions & 8 deletions flow/connectors/utils/avro/avro_writer.go
@@ -13,8 +13,9 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/klauspost/compress/flate"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
@@ -201,11 +202,7 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils
return nil, fmt.Errorf("failed to create S3 client: %w", err)
}

// Create an uploader with the session and default options
uploader := s3manager.NewUploaderWithClient(s3svc)

// Upload the file to S3.
result, err := uploader.Upload(&s3manager.UploadInput{
_, err = manager.NewUploader(s3svc).Upload(p.ctx, &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: r,
@@ -216,7 +213,7 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils
return nil, fmt.Errorf("failed to upload file to path %s: %w", s3Path, err)
}

slog.Info("file uploaded to" + result.Location)
slog.Info("file uploaded to " + fmt.Sprintf("%s/%s", bucketName, key))

return &AvroFile{
NumRecords: <-numRowsWritten,
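
The upload helper moves from s3manager to the v2 feature/s3/manager package: NewUploader wraps a *s3.Client, Upload takes a context plus a *s3.PutObjectInput, and the returned manager.UploadOutput still exposes a Location field (which the commit stops logging in favor of a constructed bucket/key string). A hedged usage sketch; the default config loader, bucket, and key are placeholders rather than the connector's own wiring:

import (
	"context"
	"log"
	"strings"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func uploadExample(ctx context.Context) error {
	// Placeholder setup: default credential chain instead of the connector's CreateS3Client.
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return err
	}
	client := s3.NewFromConfig(cfg)

	// The uploader streams the body in parts; Upload blocks until the whole
	// object is stored or the context is cancelled.
	result, err := manager.NewUploader(client).Upload(ctx, &s3.PutObjectInput{
		Bucket: aws.String("my-bucket"),            // placeholder
		Key:    aws.String("path/to/file.avro.zst"), // placeholder
		Body:   strings.NewReader("example body"),
	})
	if err != nil {
		return err
	}
	log.Println("uploaded to", result.Location)
	return nil
}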
74 changes: 59 additions & 15 deletions flow/connectors/utils/aws.go
@@ -2,13 +2,15 @@ package utils

import (
"fmt"
"net/http"
"os"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go-v2/aws"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

type AWSSecrets struct {
@@ -89,23 +91,65 @@ func NewS3BucketAndPrefix(s3Path string) (*S3BucketAndPrefix, error) {
}, nil
}

func CreateS3Client(s3Creds S3PeerCredentials) (*s3.S3, error) {
func CreateS3Client(s3Creds S3PeerCredentials) (*s3.Client, error) {
awsSecrets, err := GetAWSSecrets(s3Creds)
if err != nil {
return nil, fmt.Errorf("failed to get AWS secrets: %w", err)
}

config := &aws.Config{
Region: aws.String(awsSecrets.Region),
Endpoint: aws.String(awsSecrets.Endpoint),
options := s3.Options{
Region: awsSecrets.Region,
Credentials: credentials.NewStaticCredentialsProvider(awsSecrets.AccessKeyID, awsSecrets.SecretAccessKey, ""),
}

if s3Creds.AccessKeyID != "" && s3Creds.SecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(s3Creds.AccessKeyID, s3Creds.SecretAccessKey, "")
if awsSecrets.Endpoint != "" {
options.BaseEndpoint = &awsSecrets.Endpoint
if strings.Contains(awsSecrets.Endpoint, "storage.googleapis.com") {
// Assign custom client with our own transport
options.HTTPClient = &http.Client{
Transport: &RecalculateV4Signature{
next: http.DefaultTransport,
signer: v4.NewSigner(),
credentials: options.Credentials,
region: options.Region,
},
}
}
}

sess := session.Must(session.NewSession(config))
return s3.New(options), nil
}

// RecalculateV4Signature allows GCS over the S3 API by removing the Accept-Encoding header before signing
// https://stackoverflow.com/a/74382598/1204665
// https://github.com/aws/aws-sdk-go-v2/issues/1816
type RecalculateV4Signature struct {
next http.RoundTripper
signer *v4.Signer
credentials aws.CredentialsProvider
region string
}

func (lt *RecalculateV4Signature) RoundTrip(req *http.Request) (*http.Response, error) {
// store for later use
acceptEncodingValue := req.Header.Get("Accept-Encoding")

// delete the header so it isn't accounted for in the signature
req.Header.Del("Accept-Encoding")

// sign with the same date
timeString := req.Header.Get("X-Amz-Date")
timeDate, _ := time.Parse("20060102T150405Z", timeString)

creds, err := lt.credentials.Retrieve(req.Context())
if err != nil {
return nil, err
}
err = lt.signer.SignHTTP(req.Context(), creds, req, v4.GetPayloadHash(req.Context()), "s3", lt.region, timeDate)
if err != nil {
return nil, err
}
// Reset Accept-Encoding if desired
req.Header.Set("Accept-Encoding", acceptEncodingValue)

s3svc := s3.New(sess)
return s3svc, nil
// hand the re-signed request to the original round tripper
return lt.next.RoundTrip(req)
}
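
With that round tripper in place, pointing a client at GCS's S3-compatible XML API only needs HMAC credentials and the storage.googleapis.com endpoint. A condensed sketch of the wiring CreateS3Client performs, assuming it lives in the same utils package as RecalculateV4Signature; the helper name and region value are placeholders:

import (
	"net/http"

	"github.com/aws/aws-sdk-go-v2/aws"
	v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func newGCSClient(hmacKey, hmacSecret string) *s3.Client {
	endpoint := "https://storage.googleapis.com" // GCS S3-compatible XML API
	opts := s3.Options{
		Region:       "us-east-1", // placeholder; use the region configured for the peer
		Credentials:  credentials.NewStaticCredentialsProvider(hmacKey, hmacSecret, ""),
		BaseEndpoint: aws.String(endpoint),
	}
	// Re-sign every request without Accept-Encoding, as RecalculateV4Signature above does.
	opts.HTTPClient = &http.Client{
		Transport: &RecalculateV4Signature{
			next:        http.DefaultTransport,
			signer:      v4.NewSigner(),
			credentials: opts.Credentials,
			region:      opts.Region,
		},
	}
	return s3.New(opts)
}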
2 changes: 1 addition & 1 deletion flow/e2e/s3/qrep_flow_s3_test.go
@@ -39,7 +39,7 @@ func (s PeerFlowE2ETestSuiteS3) Suffix() string {
func tearDownSuite(s PeerFlowE2ETestSuiteS3) {
e2e.TearDownPostgres(s)

err := s.s3Helper.CleanUp()
err := s.s3Helper.CleanUp(context.Background())
if err != nil {
require.Fail(s.t, "failed to clean up s3", err)
}
18 changes: 9 additions & 9 deletions flow/e2e/s3/s3_helper.go
@@ -12,16 +12,16 @@ import (
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

const (
peerName string = "test_s3_peer"
)

type S3TestHelper struct {
client *s3.S3
client *s3.Client
s3Config *protos.S3Config
bucketName string
prefix string
@@ -90,10 +90,10 @@ func (h *S3TestHelper) GetPeer() *protos.Peer {
func (h *S3TestHelper) ListAllFiles(
ctx context.Context,
jobName string,
) ([]*s3.Object, error) {
) ([]s3types.Object, error) {
Bucket := h.bucketName
Prefix := fmt.Sprintf("%s/%s/", h.prefix, jobName)
files, err := h.client.ListObjects(&s3.ListObjectsInput{
files, err := h.client.ListObjects(ctx, &s3.ListObjectsInput{
Bucket: &Bucket,
Prefix: &Prefix,
})
@@ -106,10 +106,10 @@ func (h *S3TestHelper) ListAllFiles(
}

// Delete all generated objects during the test
func (h *S3TestHelper) CleanUp() error {
func (h *S3TestHelper) CleanUp(ctx context.Context) error {
Bucket := h.bucketName
Prefix := h.prefix
files, err := h.client.ListObjects(&s3.ListObjectsInput{
files, err := h.client.ListObjects(ctx, &s3.ListObjectsInput{
Bucket: &Bucket,
Prefix: &Prefix,
})
@@ -121,11 +121,11 @@ func (h *S3TestHelper) CleanUp() error {
// Delete each object
for _, obj := range files.Contents {
deleteInput := &s3.DeleteObjectInput{
Bucket: aws.String(Bucket),
Bucket: &Bucket,
Key: obj.Key,
}

_, err := h.client.DeleteObject(deleteInput)
_, err := h.client.DeleteObject(ctx, deleteInput)
if err != nil {
return err
}
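
One more v2 difference visible in this helper: list operations return []s3types.Object values rather than []*s3.Object pointers, and pointer-typed fields are read through the aws conversion helpers. A small hedged sketch of consuming such a listing (the helper name is a placeholder):

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// printObjects walks a listing returned by a v2 List call; Key and
// LastModified are pointer fields, so aws.ToString/aws.ToTime give safe
// zero-value defaults when they are nil.
func printObjects(objects []s3types.Object) {
	for _, obj := range objects {
		fmt.Printf("%s (last modified %s)\n", aws.ToString(obj.Key), aws.ToTime(obj.LastModified))
	}
}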