From f0966545d220c9625d915aeae06d69c7b903bffa Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 16 Nov 2023 21:10:40 +0530 Subject: [PATCH] adds more peer validation for s3 --- flow/connectors/external_metadata/store.go | 15 ++++++- flow/connectors/s3/s3.go | 50 ++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 6ec7c07142..25ed960fee 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -2,6 +2,7 @@ package connmetadata import ( "context" + "fmt" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -23,7 +24,7 @@ type PostgresMetadataStore struct { func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig, schemaName string) (*PostgresMetadataStore, error) { connectionString := utils.GetPGConnectionString(pgConfig) - + log.Info("connection string in external metadata: ", connectionString) pool, err := pgxpool.New(ctx, connectionString) if err != nil { log.Errorf("failed to create connection pool: %v", err) @@ -47,6 +48,18 @@ func (p *PostgresMetadataStore) Close() error { return nil } +func (p *PostgresMetadataStore) Ping() error { + if p.pool == nil { + return fmt.Errorf("metadata db ping failed as pool does not exist") + } + pingErr := p.pool.Ping(p.ctx) + if pingErr != nil { + return fmt.Errorf("metadata db ping failed: %w", pingErr) + } + + return nil +} + func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { // check if schema exists rows := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName) diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index fb6ed77d4b..f8f0898585 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -3,11 +3,13 @@ package conns3 import ( "context" "fmt" + "strings" metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" log "github.com/sirupsen/logrus" ) @@ -61,6 +63,12 @@ func NewS3Connector(ctx context.Context, return nil, err } + validErr := ValidCheck(s3Client, config.Url, pgMetadata) + if validErr != nil { + log.Errorf("failed to validate s3 connector: %v", validErr) + return nil, validErr + } + return &S3Connector{ ctx: ctx, url: config.Url, @@ -85,6 +93,48 @@ func (c *S3Connector) Close() error { return nil } +func ValidCheck(s3Client *s3.S3, bucketUrl string, metadataDB *metadataStore.PostgresMetadataStore) error { + _, listErr := s3Client.ListBuckets(nil) + if listErr != nil { + return fmt.Errorf("failed to list buckets: %w", listErr) + } + + reader := strings.NewReader("hello world") + + bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketUrl) + if parseErr != nil { + return fmt.Errorf("failed to parse bucket url: %w", parseErr) + } + + // Write an empty file and then delete it + // to check if we have write permissions + bucketName := aws.String(bucketPrefix.Bucket) + _, putErr := s3Client.PutObject(&s3.PutObjectInput{ + Bucket: bucketName, + Key: aws.String("peerdb_check"), + Body: reader, + }) + if putErr != nil { + return fmt.Errorf("failed to write to bucket: %w", putErr) + } + + _, delErr := s3Client.DeleteObject(&s3.DeleteObjectInput{ + Bucket: bucketName, + Key: aws.String("peerdb_check"), + }) + if delErr != nil { + return fmt.Errorf("failed to delete from bucket: %w", delErr) + } + + // check if we can ping external metadata + err := metadataDB.Ping() + if err != nil { + return fmt.Errorf("failed to ping external metadata: %w", err) + } + + return nil +} + func (c *S3Connector) ConnectionActive() bool { _, err := c.client.ListBuckets(nil) return err == nil