diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go
index 3074df7bad..8e2a48a877 100644
--- a/flow/connectors/clickhouse/cdc.go
+++ b/flow/connectors/clickhouse/cdc.go
@@ -87,7 +87,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
}
qrepConfig := &protos.QRepConfig{
- StagingPath: c.config.S3Path,
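+ // Stage under the connector's resolved bucket path (user-supplied or environment fallback).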
+ StagingPath: c.creds.BucketPath,
FlowJobName: req.FlowJobName,
DestinationTableIdentifier: strings.ToLower(rawTableIdentifier),
}
diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go
index 50e926e1d6..cf0da791aa 100644
--- a/flow/connectors/clickhouse/clickhouse.go
+++ b/flow/connectors/clickhouse/clickhouse.go
@@ -5,6 +5,7 @@ import (
"crypto/tls"
"database/sql"
"fmt"
+ "net/url"
"github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
@@ -15,6 +16,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
+ "github.com/PeerDB-io/peer-flow/shared"
)
type ClickhouseConnector struct {
@@ -23,17 +25,21 @@ type ClickhouseConnector struct {
tableSchemaMapping map[string]*protos.TableSchema
logger log.Logger
config *protos.ClickhouseConfig
- creds utils.S3PeerCredentials
+ creds *utils.ClickhouseS3Credentials
}
-func ValidateS3(ctx context.Context, bucketUrl string, creds utils.S3PeerCredentials) error {
+func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error {
// for validation purposes
- s3Client, err := utils.CreateS3Client(creds)
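+ // CreateS3Client expects the generic S3PeerCredentials shape, so adapt the ClickHouse-specific credentials.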
+ s3Client, err := utils.CreateS3Client(utils.S3PeerCredentials{
+ AccessKeyID: creds.AccessKeyID,
+ SecretAccessKey: creds.SecretAccessKey,
+ Region: creds.Region,
+ })
if err != nil {
return fmt.Errorf("failed to create S3 client: %w", err)
}
- validErr := conns3.ValidCheck(ctx, s3Client, bucketUrl, nil)
+ validErr := conns3.ValidCheck(ctx, s3Client, creds.BucketPath, nil)
if validErr != nil {
return validErr
}
@@ -41,6 +47,31 @@ func ValidateS3(ctx context.Context, bucketUrl string, creds utils.S3PeerCredent
return nil
}
+// ValidateClickhouse creates, writes to, and drops a dummy table to validate the peer.
+func ValidateClickhouse(ctx context.Context, conn *sql.DB) error {
+ validateDummyTableName := fmt.Sprintf("peerdb_validation_%s", shared.RandomString(4))
+ // create a table
+ _, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory",
+ validateDummyTableName))
+ if err != nil {
+ return fmt.Errorf("failed to create validation table %s: %w", validateDummyTableName, err)
+ }
+
+ // insert a row
+ _, err = conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (1)", validateDummyTableName))
+ if err != nil {
+ return fmt.Errorf("failed to insert into validation table %s: %w", validateDummyTableName, err)
+ }
+
+ // drop the table
+ _, err = conn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", validateDummyTableName))
+ if err != nil {
+ return fmt.Errorf("failed to drop validation table %s: %w", validateDummyTableName, err)
+ }
+
+ return nil
+}
+
func NewClickhouseConnector(
ctx context.Context,
config *protos.ClickhouseConfig,
@@ -51,19 +82,39 @@ func NewClickhouseConnector(
return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
}
+ err = ValidateClickhouse(ctx, database)
+ if err != nil {
+ return nil, fmt.Errorf("invalidated Clickhouse peer: %w", err)
+ }
+
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
if err != nil {
logger.Error("failed to create postgres metadata store", "error", err)
return nil, err
}
- s3PeerCreds := utils.S3PeerCredentials{
+ // Get user-provided S3 credentials
+ clickhouseS3Creds := &utils.ClickhouseS3Credentials{
AccessKeyID: config.AccessKeyId,
SecretAccessKey: config.SecretAccessKey,
Region: config.Region,
+ BucketPath: config.S3Path,
+ }
+
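+ // Fall back to deployment-wide credentials only when the user supplied no S3 settings at all;
+ // a partially filled config is kept so the validation below can flag it.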
+ if clickhouseS3Creds.AccessKeyID == "" &&
+ clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" &&
+ clickhouseS3Creds.BucketPath == "" {
+ deploymentUID := shared.GetDeploymentUID()
+ flowName, _ := ctx.Value(shared.FlowNameKey).(string)
+ bucketPathSuffix := fmt.Sprintf("%s/%s",
+ url.PathEscape(deploymentUID), url.PathEscape(flowName))
+
+ // Fallback: Get S3 credentials from environment
+ clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix)
}
- validateErr := ValidateS3(ctx, config.S3Path, s3PeerCreds)
+ validateErr := ValidateS3(ctx, clickhouseS3Creds)
if validateErr != nil {
return nil, fmt.Errorf("failed to validate S3 bucket: %w", validateErr)
}
@@ -73,7 +124,7 @@ func NewClickhouseConnector(
pgMetadata: pgMetadata,
tableSchemaMapping: nil,
config: config,
- creds: s3PeerCreds,
+ creds: clickhouseS3Creds,
logger: logger,
}, nil
}
diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go
index 94813e84f1..dd269de9c9 100644
--- a/flow/connectors/clickhouse/qrep_avro_sync.go
+++ b/flow/connectors/clickhouse/qrep_avro_sync.go
@@ -34,23 +34,22 @@ func NewClickhouseAvroSyncMethod(
}
func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, avroFile *avro.AvroFile) error {
- stagingPath := s.config.StagingPath
- if stagingPath == "" {
- stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse"
- }
+ stagingPath := s.connector.creds.BucketPath
s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
if err != nil {
return err
}
- awsCreds, err := utils.GetAWSSecrets(s.connector.creds)
- avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)
+
+ avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket,
+ s.connector.creds.Region, avroFile.FilePath)
- if err != nil {
- return err
- }
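+ // ClickHouse's s3() table function reads the staged Avro file directly from S3 into the destination table.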
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')",
- s.config.DestinationTableIdentifier, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
+ s.config.DestinationTableIdentifier, avroFileUrl,
+ s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)
_, err = s.connector.database.ExecContext(ctx, query)
@@ -102,9 +101,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
) (int, error) {
startTime := time.Now()
dstTableName := config.DestinationTableIdentifier
-
- stagingPath := s.config.DestinationPeer.GetClickhouseConfig().S3Path
-
+ stagingPath := s.connector.creds.BucketPath
schema, err := stream.Schema()
if err != nil {
return -1, fmt.Errorf("failed to get schema from stream: %w", err)
@@ -123,8 +120,9 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
if err != nil {
return 0, err
}
- awsCreds, err := utils.GetAWSSecrets(s.connector.creds)
- avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)
+
+ avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket,
+ s.connector.creds.Region, avroFile.FilePath)
if err != nil {
return 0, err
@@ -132,7 +130,8 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
selector := strings.Join(schema.GetColumnNames(), ",")
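+ // Same s3() pattern as CopyStageToDestination, restricted to the stream's columns.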
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s (%s) SELECT * FROM s3('%s','%s','%s', 'Avro')",
- config.DestinationTableIdentifier, selector, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
+ config.DestinationTableIdentifier, selector, avroFileUrl,
+ s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)
_, err = s.connector.database.ExecContext(ctx, query)
@@ -168,20 +167,21 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
partitionID string,
flowJobName string,
) (*avro.AvroFile, error) {
- stagingPath := s.config.StagingPath // "s3://avro-clickhouse"
- if stagingPath == "" {
- stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse"
- }
+ stagingPath := s.connector.creds.BucketPath
ocfWriter := avro.NewPeerDBOCFWriter(stream, avroSchema, avro.CompressZstd, qvalue.QDWHTypeClickhouse)
s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
if err != nil {
return nil, fmt.Errorf("failed to parse staging path: %w", err)
}
- s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName
+ s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID)
s3AvroFileKey = strings.Trim(s3AvroFileKey, "/")
- avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, s.connector.creds)
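+ // WriteRecordsToS3 expects the generic S3PeerCredentials shape, so adapt the ClickHouse credentials here as well.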
+ avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{
+ AccessKeyID: s.connector.creds.AccessKeyID,
+ SecretAccessKey: s.connector.creds.SecretAccessKey,
+ Region: s.connector.creds.Region,
+ })
if err != nil {
return nil, fmt.Errorf("failed to write records to S3: %w", err)
}
diff --git a/flow/connectors/utils/aws.go b/flow/connectors/utils/aws.go
index 63adfa8330..b62522b28c 100644
--- a/flow/connectors/utils/aws.go
+++ b/flow/connectors/utils/aws.go
@@ -29,6 +29,13 @@ type S3PeerCredentials struct {
Endpoint string `json:"endpoint"`
}
+type ClickhouseS3Credentials struct {
+ AccessKeyID string `json:"accessKeyId"`
+ SecretAccessKey string `json:"secretAccessKey"`
+ Region string `json:"region"`
+ BucketPath string `json:"bucketPath"`
+}
+
func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) {
awsRegion := creds.Region
if awsRegion == "" {
@@ -72,6 +79,21 @@ func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) {
}, nil
}
+func GetClickhouseAWSSecrets(bucketPathSuffix string) *ClickhouseS3Credentials {
+ awsRegion := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_REGION")
+ awsKey := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID")
+ awsSecret := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY")
+ awsBucketName := os.Getenv("PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME")
+
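+ // Staged files land under s3://<bucket>/<deploymentUID>/<flowName>, isolating deployments and flows from each other.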
+ awsBucketPath := fmt.Sprintf("s3://%s/%s", awsBucketName, bucketPathSuffix)
+ return &ClickhouseS3Credentials{
+ AccessKeyID: awsKey,
+ SecretAccessKey: awsSecret,
+ Region: awsRegion,
+ BucketPath: awsBucketPath,
+ }
+}
+
type S3BucketAndPrefix struct {
Bucket string
Prefix string
diff --git a/flow/shared/constants.go b/flow/shared/constants.go
index 69b4dfda20..df4ed83057 100644
--- a/flow/shared/constants.go
+++ b/flow/shared/constants.go
@@ -70,3 +70,7 @@ func prependUIDToTaskQueueName(taskQueueName string) string {
}
return fmt.Sprintf("%s-%s", deploymentUID, taskQueueName)
}
+
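+// GetDeploymentUID exposes the peerdbenv deployment UID via the shared package.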
+func GetDeploymentUID() string {
+ return peerdbenv.PeerDBDeploymentUID()
+}
diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts
index b279e2178b..224655c647 100644
--- a/ui/app/mirrors/create/handlers.ts
+++ b/ui/app/mirrors/create/handlers.ts
@@ -244,6 +244,23 @@ export const fetchSchemas = async (peerName: string) => {
return schemasRes.schemas;
};
+const getDefaultDestinationTable = (
+ peerType: DBType | undefined,
+ schemaName: string,
+ tableName: string
+) => {
+ if (!peerType) {
+ return `${schemaName}.${tableName}`;
+ }
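+ // BigQuery tables live in a single dataset and are not schema-qualified.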
+ if (dBTypeToJSON(peerType) == 'BIGQUERY') {
+ return tableName;
+ }
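+ // ClickHouse has no schema namespaces, so flatten schema.table to schema_table.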
+ if (dBTypeToJSON(peerType) == 'CLICKHOUSE') {
+ return `${schemaName}_${tableName}`;
+ }
+ return `${schemaName}.${tableName}`;
+};
+
export const fetchTables = async (
peerName: string,
schemaName: string,
@@ -265,11 +282,11 @@ export const fetchTables = async (
for (const tableObject of tableRes) {
- // setting defaults:
- // for bigquery, tables are not schema-qualified
+ // set the default destination table name based on the peer type
- const dstName =
- peerType != undefined && dBTypeToJSON(peerType) == 'BIGQUERY'
- ? tableObject.tableName
- : `${schemaName}.${tableObject.tableName}`;
-
+ const dstName = getDefaultDestinationTable(
+ peerType,
+ schemaName,
+ tableObject.tableName
+ );
tables.push({
schema: schemaName,
source: `${schemaName}.${tableObject.tableName}`,
diff --git a/ui/app/peers/create/[peerType]/schema.ts b/ui/app/peers/create/[peerType]/schema.ts
index 290eb81b35..211acd69a7 100644
--- a/ui/app/peers/create/[peerType]/schema.ts
+++ b/ui/app/peers/create/[peerType]/schema.ts
@@ -229,34 +229,6 @@ export const bqSchema = z.object({
.max(1024, 'DatasetID must be less than 1025 characters'),
});
-const urlSchema = z
- .string({
- invalid_type_error: 'URL must be a string',
- required_error: 'URL is required',
- })
- .min(1, { message: 'URL must be non-empty' })
- .refine((url) => url.startsWith('s3://'), {
- message: 'URL must start with s3://',
- });
-
-const accessKeySchema = z
- .string({
- invalid_type_error: 'Access Key ID must be a string',
- required_error: 'Access Key ID is required',
- })
- .min(1, { message: 'Access Key ID must be non-empty' });
-
-const secretKeySchema = z
- .string({
- invalid_type_error: 'Secret Access Key must be a string',
- required_error: 'Secret Access Key is required',
- })
- .min(1, { message: 'Secret Access Key must be non-empty' });
-
-const regionSchema = z.string({
- invalid_type_error: 'Region must be a string',
-});
-
export const chSchema = z.object({
host: z
.string({
@@ -294,10 +266,46 @@ export const chSchema = z.object({
})
.min(1, 'Password must be non-empty')
.max(100, 'Password must be less than 100 characters'),
- s3Path: urlSchema,
- accessKeyId: accessKeySchema,
- secretAccessKey: secretKeySchema,
- region: regionSchema.min(1, { message: 'Region must be non-empty' }),
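+ // The S3 stage fields are now optional; when all are left empty, the flow
+ // falls back to deployment-level credentials from the environment.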
+ s3Path: z
+ .string({ invalid_type_error: 'S3 Path must be a string' })
+ .optional(),
+ accessKeyId: z
+ .string({ invalid_type_error: 'Access Key ID must be a string' })
+ .optional(),
+ secretAccessKey: z
+ .string({ invalid_type_error: 'Secret Access Key must be a string' })
+ .optional(),
+ region: z
+ .string({ invalid_type_error: 'Region must be a string' })
+ .optional(),
+});
+
+const urlSchema = z
+ .string({
+ invalid_type_error: 'URL must be a string',
+ required_error: 'URL is required',
+ })
+ .min(1, { message: 'URL must be non-empty' })
+ .refine((url) => url.startsWith('s3://'), {
+ message: 'URL must start with s3://',
+ });
+
+const accessKeySchema = z
+ .string({
+ invalid_type_error: 'Access Key ID must be a string',
+ required_error: 'Access Key ID is required',
+ })
+ .min(1, { message: 'Access Key ID must be non-empty' });
+
+const secretKeySchema = z
+ .string({
+ invalid_type_error: 'Secret Access Key must be a string',
+ required_error: 'Secret Access Key is required',
+ })
+ .min(1, { message: 'Secret Access Key must be non-empty' });
+
+const regionSchema = z.string({
+ invalid_type_error: 'Region must be a string',
});
export const s3Schema = z.object({
diff --git a/ui/components/PeerForms/ClickhouseConfig.tsx b/ui/components/PeerForms/ClickhouseConfig.tsx
index 9614870a83..a0e85c7076 100644
--- a/ui/components/PeerForms/ClickhouseConfig.tsx
+++ b/ui/components/PeerForms/ClickhouseConfig.tsx
@@ -78,30 +78,12 @@ export default function ClickhouseForm({ settings, setter }: ConfigProps) {
>
Transient S3 Stage
</Label>
-
{settings
.filter((setting) => S3Labels.includes(setting.label))
.map((setting, id) => (
<RowWithTextField
key={id}
- label={
- <Label>
- {setting.label}{' '}
- {!setting.optional && (
- <Tooltip
- style={{ width: '100%' }}
- content={'This is a required field.'}
- >
- <Label colorName='lowContrast' colorSet='destructive'>
- *
- </Label>
- </Tooltip>
- )}
- </Label>
- }
+ label={<Label>{setting.label}</Label>}
action={