diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 3074df7bad..8e2a48a877 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -87,7 +87,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro( } qrepConfig := &protos.QRepConfig{ - StagingPath: c.config.S3Path, + StagingPath: c.creds.BucketPath, FlowJobName: req.FlowJobName, DestinationTableIdentifier: strings.ToLower(rawTableIdentifier), } diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 50e926e1d6..cf0da791aa 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "database/sql" "fmt" + "net/url" "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -15,6 +16,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" + "github.com/PeerDB-io/peer-flow/shared" ) type ClickhouseConnector struct { @@ -23,17 +25,21 @@ type ClickhouseConnector struct { tableSchemaMapping map[string]*protos.TableSchema logger log.Logger config *protos.ClickhouseConfig - creds utils.S3PeerCredentials + creds *utils.ClickhouseS3Credentials } -func ValidateS3(ctx context.Context, bucketUrl string, creds utils.S3PeerCredentials) error { +func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error { // for validation purposes - s3Client, err := utils.CreateS3Client(creds) + s3Client, err := utils.CreateS3Client(utils.S3PeerCredentials{ + AccessKeyID: creds.AccessKeyID, + SecretAccessKey: creds.SecretAccessKey, + Region: creds.Region, + }) if err != nil { return fmt.Errorf("failed to create S3 client: %w", err) } - validErr := conns3.ValidCheck(ctx, s3Client, bucketUrl, nil) + validErr := conns3.ValidCheck(ctx, s3Client, creds.BucketPath, nil) if validErr != nil { return validErr } @@ -41,6 +47,31 @@ func ValidateS3(ctx context.Context, bucketUrl string, creds utils.S3PeerCredent return nil } +// Creates and drops a dummy table to validate the peer +func ValidateClickhouse(ctx context.Context, conn *sql.DB) error { + validateDummyTableName := fmt.Sprintf("peerdb_validation_%s", shared.RandomString(4)) + // create a table + _, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory", + validateDummyTableName)) + if err != nil { + return fmt.Errorf("failed to create validation table %s: %w", validateDummyTableName, err) + } + + // insert a row + _, err = conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (1)", validateDummyTableName)) + if err != nil { + return fmt.Errorf("failed to insert into validation table %s: %w", validateDummyTableName, err) + } + + // drop the table + _, err = conn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", validateDummyTableName)) + if err != nil { + return fmt.Errorf("failed to drop validation table %s: %w", validateDummyTableName, err) + } + + return nil +} + func NewClickhouseConnector( ctx context.Context, config *protos.ClickhouseConfig, @@ -51,19 +82,39 @@ func NewClickhouseConnector( return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err) } + err = ValidateClickhouse(ctx, database) + if err != nil { + return nil, fmt.Errorf("invalidated Clickhouse peer: %w", err) + } + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx) if err != nil { logger.Error("failed to create postgres metadata store", "error", err) return nil, err } - s3PeerCreds := utils.S3PeerCredentials{ + var clickhouseS3Creds *utils.ClickhouseS3Credentials + // Get user provided S3 credentials + clickhouseS3Creds = &utils.ClickhouseS3Credentials{ AccessKeyID: config.AccessKeyId, SecretAccessKey: config.SecretAccessKey, Region: config.Region, + BucketPath: config.S3Path, + } + + if clickhouseS3Creds.AccessKeyID == "" && + clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" && + clickhouseS3Creds.BucketPath == "" { + deploymentUID := shared.GetDeploymentUID() + flowName, _ := ctx.Value(shared.FlowNameKey).(string) + bucketPathSuffix := fmt.Sprintf("%s/%s", + url.PathEscape(deploymentUID), url.PathEscape(flowName)) + + // Fallback: Get S3 credentials from environment + clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix) } - validateErr := ValidateS3(ctx, config.S3Path, s3PeerCreds) + validateErr := ValidateS3(ctx, clickhouseS3Creds) if validateErr != nil { return nil, fmt.Errorf("failed to validate S3 bucket: %w", validateErr) } @@ -73,7 +124,7 @@ func NewClickhouseConnector( pgMetadata: pgMetadata, tableSchemaMapping: nil, config: config, - creds: s3PeerCreds, + creds: clickhouseS3Creds, logger: logger, }, nil } diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index 94813e84f1..dd269de9c9 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -34,23 +34,22 @@ func NewClickhouseAvroSyncMethod( } func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, avroFile *avro.AvroFile) error { - stagingPath := s.config.StagingPath - if stagingPath == "" { - stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse" - } + stagingPath := s.connector.creds.BucketPath s3o, err := utils.NewS3BucketAndPrefix(stagingPath) if err != nil { return err } - awsCreds, err := utils.GetAWSSecrets(s.connector.creds) - avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath) + + avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, + s.connector.creds.Region, avroFile.FilePath) if err != nil { return err } //nolint:gosec query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')", - s.config.DestinationTableIdentifier, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey) + s.config.DestinationTableIdentifier, avroFileUrl, + s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey) _, err = s.connector.database.ExecContext(ctx, query) @@ -102,9 +101,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( ) (int, error) { startTime := time.Now() dstTableName := config.DestinationTableIdentifier - - stagingPath := s.config.DestinationPeer.GetClickhouseConfig().S3Path - + stagingPath := s.connector.creds.BucketPath schema, err := stream.Schema() if err != nil { return -1, fmt.Errorf("failed to get schema from stream: %w", err) @@ -123,8 +120,9 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( if err != nil { return 0, err } - awsCreds, err := utils.GetAWSSecrets(s.connector.creds) - avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath) + + avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, + s.connector.creds.Region, avroFile.FilePath) if err != nil { return 0, err @@ -132,7 +130,8 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( selector := strings.Join(schema.GetColumnNames(), ",") //nolint:gosec query := fmt.Sprintf("INSERT INTO %s (%s) SELECT * FROM s3('%s','%s','%s', 'Avro')", - config.DestinationTableIdentifier, selector, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey) + config.DestinationTableIdentifier, selector, avroFileUrl, + s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey) _, err = s.connector.database.ExecContext(ctx, query) @@ -168,20 +167,21 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile( partitionID string, flowJobName string, ) (*avro.AvroFile, error) { - stagingPath := s.config.StagingPath // "s3://avro-clickhouse" - if stagingPath == "" { - stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse" - } + stagingPath := s.connector.creds.BucketPath ocfWriter := avro.NewPeerDBOCFWriter(stream, avroSchema, avro.CompressZstd, qvalue.QDWHTypeClickhouse) s3o, err := utils.NewS3BucketAndPrefix(stagingPath) if err != nil { return nil, fmt.Errorf("failed to parse staging path: %w", err) } - s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName + s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) s3AvroFileKey = strings.Trim(s3AvroFileKey, "/") - avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, s.connector.creds) + avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{ + AccessKeyID: s.connector.creds.AccessKeyID, + SecretAccessKey: s.connector.creds.SecretAccessKey, + Region: s.connector.creds.Region, + }) if err != nil { return nil, fmt.Errorf("failed to write records to S3: %w", err) } diff --git a/flow/connectors/utils/aws.go b/flow/connectors/utils/aws.go index 63adfa8330..b62522b28c 100644 --- a/flow/connectors/utils/aws.go +++ b/flow/connectors/utils/aws.go @@ -29,6 +29,13 @@ type S3PeerCredentials struct { Endpoint string `json:"endpoint"` } +type ClickhouseS3Credentials struct { + AccessKeyID string `json:"accessKeyId"` + SecretAccessKey string `json:"secretAccessKey"` + Region string `json:"region"` + BucketPath string `json:"bucketPath"` +} + func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) { awsRegion := creds.Region if awsRegion == "" { @@ -72,6 +79,21 @@ func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) { }, nil } +func GetClickhouseAWSSecrets(bucketPathSuffix string) *ClickhouseS3Credentials { + awsRegion := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_REGION") + awsKey := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID") + awsSecret := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY") + awsBucketName := os.Getenv("PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME") + + awsBucketPath := fmt.Sprintf("s3://%s/%s", awsBucketName, bucketPathSuffix) + return &ClickhouseS3Credentials{ + AccessKeyID: awsKey, + SecretAccessKey: awsSecret, + Region: awsRegion, + BucketPath: awsBucketPath, + } +} + type S3BucketAndPrefix struct { Bucket string Prefix string diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 69b4dfda20..df4ed83057 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -70,3 +70,7 @@ func prependUIDToTaskQueueName(taskQueueName string) string { } return fmt.Sprintf("%s-%s", deploymentUID, taskQueueName) } + +func GetDeploymentUID() string { + return peerdbenv.PeerDBDeploymentUID() +} diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index b279e2178b..224655c647 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -244,6 +244,23 @@ export const fetchSchemas = async (peerName: string) => { return schemasRes.schemas; }; +const getDefaultDestinationTable = ( + peerType: DBType | undefined, + schemaName: string, + tableName: string +) => { + if (!peerType) { + return `${schemaName}.${tableName}`; + } + if (dBTypeToJSON(peerType) == 'BIGQUERY') { + return tableName; + } + if (dBTypeToJSON(peerType) == 'CLICKHOUSE') { + return `${schemaName}_${tableName}`; + } + return `${schemaName}.${tableName}`; +}; + export const fetchTables = async ( peerName: string, schemaName: string, @@ -265,11 +282,11 @@ export const fetchTables = async ( for (const tableObject of tableRes) { // setting defaults: // for bigquery, tables are not schema-qualified - const dstName = - peerType != undefined && dBTypeToJSON(peerType) == 'BIGQUERY' - ? tableObject.tableName - : `${schemaName}.${tableObject.tableName}`; - + const dstName = getDefaultDestinationTable( + peerType, + schemaName, + tableObject.tableName + ); tables.push({ schema: schemaName, source: `${schemaName}.${tableObject.tableName}`, diff --git a/ui/app/peers/create/[peerType]/schema.ts b/ui/app/peers/create/[peerType]/schema.ts index 290eb81b35..211acd69a7 100644 --- a/ui/app/peers/create/[peerType]/schema.ts +++ b/ui/app/peers/create/[peerType]/schema.ts @@ -229,34 +229,6 @@ export const bqSchema = z.object({ .max(1024, 'DatasetID must be less than 1025 characters'), }); -const urlSchema = z - .string({ - invalid_type_error: 'URL must be a string', - required_error: 'URL is required', - }) - .min(1, { message: 'URL must be non-empty' }) - .refine((url) => url.startsWith('s3://'), { - message: 'URL must start with s3://', - }); - -const accessKeySchema = z - .string({ - invalid_type_error: 'Access Key ID must be a string', - required_error: 'Access Key ID is required', - }) - .min(1, { message: 'Access Key ID must be non-empty' }); - -const secretKeySchema = z - .string({ - invalid_type_error: 'Secret Access Key must be a string', - required_error: 'Secret Access Key is required', - }) - .min(1, { message: 'Secret Access Key must be non-empty' }); - -const regionSchema = z.string({ - invalid_type_error: 'Region must be a string', -}); - export const chSchema = z.object({ host: z .string({ @@ -294,10 +266,46 @@ export const chSchema = z.object({ }) .min(1, 'Password must be non-empty') .max(100, 'Password must be less than 100 characters'), - s3Path: urlSchema, - accessKeyId: accessKeySchema, - secretAccessKey: secretKeySchema, - region: regionSchema.min(1, { message: 'Region must be non-empty' }), + s3Path: z + .string({ invalid_type_error: 'S3 Path must be a string' }) + .optional(), + accessKeyId: z + .string({ invalid_type_error: 'Access Key ID must be a string' }) + .optional(), + secretAccessKey: z + .string({ invalid_type_error: 'Secret Access Key must be a string' }) + .optional(), + region: z + .string({ invalid_type_error: 'Region must be a string' }) + .optional(), +}); + +const urlSchema = z + .string({ + invalid_type_error: 'URL must be a string', + required_error: 'URL is required', + }) + .min(1, { message: 'URL must be non-empty' }) + .refine((url) => url.startsWith('s3://'), { + message: 'URL must start with s3://', + }); + +const accessKeySchema = z + .string({ + invalid_type_error: 'Access Key ID must be a string', + required_error: 'Access Key ID is required', + }) + .min(1, { message: 'Access Key ID must be non-empty' }); + +const secretKeySchema = z + .string({ + invalid_type_error: 'Secret Access Key must be a string', + required_error: 'Secret Access Key is required', + }) + .min(1, { message: 'Secret Access Key must be non-empty' }); + +const regionSchema = z.string({ + invalid_type_error: 'Region must be a string', }); export const s3Schema = z.object({ diff --git a/ui/components/PeerForms/ClickhouseConfig.tsx b/ui/components/PeerForms/ClickhouseConfig.tsx index 9614870a83..a0e85c7076 100644 --- a/ui/components/PeerForms/ClickhouseConfig.tsx +++ b/ui/components/PeerForms/ClickhouseConfig.tsx @@ -78,30 +78,12 @@ export default function ClickhouseForm({ settings, setter }: ConfigProps) { > Transient S3 Stage - {settings .filter((setting) => S3Labels.includes(setting.label)) .map((setting, id) => ( - {setting.label}{' '} - {!setting.optional && ( - - - - )} - - } + label={} action={