Skip to content

Commit

Permalink
refactor s3 wiring for clickhouse
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Feb 12, 2024
1 parent 02ea4ce commit d995aee
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 80 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
}

qrepConfig := &protos.QRepConfig{
StagingPath: c.config.S3Path,
StagingPath: c.creds.BucketPath,
FlowJobName: req.FlowJobName,
DestinationTableIdentifier: strings.ToLower(rawTableIdentifier),
}
Expand Down
65 changes: 55 additions & 10 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/shared"
)

type ClickhouseConnector struct {
Expand All @@ -23,24 +24,53 @@ type ClickhouseConnector struct {
tableSchemaMapping map[string]*protos.TableSchema
logger log.Logger
config *protos.ClickhouseConfig
creds utils.S3PeerCredentials
creds *utils.ClickhouseS3Credentials
}

func ValidateS3(ctx context.Context, bucketUrl string, creds utils.S3PeerCredentials) error {
func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error {
// for validation purposes
s3Client, err := utils.CreateS3Client(creds)
s3Client, err := utils.CreateS3Client(utils.S3PeerCredentials{
AccessKeyID: creds.AccessKeyID,
SecretAccessKey: creds.SecretAccessKey,
Region: creds.Region,
})
if err != nil {
return fmt.Errorf("failed to create S3 client: %w", err)
}

validErr := conns3.ValidCheck(ctx, s3Client, bucketUrl, nil)
validErr := conns3.ValidCheck(ctx, s3Client, creds.BucketPath, nil)
if validErr != nil {
return validErr
}

return nil
}

// ValidateClickhouse validates the peer by creating a randomly-named dummy
// table, inserting a row into it, and dropping it again, proving the
// connection can run both DDL and DML.
//
// The drop is deferred so the dummy table is cleaned up even when the insert
// fails; previously a failed insert returned early and leaked the table.
func ValidateClickhouse(ctx context.Context, conn *sql.DB) (err error) {
	validateDummyTableName := fmt.Sprintf("peerdb_validation_%s", shared.RandomString(4))

	// create a table
	_, err = conn.ExecContext(ctx, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory",
		validateDummyTableName))
	if err != nil {
		return fmt.Errorf("failed to create validation table %s: %w", validateDummyTableName, err)
	}

	// drop the table on every exit path; surface the drop error only when no
	// earlier error is already being returned (named return makes this visible
	// to the caller).
	defer func() {
		_, dropErr := conn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", validateDummyTableName))
		if dropErr != nil && err == nil {
			err = fmt.Errorf("failed to drop validation table %s: %w", validateDummyTableName, dropErr)
		}
	}()

	// insert a row
	_, err = conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (1)", validateDummyTableName))
	if err != nil {
		return fmt.Errorf("failed to insert into validation table %s: %w", validateDummyTableName, err)
	}

	return nil
}

func NewClickhouseConnector(
ctx context.Context,
config *protos.ClickhouseConfig,
Expand All @@ -51,19 +81,34 @@ func NewClickhouseConnector(
return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
}

err = ValidateClickhouse(ctx, database)
if err != nil {
return nil, fmt.Errorf("invalidated Clickhouse peer: %w", err)
}

pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
if err != nil {
logger.Error("failed to create postgres metadata store", "error", err)
return nil, err
}

s3PeerCreds := utils.S3PeerCredentials{
AccessKeyID: config.AccessKeyId,
SecretAccessKey: config.SecretAccessKey,
Region: config.Region,
var clickhouseS3Creds *utils.ClickhouseS3Credentials
deploymentUID := shared.GetDeploymentUID()
flowName, _ := ctx.Value(shared.FlowNameKey).(string)
bucketPathSuffix := fmt.Sprintf("%s/%s", deploymentUID, flowName)
// Get S3 credentials from environment
clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix)
if clickhouseS3Creds.AccessKeyID == "" && clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" {
// Fallback: user provided S3 credentials
clickhouseS3Creds = &utils.ClickhouseS3Credentials{
AccessKeyID: config.AccessKeyId,
SecretAccessKey: config.SecretAccessKey,
Region: config.Region,
BucketPath: config.S3Path,
}
}

validateErr := ValidateS3(ctx, config.S3Path, s3PeerCreds)
validateErr := ValidateS3(ctx, clickhouseS3Creds)
if validateErr != nil {
return nil, fmt.Errorf("failed to validate S3 bucket: %w", validateErr)
}
Expand All @@ -73,7 +118,7 @@ func NewClickhouseConnector(
pgMetadata: pgMetadata,
tableSchemaMapping: nil,
config: config,
creds: s3PeerCreds,
creds: clickhouseS3Creds,
logger: logger,
}, nil
}
Expand Down
32 changes: 14 additions & 18 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,20 @@ func NewClickhouseAvroSyncMethod(
}

