Skip to content

Commit

Permalink
adds more peer validation for s3
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 16, 2023
1 parent 1dbd8f8 commit 28a0270
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 1 deletion.
15 changes: 14 additions & 1 deletion flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package connmetadata

import (
"context"
"fmt"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand All @@ -23,7 +24,7 @@ type PostgresMetadataStore struct {
func NewPostgresMetadataStore(ctx context.Context, pgConfig *protos.PostgresConfig,
schemaName string) (*PostgresMetadataStore, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

log.Info("connection string in external metadata: ", connectionString)
pool, err := pgxpool.New(ctx, connectionString)
if err != nil {
log.Errorf("failed to create connection pool: %v", err)
Expand All @@ -47,6 +48,18 @@ func (p *PostgresMetadataStore) Close() error {
return nil
}

func (p *PostgresMetadataStore) Ping() error {
if p.pool == nil {
return fmt.Errorf("metadata db ping failed as pool does not exist")
}
pingErr := p.pool.Ping(p.ctx)
if pingErr != nil {
return fmt.Errorf("metadata db ping failed: %w", pingErr)
}

return nil
}

func (p *PostgresMetadataStore) NeedsSetupMetadata() bool {
// check if schema exists
rows := p.pool.QueryRow(p.ctx, "SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = $1", p.schemaName)
Expand Down
50 changes: 50 additions & 0 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package conns3
import (
"context"
"fmt"
"strings"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -61,6 +63,12 @@ func NewS3Connector(ctx context.Context,
return nil, err
}

validErr := ValidCheck(s3Client, config.Url, pgMetadata)
if validErr != nil {
log.Errorf("failed to validate s3 connector: %v", validErr)
return nil, validErr
}

return &S3Connector{
ctx: ctx,
url: config.Url,
Expand All @@ -85,6 +93,48 @@ func (c *S3Connector) Close() error {
return nil
}

func ValidCheck(s3Client *s3.S3, bucketUrl string, metadataDB *metadataStore.PostgresMetadataStore) error {
_, listErr := s3Client.ListBuckets(nil)
if listErr != nil {
return fmt.Errorf("failed to list buckets: %w", listErr)
}

reader := strings.NewReader("hello world")

bucketPrefix, parseErr := utils.NewS3BucketAndPrefix(bucketUrl)
if parseErr != nil {
return fmt.Errorf("failed to parse bucket url: %w", parseErr)
}

// Write an empty file and then delete it
// to check if we have write permissions
bucketName := aws.String(bucketPrefix.Bucket)
_, putErr := s3Client.PutObject(&s3.PutObjectInput{
Bucket: bucketName,
Key: aws.String("peerdb_check"),
Body: reader,
})
if putErr != nil {
return fmt.Errorf("failed to write to bucket: %w", putErr)
}

_, delErr := s3Client.DeleteObject(&s3.DeleteObjectInput{
Bucket: bucketName,
Key: aws.String("peerdb_check"),
})
if delErr != nil {
return fmt.Errorf("failed to delete from bucket: %w", delErr)
}

// check if we can ping external metadata
err := metadataDB.Ping()
if err != nil {
return fmt.Errorf("failed to ping external metadata: %w", err)
}

return nil
}

func (c *S3Connector) ConnectionActive() bool {
_, err := c.client.ListBuckets(nil)
return err == nil
Expand Down

0 comments on commit 28a0270

Please sign in to comment.