Interoperability With GCS for S3 QRep (#502)
### Support for using GCS buckets in S3 QRep

An S3 peer pointing at a GCS bucket now looks like:
```sql
CREATE PEER s3_gcs_interop_peer FROM S3 WITH
(
    url = 's3://<gcs_bucket_name>/<prefix>',
    access_key_id = '<access_key_id>',
    secret_access_key = '<secret>',
    region = 'auto', -- recommended for GCS
    endpoint = 'https://storage.googleapis.com' -- required for GCS
);
```
Note that every field apart from `url` is optional. Any fields you leave out are picked up from the corresponding `AWS_*` environment variables set in `docker-compose.yml`.
- Set the access key ID and secret access key to your GCS HMAC keys. [Click here for more information.](https://cloud.google.com/storage/docs/authentication/managing-hmackeys)
- Set the region to `auto` to automatically detect the region from the bucket location.
- Set the endpoint to `https://storage.googleapis.com`.

The considerations mentioned [here](https://docs.peerdb.io/sql/commands/create-peer#considerations-4) still apply: make sure the GCS bucket you specify above already exists.
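
To illustrate what these settings mean at the SDK level, here is a minimal standalone sketch (not part of this commit) that builds an aws-sdk-go S3 client against GCS the same way the updated `CreateS3Client` below does. The bucket, prefix, and HMAC values are placeholders, and the connectivity check is only an example:

```go
package main

import (
	"bytes"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	// HMAC key pair generated for a GCS service account (placeholders).
	hmacKeyID := "<access_key_id>"
	hmacSecret := "<secret>"

	// Same shape as the peer definition: static HMAC credentials,
	// region "auto", and the GCS XML API endpoint.
	sess := session.Must(session.NewSession(&aws.Config{
		Region:      aws.String("auto"),
		Endpoint:    aws.String("https://storage.googleapis.com"),
		Credentials: credentials.NewStaticCredentials(hmacKeyID, hmacSecret, ""),
	}))
	svc := s3.New(sess)

	// Write a tiny object under the peer's prefix to confirm the bucket is reachable.
	_, err := svc.PutObject(&s3.PutObjectInput{
		Bucket: aws.String("<gcs_bucket_name>"),
		Key:    aws.String("<prefix>/connectivity-check.txt"),
		Body:   bytes.NewReader([]byte("hello from peerdb qrep")),
	})
	if err != nil {
		log.Fatalf("put failed: %v", err)
	}
	log.Println("GCS bucket reachable over the S3 API")
}
```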

You can also set the AWS environment variables in `docker-compose.yml`.

Fixes #501
Amogh-Bharadwaj authored Oct 11, 2023
1 parent 1fa5903 commit 1716500
Showing 14 changed files with 415 additions and 93 deletions.
12 changes: 9 additions & 3 deletions docker-compose.yml
@@ -9,9 +9,15 @@ x-catalog-config: &catalog-config

x-flow-worker-env: &flow-worker-env
TEMPORAL_HOST_PORT: temporal:7233
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-""}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-""}
AWS_REGION: ${AWS_REGION:-""}
# For GCS, these will be your HMAC keys instead
# For more information:
# https://cloud.google.com/storage/docs/authentication/managing-hmackeys
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID:-}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY:-}
# For GCS, set this to "auto" without the quotes
AWS_REGION: ${AWS_REGION:-}
# For GCS, set this as: https://storage.googleapis.com
AWS_ENDPOINT: ${AWS_ENDPOINT:-}
# enables worker profiling using Go's pprof
ENABLE_PROFILING: "true"
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
4 changes: 2 additions & 2 deletions flow/connectors/s3/qrep.go
@@ -61,9 +61,9 @@ func (c *S3Connector) writeToAvroFile(
return 0, fmt.Errorf("failed to parse bucket path: %w", err)
}

s3Key := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, jobName, partitionID)
s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, jobName, partitionID)
writer := avro.NewPeerDBOCFWriter(c.ctx, stream, avroSchema)
numRecords, err := writer.WriteRecordsToS3(s3o.Bucket, s3Key)
numRecords, err := writer.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, c.creds)
if err != nil {
return 0, fmt.Errorf("failed to write records to S3: %w", err)
}
31 changes: 30 additions & 1 deletion flow/connectors/s3/s3.go
@@ -14,18 +14,47 @@ type S3Connector struct {
ctx context.Context
url string
client s3.S3
creds utils.S3PeerCredentials
}

func NewS3Connector(ctx context.Context,
s3ProtoConfig *protos.S3Config) (*S3Connector, error) {
s3Client, err := utils.CreateS3Client()
keyID := ""
if s3ProtoConfig.AccessKeyId != nil {
keyID = *s3ProtoConfig.AccessKeyId
}
secretKey := ""
if s3ProtoConfig.SecretAccessKey != nil {
secretKey = *s3ProtoConfig.SecretAccessKey
}
roleArn := ""
if s3ProtoConfig.RoleArn != nil {
roleArn = *s3ProtoConfig.RoleArn
}
region := ""
if s3ProtoConfig.Region != nil {
region = *s3ProtoConfig.Region
}
endpoint := ""
if s3ProtoConfig.Endpoint != nil {
endpoint = *s3ProtoConfig.Endpoint
}
s3PeerCreds := utils.S3PeerCredentials{
AccessKeyID: keyID,
SecretAccessKey: secretKey,
AwsRoleArn: roleArn,
Region: region,
Endpoint: endpoint,
}
s3Client, err := utils.CreateS3Client(s3PeerCreds)
if err != nil {
return nil, fmt.Errorf("failed to create S3 client: %w", err)
}
return &S3Connector{
ctx: ctx,
url: s3ProtoConfig.Url,
client: *s3Client,
creds: s3PeerCreds,
}, nil
}

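As a usage sketch (not part of this commit), the new constructor could be driven with the GCS values from the peer example above. This assumes it sits in the same connector package as the code shown here; `strPtr` and the placeholder values are hypothetical:

```go
// strPtr is a hypothetical helper for populating the optional proto fields.
func strPtr(s string) *string { return &s }

// newGCSBackedS3Connector wires the CREATE PEER values from the commit message
// into NewS3Connector shown above.
func newGCSBackedS3Connector(ctx context.Context) (*S3Connector, error) {
	cfg := &protos.S3Config{
		Url:             "s3://<gcs_bucket_name>/<prefix>",
		AccessKeyId:     strPtr("<access_key_id>"),
		SecretAccessKey: strPtr("<secret>"),
		Region:          strPtr("auto"),
		Endpoint:        strPtr("https://storage.googleapis.com"),
	}
	return NewS3Connector(ctx, cfg)
}
```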
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/qrep.go
@@ -203,7 +203,7 @@ func (c *SnowflakeConnector) createStage(stageName string, config *protos.QRepCo
}

func (c *SnowflakeConnector) createExternalStage(stageName string, config *protos.QRepConfig) (string, error) {
awsCreds, err := utils.GetAWSSecrets()
awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{})
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
@@ -342,7 +342,7 @@ func (c *SnowflakeConnector) dropStage(stagingPath string, job string) error {
log.Infof("Deleting contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job)

// deleting the contents of the bucket with prefix
s3svc, err := utils.CreateS3Client()
s3svc, err := utils.CreateS3Client(utils.S3PeerCredentials{})
if err != nil {
log.WithFields(log.Fields{
"flowName": job,
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/qrep_avro_sync.go
@@ -210,12 +210,12 @@ func (s *SnowflakeAvroSyncMethod) writeToAvroFile(
return 0, "", fmt.Errorf("failed to parse staging path: %w", err)
}

s3Key := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, s.config.FlowJobName, partitionID)
s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro", s3o.Prefix, s.config.FlowJobName, partitionID)
log.WithFields(log.Fields{
"flowName": flowJobName,
"partitionID": partitionID,
}).Infof("OCF: Writing records to S3")
numRecords, err = ocfWriter.WriteRecordsToS3(s3o.Bucket, s3Key)
numRecords, err = ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{})
if err != nil {
return 0, "", fmt.Errorf("failed to write records to S3: %w", err)
}
4 changes: 2 additions & 2 deletions flow/connectors/utils/avro/avro_writer.go
@@ -114,7 +114,7 @@ func (p *PeerDBOCFWriter) WriteOCF(w io.Writer) (int, error) {
return numRows, nil
}

func (p *PeerDBOCFWriter) WriteRecordsToS3(bucketName, key string) (int, error) {
func (p *PeerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils.S3PeerCredentials) (int, error) {
r, w := io.Pipe()
numRowsWritten := make(chan int, 1)
go func() {
@@ -126,7 +126,7 @@ func (p *PeerDBOCFWriter) WriteRecordsToS3(bucketName, key string) (int, error)
numRowsWritten <- numRows
}()

s3svc, err := utils.CreateS3Client()
s3svc, err := utils.CreateS3Client(s3Creds)
if err != nil {
log.Errorf("failed to create S3 client: %v", err)
return 0, fmt.Errorf("failed to create S3 client: %w", err)
49 changes: 41 additions & 8 deletions flow/connectors/utils/aws.go
@@ -6,6 +6,7 @@ import (
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
@@ -15,17 +16,45 @@ type AWSSecrets struct {
SecretAccessKey string
AwsRoleArn string
Region string
Endpoint string
}

func GetAWSSecrets() (*AWSSecrets, error) {
awsRegion := os.Getenv("AWS_REGION")
type S3PeerCredentials struct {
AccessKeyID string
SecretAccessKey string
AwsRoleArn string
Region string
Endpoint string
}

func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) {
awsRegion := creds.Region
if awsRegion == "" {
awsRegion = os.Getenv("AWS_REGION")
}
if awsRegion == "" {
return nil, fmt.Errorf("AWS_REGION must be set")
}

awsKey := os.Getenv("AWS_ACCESS_KEY_ID")
awsSecret := os.Getenv("AWS_SECRET_ACCESS_KEY")
awsRoleArn := os.Getenv("AWS_ROLE_ARN")
awsEndpoint := creds.Endpoint
if awsEndpoint == "" {
awsEndpoint = os.Getenv("AWS_ENDPOINT")
}

awsKey := creds.AccessKeyID
if awsKey == "" {
awsKey = os.Getenv("AWS_ACCESS_KEY_ID")
}

awsSecret := creds.SecretAccessKey
if awsSecret == "" {
awsSecret = os.Getenv("AWS_SECRET_ACCESS_KEY")
}

awsRoleArn := creds.AwsRoleArn
if awsRoleArn == "" {
awsRoleArn = os.Getenv("AWS_ROLE_ARN")
}

// one of (awsKey and awsSecret) or awsRoleArn must be set
if awsKey == "" && awsSecret == "" && awsRoleArn == "" {
@@ -37,6 +66,7 @@ func GetAWSSecrets() (*AWSSecrets, error) {
SecretAccessKey: awsSecret,
AwsRoleArn: awsRoleArn,
Region: awsRegion,
Endpoint: awsEndpoint,
}, nil
}

@@ -66,14 +96,17 @@ func NewS3BucketAndPrefix(s3Path string) (*S3BucketAndPrefix, error) {
}, nil
}

func CreateS3Client() (*s3.S3, error) {
awsSecrets, err := GetAWSSecrets()
func CreateS3Client(s3Creds S3PeerCredentials) (*s3.S3, error) {
awsSecrets, err := GetAWSSecrets(s3Creds)
if err != nil {
return nil, fmt.Errorf("failed to get AWS secrets: %w", err)
}

sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(awsSecrets.Region),
Region: aws.String(awsSecrets.Region),
Endpoint: aws.String(awsSecrets.Endpoint),
Credentials: credentials.NewStaticCredentials(
awsSecrets.AccessKeyID, awsSecrets.SecretAccessKey, ""),
}))

s3svc := s3.New(sess)
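The net effect is a lookup order of peer-provided values first, then the `AWS_*` environment variables from `docker-compose.yml`. A hedged sketch of both paths follows; the import path and the values used here are assumptions, not part of the commit:

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/PeerDB-io/peer-flow/connectors/utils" // assumed module path
)

func main() {
	// Path 1: an empty S3PeerCredentials falls back entirely to the environment,
	// which is how the Snowflake connector and the e2e helper call it.
	os.Setenv("AWS_REGION", "us-east-1")
	os.Setenv("AWS_ACCESS_KEY_ID", "env-key")
	os.Setenv("AWS_SECRET_ACCESS_KEY", "env-secret")
	fromEnv, err := utils.GetAWSSecrets(utils.S3PeerCredentials{})
	if err != nil {
		log.Fatalf("env lookup failed: %v", err)
	}
	fmt.Println(fromEnv.Region, fromEnv.AccessKeyID) // us-east-1 env-key

	// Path 2: peer-provided GCS values take precedence over the environment.
	fromPeer, err := utils.GetAWSSecrets(utils.S3PeerCredentials{
		AccessKeyID:     "<hmac_access_key_id>",
		SecretAccessKey: "<hmac_secret>",
		Region:          "auto",
		Endpoint:        "https://storage.googleapis.com",
	})
	if err != nil {
		log.Fatalf("peer lookup failed: %v", err)
	}
	fmt.Println(fromPeer.Region, fromPeer.Endpoint) // auto https://storage.googleapis.com
}
```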
2 changes: 1 addition & 1 deletion flow/e2e/s3/s3_helper.go
@@ -23,7 +23,7 @@ type S3TestHelper struct {
}

func NewS3TestHelper() (*S3TestHelper, error) {
client, err := utils.CreateS3Client()
client, err := utils.CreateS3Client(utils.S3PeerCredentials{})
if err != nil {
return nil, err
}