Refactor s3 wiring for clickhouse #1265

Merged · 6 commits · Feb 13, 2024
Changes from 2 commits
2 changes: 1 addition & 1 deletion flow/connectors/clickhouse/cdc.go
@@ -87,7 +87,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
}

qrepConfig := &protos.QRepConfig{
- StagingPath: c.config.S3Path,
+ StagingPath: c.creds.BucketPath,
FlowJobName: req.FlowJobName,
DestinationTableIdentifier: strings.ToLower(rawTableIdentifier),
}
66 changes: 56 additions & 10 deletions flow/connectors/clickhouse/clickhouse.go
@@ -15,6 +15,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/shared"
)

@@ -23,24 +24,53 @@ type ClickhouseConnector struct {
tableSchemaMapping map[string]*protos.TableSchema
logger log.Logger
config *protos.ClickhouseConfig
- creds utils.S3PeerCredentials
+ creds *utils.ClickhouseS3Credentials
}

- func ValidateS3(ctx context.Context, bucketUrl string, creds utils.S3PeerCredentials) error {
+ func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error {
// for validation purposes
- s3Client, err := utils.CreateS3Client(creds)
+ s3Client, err := utils.CreateS3Client(utils.S3PeerCredentials{
+ AccessKeyID: creds.AccessKeyID,
+ SecretAccessKey: creds.SecretAccessKey,
+ Region: creds.Region,
+ })
if err != nil {
return fmt.Errorf("failed to create S3 client: %w", err)
}

- validErr := conns3.ValidCheck(ctx, s3Client, bucketUrl, nil)
+ validErr := conns3.ValidCheck(ctx, s3Client, creds.BucketPath, nil)
if validErr != nil {
return validErr
}

return nil
}

+ // Creates and drops a dummy table to validate the peer
+ func ValidateClickhouse(ctx context.Context, conn *sql.DB) error {
+ validateDummyTableName := fmt.Sprintf("peerdb_validation_%s", shared.RandomString(4))
+ // create a table
+ _, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory",
+ validateDummyTableName))
+ if err != nil {
+ return fmt.Errorf("failed to create validation table %s: %w", validateDummyTableName, err)
+ }
+
+ // insert a row
+ _, err = conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (1)", validateDummyTableName))
+ if err != nil {
+ return fmt.Errorf("failed to insert into validation table %s: %w", validateDummyTableName, err)
+ }
+
+ // drop the table
+ _, err = conn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", validateDummyTableName))
+ if err != nil {
+ return fmt.Errorf("failed to drop validation table %s: %w", validateDummyTableName, err)
+ }
+
+ return nil
+ }

func NewClickhouseConnector(
ctx context.Context,
config *protos.ClickhouseConfig,
@@ -51,19 +81,35 @@ func NewClickhouseConnector(
return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
}

+ err = ValidateClickhouse(ctx, database)
+ if err != nil {
+ return nil, fmt.Errorf("invalidated Clickhouse peer: %w", err)
+ }

pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
if err != nil {
logger.Error("failed to create postgres metadata store", "error", err)
return nil, err
}

- s3PeerCreds := utils.S3PeerCredentials{
- AccessKeyID: config.AccessKeyId,
- SecretAccessKey: config.SecretAccessKey,
- Region: config.Region,
- }
+ var clickhouseS3Creds *utils.ClickhouseS3Credentials
+ deploymentUID := shared.GetDeploymentUID()
+ flowName, _ := ctx.Value(shared.FlowNameKey).(string)
+ bucketPathSuffix := fmt.Sprintf("%s/%s", deploymentUID, flowName)
+ // Get S3 credentials from environment
+ clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix)
+ if clickhouseS3Creds.AccessKeyID == "" &&
+ clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" {
+ // Fallback: user provided S3 credentials
+ clickhouseS3Creds = &utils.ClickhouseS3Credentials{
+ AccessKeyID: config.AccessKeyId,
+ SecretAccessKey: config.SecretAccessKey,
+ Region: config.Region,
+ BucketPath: config.S3Path,
+ }
+ }

- validateErr := ValidateS3(ctx, config.S3Path, s3PeerCreds)
+ validateErr := ValidateS3(ctx, clickhouseS3Creds)
if validateErr != nil {
return nil, fmt.Errorf("failed to validate S3 bucket: %w", validateErr)
}
@@ -73,7 +119,7 @@ func NewClickhouseConnector(
pgMetadata: pgMetadata,
tableSchemaMapping: nil,
config: config,
- creds: s3PeerCreds,
+ creds: clickhouseS3Creds,
logger: logger,
}, nil
}
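The wiring above reduces to one resolution rule: prefer deployment-wide S3 credentials from the environment, and fall back to the credentials stored on the peer only when the environment set is empty. A minimal sketch of that rule as a standalone helper (the name resolveClickhouseS3Creds is hypothetical; the types and functions are the ones this PR adds, and the sketch assumes the repo's protos, utils, and shared packages):

    func resolveClickhouseS3Creds(ctx context.Context, config *protos.ClickhouseConfig) *utils.ClickhouseS3Credentials {
        deploymentUID := shared.GetDeploymentUID()
        flowName, _ := ctx.Value(shared.FlowNameKey).(string)
        // Environment-provided credentials win; their BucketPath is scoped per deployment and flow.
        creds := utils.GetClickhouseAWSSecrets(fmt.Sprintf("%s/%s", deploymentUID, flowName))
        if creds.AccessKeyID == "" && creds.SecretAccessKey == "" && creds.Region == "" {
            // Fallback: the S3 settings the user supplied when creating the peer.
            creds = &utils.ClickhouseS3Credentials{
                AccessKeyID:     config.AccessKeyId,
                SecretAccessKey: config.SecretAccessKey,
                Region:          config.Region,
                BucketPath:      config.S3Path,
            }
        }
        return creds
    }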
38 changes: 19 additions & 19 deletions flow/connectors/clickhouse/qrep_avro_sync.go
@@ -34,23 +34,22 @@ func NewClickhouseAvroSyncMethod(
}

func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, avroFile *avro.AvroFile) error {
- stagingPath := s.config.StagingPath
- if stagingPath == "" {
- stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse"
- }
+ stagingPath := s.connector.creds.BucketPath
s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
if err != nil {
return err
}
- awsCreds, err := utils.GetAWSSecrets(s.connector.creds)
- avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)
+ avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket,
+ s.connector.creds.Region, avroFile.FilePath)

if err != nil {
return err
}
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')",
- s.config.DestinationTableIdentifier, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
+ s.config.DestinationTableIdentifier, avroFileUrl,
+ s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)

_, err = s.connector.database.ExecContext(ctx, query)

@@ -102,9 +101,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
) (int, error) {
startTime := time.Now()
dstTableName := config.DestinationTableIdentifier

- stagingPath := s.config.DestinationPeer.GetClickhouseConfig().S3Path
+ stagingPath := s.connector.creds.BucketPath
schema, err := stream.Schema()
if err != nil {
return -1, fmt.Errorf("failed to get schema from stream: %w", err)
@@ -123,16 +120,18 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
if err != nil {
return 0, err
}
- awsCreds, err := utils.GetAWSSecrets(s.connector.creds)
- avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)
+ avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket,
+ s.connector.creds.Region, avroFile.FilePath)

if err != nil {
return 0, err
}
selector := strings.Join(schema.GetColumnNames(), ",")
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s (%s) SELECT * FROM s3('%s','%s','%s', 'Avro')",
- config.DestinationTableIdentifier, selector, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
+ config.DestinationTableIdentifier, selector, avroFileUrl,
+ s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)

_, err = s.connector.database.ExecContext(ctx, query)

@@ -168,20 +167,21 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
partitionID string,
flowJobName string,
) (*avro.AvroFile, error) {
- stagingPath := s.config.StagingPath // "s3://avro-clickhouse"
- if stagingPath == "" {
- stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse"
- }
+ stagingPath := s.connector.creds.BucketPath
ocfWriter := avro.NewPeerDBOCFWriter(stream, avroSchema, avro.CompressZstd, qvalue.QDWHTypeClickhouse)
s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
if err != nil {
return nil, fmt.Errorf("failed to parse staging path: %w", err)
}

- s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName
+ s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID)
s3AvroFileKey = strings.Trim(s3AvroFileKey, "/")

- avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, s.connector.creds)
+ avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{
+ AccessKeyID: s.connector.creds.AccessKeyID,
+ SecretAccessKey: s.connector.creds.SecretAccessKey,
+ Region: s.connector.creds.Region,
+ })
if err != nil {
return nil, fmt.Errorf("failed to write records to S3: %w", err)
}
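For orientation, the sync path after this change is: write the Avro file to the staging bucket under the connector's resolved credentials, then have ClickHouse pull it server-side with the s3() table function. A condensed sketch of the second step (function and parameter names are illustrative, not part of the PeerDB API):

    package chload

    import (
        "context"
        "database/sql"
        "fmt"
    )

    // loadAvroFromS3 mirrors the INSERT ... SELECT FROM s3(...) queries above.
    func loadAvroFromS3(ctx context.Context, db *sql.DB,
        table, bucket, region, fileKey, accessKey, secretKey string) error {
        // ClickHouse fetches the staged file itself; only the query crosses the wire.
        avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", bucket, region, fileKey)
        query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')",
            table, avroFileUrl, accessKey, secretKey)
        _, err := db.ExecContext(ctx, query)
        return err
    }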
24 changes: 24 additions & 0 deletions flow/connectors/utils/aws.go
@@ -3,6 +3,7 @@ package utils
import (
"fmt"
"net/http"
"net/url"
"os"
"strings"
"time"
@@ -29,6 +30,13 @@ type S3PeerCredentials struct {
Endpoint string `json:"endpoint"`
}

+ type ClickhouseS3Credentials struct {
+ AccessKeyID string `json:"accessKeyId"`
+ SecretAccessKey string `json:"secretAccessKey"`
+ Region string `json:"region"`
+ BucketPath string `json:"bucketPath"`
+ }

func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) {
awsRegion := creds.Region
if awsRegion == "" {
@@ -72,6 +80,22 @@ func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) {
}, nil
}

+ func GetClickhouseAWSSecrets(bucketPathSuffix string) *ClickhouseS3Credentials {
+ awsRegion := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_REGION")
+ awsKey := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID")
+ awsSecret := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY")
+ awsBucketName := os.Getenv("PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME")
+
+ escapedPathSuffix := url.PathEscape(bucketPathSuffix)
+ awsBucketPath := fmt.Sprintf("s3://%s/%s", awsBucketName, escapedPathSuffix)
+
+ return &ClickhouseS3Credentials{
+ AccessKeyID: awsKey,
+ SecretAccessKey: awsSecret,
+ Region: awsRegion,
+ BucketPath: awsBucketPath,
+ }
+ }

type S3BucketAndPrefix struct {
Bucket string
Prefix string
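One detail worth noting in GetClickhouseAWSSecrets: url.PathEscape also escapes "/", so the "deploymentUID/flowName" suffix built in NewClickhouseConnector collapses into a single path segment (%2F) in the resulting s3:// URL. A runnable sketch with made-up bucket and suffix values:

    package main

    import (
        "fmt"
        "net/url"
    )

    func main() {
        // Mirrors bucketPathSuffix as built in NewClickhouseConnector: "<deploymentUID>/<flowName>".
        suffix := fmt.Sprintf("%s/%s", "deploy-uid-123", "my-flow")

        // url.PathEscape escapes "/" as %2F, merging the two parts into one segment.
        escaped := url.PathEscape(suffix)
        fmt.Println(escaped) // deploy-uid-123%2Fmy-flow

        bucketPath := fmt.Sprintf("s3://%s/%s", "peerdb-staging", escaped)
        fmt.Println(bucketPath) // s3://peerdb-staging/deploy-uid-123%2Fmy-flow
    }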
4 changes: 4 additions & 0 deletions flow/shared/constants.go
@@ -70,3 +70,7 @@ func prependUIDToTaskQueueName(taskQueueName string) string {
}
return fmt.Sprintf("%s-%s", deploymentUID, taskQueueName)
}

+ func GetDeploymentUID() string {
+ return peerdbenv.PeerDBDeploymentUID()
+ }
27 changes: 22 additions & 5 deletions ui/app/mirrors/create/handlers.ts
@@ -244,6 +244,23 @@ export const fetchSchemas = async (peerName: string) => {
return schemasRes.schemas;
};

+ const getDefaultDestinationTable = (
+ peerType: DBType | undefined,
+ schemaName: string,
+ tableName: string
+ ) => {
+ if (!peerType) {
+ return `${schemaName}.${tableName}`;
+ }
+ if (dBTypeToJSON(peerType) == 'BIGQUERY') {
+ return tableName;
+ }
+ if (dBTypeToJSON(peerType) == 'CLICKHOUSE') {
+ return `${schemaName}_${tableName}`;
+ }
+ return `${schemaName}.${tableName}`;
+ };

export const fetchTables = async (
peerName: string,
schemaName: string,
@@ -265,11 +282,11 @@ export const fetchTables = async (
for (const tableObject of tableRes) {
// setting defaults:
// for bigquery, tables are not schema-qualified
- const dstName =
- peerType != undefined && dBTypeToJSON(peerType) == 'BIGQUERY'
- ? tableObject.tableName
- : `${schemaName}.${tableObject.tableName}`;
+ const dstName = getDefaultDestinationTable(
+ peerType,
+ schemaName,
+ tableObject.tableName
+ );
tables.push({
schema: schemaName,
source: `${schemaName}.${tableObject.tableName}`,
72 changes: 40 additions & 32 deletions ui/app/peers/create/[peerType]/schema.ts
@@ -229,34 +229,6 @@ export const bqSchema = z.object({
.max(1024, 'DatasetID must be less than 1025 characters'),
});

- const urlSchema = z
- .string({
- invalid_type_error: 'URL must be a string',
- required_error: 'URL is required',
- })
- .min(1, { message: 'URL must be non-empty' })
- .refine((url) => url.startsWith('s3://'), {
- message: 'URL must start with s3://',
- });
-
- const accessKeySchema = z
- .string({
- invalid_type_error: 'Access Key ID must be a string',
- required_error: 'Access Key ID is required',
- })
- .min(1, { message: 'Access Key ID must be non-empty' });
-
- const secretKeySchema = z
- .string({
- invalid_type_error: 'Secret Access Key must be a string',
- required_error: 'Secret Access Key is required',
- })
- .min(1, { message: 'Secret Access Key must be non-empty' });
-
- const regionSchema = z.string({
- invalid_type_error: 'Region must be a string',
- });

export const chSchema = z.object({
host: z
.string({
@@ -294,10 +266,46 @@ export const chSchema = z.object({
})
.min(1, 'Password must be non-empty')
.max(100, 'Password must be less than 100 characters'),
- s3Path: urlSchema,
- accessKeyId: accessKeySchema,
- secretAccessKey: secretKeySchema,
- region: regionSchema.min(1, { message: 'Region must be non-empty' }),
+ s3Path: z
+ .string({ invalid_type_error: 'S3 Path must be a string' })
+ .optional(),
+ accessKeyId: z
+ .string({ invalid_type_error: 'Access Key ID must be a string' })
+ .optional(),
+ secretAccessKey: z
+ .string({ invalid_type_error: 'Secret Access Key must be a string' })
+ .optional(),
+ region: z
+ .string({ invalid_type_error: 'Region must be a string' })
+ .optional(),
});

+ const urlSchema = z
+ .string({
+ invalid_type_error: 'URL must be a string',
+ required_error: 'URL is required',
+ })
+ .min(1, { message: 'URL must be non-empty' })
+ .refine((url) => url.startsWith('s3://'), {
+ message: 'URL must start with s3://',
+ });
+
+ const accessKeySchema = z
+ .string({
+ invalid_type_error: 'Access Key ID must be a string',
+ required_error: 'Access Key ID is required',
+ })
+ .min(1, { message: 'Access Key ID must be non-empty' });
+
+ const secretKeySchema = z
+ .string({
+ invalid_type_error: 'Secret Access Key must be a string',
+ required_error: 'Secret Access Key is required',
+ })
+ .min(1, { message: 'Secret Access Key must be non-empty' });
+
+ const regionSchema = z.string({
+ invalid_type_error: 'Region must be a string',
+ });

export const s3Schema = z.object({