func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, avroFile *avro.AvroFile) error {
stagingPath := s.config.StagingPath
if stagingPath == "" {
stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse"
}
stagingPath := s.connector.creds.BucketPath
s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
if err != nil {
return err
}
awsCreds, err := utils.GetAWSSecrets(s.connector.creds)
avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)

avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, s.connector.creds.Region, avroFile.FilePath)

if err != nil {
return err
}
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')",
s.config.DestinationTableIdentifier, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
s.config.DestinationTableIdentifier, avroFileUrl, s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)

_, err = s.connector.database.ExecContext(ctx, query)

Expand Down Expand Up @@ -102,9 +99,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
) (int, error) {
startTime := time.Now()
dstTableName := config.DestinationTableIdentifier

stagingPath := s.config.DestinationPeer.GetClickhouseConfig().S3Path

stagingPath := s.connector.creds.BucketPath
schema, err := stream.Schema()
if err != nil {
return -1, fmt.Errorf("failed to get schema from stream: %w", err)
Expand All @@ -123,16 +118,16 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
if err != nil {
return 0, err
}
awsCreds, err := utils.GetAWSSecrets(s.connector.creds)
avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)

avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, s.connector.creds.Region, avroFile.FilePath)

if err != nil {
return 0, err
}
selector := strings.Join(schema.GetColumnNames(), ",")
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s (%s) SELECT * FROM s3('%s','%s','%s', 'Avro')",
config.DestinationTableIdentifier, selector, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
config.DestinationTableIdentifier, selector, avroFileUrl, s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)

_, err = s.connector.database.ExecContext(ctx, query)

Expand Down Expand Up @@ -168,10 +163,7 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
partitionID string,
flowJobName string,
) (*avro.AvroFile, error) {
stagingPath := s.config.StagingPath // "s3://avro-clickhouse"
if stagingPath == "" {
stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse"
}
stagingPath := s.connector.creds.BucketPath
ocfWriter := avro.NewPeerDBOCFWriter(stream, avroSchema, avro.CompressZstd, qvalue.QDWHTypeClickhouse)
s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
if err != nil {
Expand All @@ -181,7 +173,11 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName
s3AvroFileKey = strings.Trim(s3AvroFileKey, "/")

avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, s.connector.creds)
avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{
AccessKeyID: s.connector.creds.AccessKeyID,
SecretAccessKey: s.connector.creds.SecretAccessKey,
Region: s.connector.creds.Region,
})
if err != nil {
return nil, fmt.Errorf("failed to write records to S3: %w", err)
}
Expand Down
22 changes: 22 additions & 0 deletions flow/connectors/utils/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ type S3PeerCredentials struct {
Endpoint string `json:"endpoint"`
}

// ClickhouseS3Credentials bundles the AWS credentials and the S3 bucket path
// the Clickhouse connector stages Avro files under. It is populated either
// from the PEERDB_CLICKHOUSE_AWS_* environment variables (see
// GetClickhouseAWSSecrets) or from the user-supplied peer config.
type ClickhouseS3Credentials struct {
	AccessKeyID     string `json:"accessKeyId"`
	SecretAccessKey string `json:"secretAccessKey"`
	Region          string `json:"region"`
	// BucketPath is a full "s3://bucket/prefix" staging location.
	BucketPath string `json:"bucketPath"`
}

func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) {
awsRegion := creds.Region
if awsRegion == "" {
Expand Down Expand Up @@ -72,6 +79,21 @@ func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) {
}, nil
}

// GetClickhouseAWSSecrets reads the Clickhouse staging S3 credentials from
// the PEERDB_CLICKHOUSE_AWS_* environment variables and builds the staging
// bucket path as "s3://<bucket>/<bucketPathSuffix>". Unset variables yield
// empty fields; callers use that to decide whether to fall back to
// user-provided credentials.
func GetClickhouseAWSSecrets(bucketPathSuffix string) *ClickhouseS3Credentials {
	const envPrefix = "PEERDB_CLICKHOUSE_AWS_"

	bucketName := os.Getenv(envPrefix + "S3_BUCKET_NAME")
	creds := ClickhouseS3Credentials{
		AccessKeyID:     os.Getenv(envPrefix + "CREDENTIALS_AWS_ACCESS_KEY_ID"),
		SecretAccessKey: os.Getenv(envPrefix + "CREDENTIALS_AWS_SECRET_ACCESS_KEY"),
		Region:          os.Getenv(envPrefix + "CREDENTIALS_AWS_REGION"),
		BucketPath:      fmt.Sprintf("s3://%s/%s", bucketName, bucketPathSuffix),
	}
	return &creds
}

