Skip to content

Commit

Permalink
move putremoves3 to utils
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Apr 26, 2024
1 parent c8db76a commit ad2eef5
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 37 deletions.
3 changes: 1 addition & 2 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"golang.org/x/mod/semver"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
conns3 "github.com/PeerDB-io/peer-flow/connectors/s3"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
Expand Down Expand Up @@ -45,7 +44,7 @@ func ValidateS3(ctx context.Context, creds *utils.ClickHouseS3Credentials) error
return fmt.Errorf("failed to create S3 bucket and prefix: %w", err)
}

return conns3.PutAndRemoveS3(ctx, s3Client, object.Bucket, object.Prefix)
return utils.PutAndRemoveS3(ctx, s3Client, object.Bucket, object.Prefix)
}

// Creates and drops a dummy table to validate the peer
Expand Down
36 changes: 1 addition & 35 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid"
"go.temporal.io/sdk/log"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
Expand All @@ -19,10 +16,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
)

const (
_peerDBCheck = "peerdb_check"
)

type S3Connector struct {
*metadataStore.PostgresMetadata
logger log.Logger
Expand Down Expand Up @@ -77,40 +70,13 @@ func (c *S3Connector) Close() error {
return nil
}

// Write an empty file and then delete it
// to check if we have write permissions
func PutAndRemoveS3(ctx context.Context, client *s3.Client, bucket string, prefix string) error {
reader := strings.NewReader(time.Now().Format(time.RFC3339))
bucketName := aws.String(bucket)
temporaryObjectPath := prefix + "/" + _peerDBCheck + uuid.New().String()
temporaryObjectPath = strings.TrimPrefix(temporaryObjectPath, "/")
_, putErr := client.PutObject(ctx, &s3.PutObjectInput{
Bucket: bucketName,
Key: aws.String(temporaryObjectPath),
Body: reader,
})
if putErr != nil {
return fmt.Errorf("failed to write to bucket: %w", putErr)
}

_, delErr := client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: bucketName,
Key: aws.String(temporaryObjectPath),
})
if delErr != nil {
return fmt.Errorf("failed to delete from bucket: %w", delErr)
}

return nil
}

func (c *S3Connector) ValidateCheck(ctx context.Context) error {
bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(c.url)
if parseErr != nil {
return fmt.Errorf("failed to parse bucket url: %w", parseErr)
}

return PutAndRemoveS3(ctx, &c.client, bucketPrefix.Bucket, bucketPrefix.Prefix)
return utils.PutAndRemoveS3(ctx, &c.client, bucketPrefix.Bucket, bucketPrefix.Prefix)
}

func (c *S3Connector) ConnectionActive(ctx context.Context) error {
Expand Down
32 changes: 32 additions & 0 deletions flow/connectors/utils/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ import (
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
"github.com/aws/aws-sdk-go-v2/service/s3"
smithyendpoints "github.com/aws/smithy-go/endpoints"
"github.com/google/uuid"

"github.com/PeerDB-io/peer-flow/logger"
)

const (
_peerDBCheck = "peerdb_check"
)

var s3CompatibleServiceEndpointPattern = regexp.MustCompile(`^https?://[a-zA-Z0-9.-]+(:\d+)?$`)

type AWSSecrets struct {
Expand Down Expand Up @@ -319,3 +324,30 @@ func (lt *RecalculateV4Signature) RoundTrip(req *http.Request) (*http.Response,
// follows up the original round tripper
return lt.next.RoundTrip(req)
}

// Write an empty file and then delete it
// to check if we have write permissions
func PutAndRemoveS3(ctx context.Context, client *s3.Client, bucket string, prefix string) error {
reader := strings.NewReader(time.Now().Format(time.RFC3339))
bucketName := aws.String(bucket)
temporaryObjectPath := prefix + "/" + _peerDBCheck + uuid.New().String()
temporaryObjectPath = strings.TrimPrefix(temporaryObjectPath, "/")
_, putErr := client.PutObject(ctx, &s3.PutObjectInput{
Bucket: bucketName,
Key: aws.String(temporaryObjectPath),
Body: reader,
})
if putErr != nil {
return fmt.Errorf("failed to write to bucket: %w", putErr)
}

_, delErr := client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: bucketName,
Key: aws.String(temporaryObjectPath),
})
if delErr != nil {
return fmt.Errorf("failed to delete from bucket: %w", delErr)
}

return nil
}

0 comments on commit ad2eef5

Please sign in to comment.