type S3BucketAndPrefix struct {
Bucket string
Prefix string
Expand Down
4 changes: 4 additions & 0 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,7 @@ func prependUIDToTaskQueueName(taskQueueName string) string {
}
return fmt.Sprintf("%s-%s", deploymentUID, taskQueueName)
}

// GetDeploymentUID returns this PeerDB deployment's UID, delegating to
// peerdbenv.PeerDBDeploymentUID; exported so packages outside shared can
// read it without importing peerdbenv directly.
func GetDeploymentUID() string {
	return peerdbenv.PeerDBDeploymentUID()
}
72 changes: 40 additions & 32 deletions ui/app/peers/create/[peerType]/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,34 +229,6 @@ export const bqSchema = z.object({
.max(1024, 'DatasetID must be less than 1025 characters'),
});

const urlSchema = z
.string({
invalid_type_error: 'URL must be a string',
required_error: 'URL is required',
})
.min(1, { message: 'URL must be non-empty' })
.refine((url) => url.startsWith('s3://'), {
message: 'URL must start with s3://',
});

const accessKeySchema = z
.string({
invalid_type_error: 'Access Key ID must be a string',
required_error: 'Access Key ID is required',
})
.min(1, { message: 'Access Key ID must be non-empty' });

const secretKeySchema = z
.string({
invalid_type_error: 'Secret Access Key must be a string',
required_error: 'Secret Access Key is required',
})
.min(1, { message: 'Secret Access Key must be non-empty' });

const regionSchema = z.string({
invalid_type_error: 'Region must be a string',
});

export const chSchema = z.object({
host: z
.string({
Expand Down Expand Up @@ -294,10 +266,46 @@ export const chSchema = z.object({
})
.min(1, 'Password must be non-empty')
.max(100, 'Password must be less than 100 characters'),
s3Path: urlSchema,
accessKeyId: accessKeySchema,
secretAccessKey: secretKeySchema,
region: regionSchema.min(1, { message: 'Region must be non-empty' }),
s3Path: z
.string({ invalid_type_error: 'S3 Path must be a string' })
.optional(),
accessKeyId: z
.string({ invalid_type_error: 'Access Key ID must be a string' })
.optional(),
secretAccessKey: z
.string({ invalid_type_error: 'Secret Access Key must be a string' })
.optional(),
region: z
.string({ invalid_type_error: 'Region must be a string' })
.optional(),
});

// Shared zod validators for S3-compatible peer settings.

// Required S3 object URL: non-empty string that must begin with "s3://".
const urlSchema = z
  .string({
    invalid_type_error: 'URL must be a string',
    required_error: 'URL is required',
  })
  .min(1, { message: 'URL must be non-empty' })
  .refine((url) => url.startsWith('s3://'), {
    message: 'URL must start with s3://',
  });

// Required, non-empty AWS access key ID.
const accessKeySchema = z
  .string({
    invalid_type_error: 'Access Key ID must be a string',
    required_error: 'Access Key ID is required',
  })
  .min(1, { message: 'Access Key ID must be non-empty' });

// Required, non-empty AWS secret access key.
const secretKeySchema = z
  .string({
    invalid_type_error: 'Secret Access Key must be a string',
    required_error: 'Secret Access Key is required',
  })
  .min(1, { message: 'Secret Access Key must be non-empty' });

// AWS region; string-typed here, non-emptiness is enforced at the use site
// (e.g. `regionSchema.min(1, ...)`).
const regionSchema = z.string({
  invalid_type_error: 'Region must be a string',
});

export const s3Schema = z.object({
Expand Down
20 changes: 1 addition & 19 deletions ui/components/PeerForms/ClickhouseConfig.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -78,30 +78,12 @@ export default function ClickhouseForm({ settings, setter }: ConfigProps) {
>
Transient S3 Stage
</Label>
<Label>
Please provide an S3 object URL and access credentials to store our
intermediate staging files.
</Label>
{settings
.filter((setting) => S3Labels.includes(setting.label))
.map((setting, id) => (
<RowWithTextField
key={id}
label={
<Label>
{setting.label}{' '}
{!setting.optional && (
<Tooltip
style={{ width: '100%' }}
content={'This is a required field.'}
>
<Label colorName='lowContrast' colorSet='destructive'>
*
</Label>
</Tooltip>
)}
</Label>
}
label={<Label>{setting.label} </Label>}
action={
<div
style={{
Expand Down

0 comments on commit d995aee

Please sign in to comment.