From d995aee7acc3023757ce132c2af8c9ff4ecbd375 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 13 Feb 2024 03:35:01 +0530 Subject: [PATCH 1/6] refactor s3 wiring for clickhouse --- flow/connectors/clickhouse/cdc.go | 2 +- flow/connectors/clickhouse/clickhouse.go | 65 +++++++++++++++--- flow/connectors/clickhouse/qrep_avro_sync.go | 32 ++++----- flow/connectors/utils/aws.go | 22 ++++++ flow/shared/constants.go | 4 ++ ui/app/peers/create/[peerType]/schema.ts | 72 +++++++++++--------- ui/components/PeerForms/ClickhouseConfig.tsx | 20 +----- 7 files changed, 137 insertions(+), 80 deletions(-) diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 3074df7bad..8e2a48a877 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -87,7 +87,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro( } qrepConfig := &protos.QRepConfig{ - StagingPath: c.config.S3Path, + StagingPath: c.creds.BucketPath, FlowJobName: req.FlowJobName, DestinationTableIdentifier: strings.ToLower(rawTableIdentifier), } diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 50e926e1d6..b3598d7caf 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -15,6 +15,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" + "github.com/PeerDB-io/peer-flow/shared" ) type ClickhouseConnector struct { @@ -23,17 +24,21 @@ type ClickhouseConnector struct { tableSchemaMapping map[string]*protos.TableSchema logger log.Logger config *protos.ClickhouseConfig - creds utils.S3PeerCredentials + creds *utils.ClickhouseS3Credentials } -func ValidateS3(ctx context.Context, bucketUrl string, creds utils.S3PeerCredentials) error { +func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error { // for validation purposes - s3Client, err := utils.CreateS3Client(creds) + s3Client, err := utils.CreateS3Client(utils.S3PeerCredentials{ + AccessKeyID: creds.AccessKeyID, + SecretAccessKey: creds.SecretAccessKey, + Region: creds.Region, + }) if err != nil { return fmt.Errorf("failed to create S3 client: %w", err) } - validErr := conns3.ValidCheck(ctx, s3Client, bucketUrl, nil) + validErr := conns3.ValidCheck(ctx, s3Client, creds.BucketPath, nil) if validErr != nil { return validErr } @@ -41,6 +46,31 @@ func ValidateS3(ctx context.Context, bucketUrl string, creds utils.S3PeerCredent return nil } +// Creates and drops a dummy table to validate the peer +func ValidateClickhouse(ctx context.Context, conn *sql.DB) error { + validateDummyTableName := fmt.Sprintf("peerdb_validation_%s", shared.RandomString(4)) + // create a table + _, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory", + validateDummyTableName)) + if err != nil { + return fmt.Errorf("failed to create validation table %s: %w", validateDummyTableName, err) + } + + // insert a row + _, err = conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (1)", validateDummyTableName)) + if err != nil { + return fmt.Errorf("failed to insert into validation table %s: %w", validateDummyTableName, err) + } + + // drop the table + _, err = conn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", validateDummyTableName)) + if err != nil { + return fmt.Errorf("failed to drop validation table %s: %w", validateDummyTableName, err) + } + + return nil +} + func NewClickhouseConnector( ctx context.Context, config *protos.ClickhouseConfig, @@ -51,19 +81,34 @@ func NewClickhouseConnector( return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err) } + err = ValidateClickhouse(ctx, database) + if err != nil { + return nil, fmt.Errorf("invalidated Clickhouse peer: %w", err) + } + pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx) if err != nil { logger.Error("failed to create postgres metadata store", "error", err) return nil, err } - s3PeerCreds := utils.S3PeerCredentials{ - AccessKeyID: config.AccessKeyId, - SecretAccessKey: config.SecretAccessKey, - Region: config.Region, + var clickhouseS3Creds *utils.ClickhouseS3Credentials + deploymentUID := shared.GetDeploymentUID() + flowName, _ := ctx.Value(shared.FlowNameKey).(string) + bucketPathSuffix := fmt.Sprintf("%s/%s", deploymentUID, flowName) + // Get S3 credentials from environment + clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix) + if clickhouseS3Creds.AccessKeyID == "" && clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" { + // Fallback: user provided S3 credentials + clickhouseS3Creds = &utils.ClickhouseS3Credentials{ + AccessKeyID: config.AccessKeyId, + SecretAccessKey: config.SecretAccessKey, + Region: config.Region, + BucketPath: config.S3Path, + } } - validateErr := ValidateS3(ctx, config.S3Path, s3PeerCreds) + validateErr := ValidateS3(ctx, clickhouseS3Creds) if validateErr != nil { return nil, fmt.Errorf("failed to validate S3 bucket: %w", validateErr) } @@ -73,7 +118,7 @@ func NewClickhouseConnector( pgMetadata: pgMetadata, tableSchemaMapping: nil, config: config, - creds: s3PeerCreds, + creds: clickhouseS3Creds, logger: logger, }, nil } diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index 94813e84f1..4e02198bff 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -34,23 +34,20 @@ func NewClickhouseAvroSyncMethod( } func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, avroFile *avro.AvroFile) error { - stagingPath := s.config.StagingPath - if stagingPath == "" { - stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse" - } + stagingPath := s.connector.creds.BucketPath s3o, err := utils.NewS3BucketAndPrefix(stagingPath) if err != nil { return err } - awsCreds, err := utils.GetAWSSecrets(s.connector.creds) - avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath) + + avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, s.connector.creds.Region, avroFile.FilePath) if err != nil { return err } //nolint:gosec query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')", - s.config.DestinationTableIdentifier, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey) + s.config.DestinationTableIdentifier, avroFileUrl, s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey) _, err = s.connector.database.ExecContext(ctx, query) @@ -102,9 +99,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( ) (int, error) { startTime := time.Now() dstTableName := config.DestinationTableIdentifier - - stagingPath := s.config.DestinationPeer.GetClickhouseConfig().S3Path - + stagingPath := s.connector.creds.BucketPath schema, err := stream.Schema() if err != nil { return -1, fmt.Errorf("failed to get schema from stream: %w", err) @@ -123,8 +118,8 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( if err != nil { return 0, err } - awsCreds, err := utils.GetAWSSecrets(s.connector.creds) - avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath) + + avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, s.connector.creds.Region, avroFile.FilePath) if err != nil { return 0, err @@ -132,7 +127,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( selector := strings.Join(schema.GetColumnNames(), ",") //nolint:gosec query := fmt.Sprintf("INSERT INTO %s (%s) SELECT * FROM s3('%s','%s','%s', 'Avro')", - config.DestinationTableIdentifier, selector, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey) + config.DestinationTableIdentifier, selector, avroFileUrl, s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey) _, err = s.connector.database.ExecContext(ctx, query) @@ -168,10 +163,7 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile( partitionID string, flowJobName string, ) (*avro.AvroFile, error) { - stagingPath := s.config.StagingPath // "s3://avro-clickhouse" - if stagingPath == "" { - stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse" - } + stagingPath := s.connector.creds.BucketPath ocfWriter := avro.NewPeerDBOCFWriter(stream, avroSchema, avro.CompressZstd, qvalue.QDWHTypeClickhouse) s3o, err := utils.NewS3BucketAndPrefix(stagingPath) if err != nil { @@ -181,7 +173,11 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile( s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName s3AvroFileKey = strings.Trim(s3AvroFileKey, "/") - avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, s.connector.creds) + avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{ + AccessKeyID: s.connector.creds.AccessKeyID, + SecretAccessKey: s.connector.creds.SecretAccessKey, + Region: s.connector.creds.Region, + }) if err != nil { return nil, fmt.Errorf("failed to write records to S3: %w", err) } diff --git a/flow/connectors/utils/aws.go b/flow/connectors/utils/aws.go index 63adfa8330..b62522b28c 100644 --- a/flow/connectors/utils/aws.go +++ b/flow/connectors/utils/aws.go @@ -29,6 +29,13 @@ type S3PeerCredentials struct { Endpoint string `json:"endpoint"` } +type ClickhouseS3Credentials struct { + AccessKeyID string `json:"accessKeyId"` + SecretAccessKey string `json:"secretAccessKey"` + Region string `json:"region"` + BucketPath string `json:"bucketPath"` +} + func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) { awsRegion := creds.Region if awsRegion == "" { @@ -72,6 +79,21 @@ func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) { }, nil } +func GetClickhouseAWSSecrets(bucketPathSuffix string) *ClickhouseS3Credentials { + awsRegion := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_REGION") + awsKey := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID") + awsSecret := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY") + awsBucketName := os.Getenv("PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME") + + awsBucketPath := fmt.Sprintf("s3://%s/%s", awsBucketName, bucketPathSuffix) + return &ClickhouseS3Credentials{ + AccessKeyID: awsKey, + SecretAccessKey: awsSecret, + Region: awsRegion, + BucketPath: awsBucketPath, + } +} + type S3BucketAndPrefix struct { Bucket string Prefix string diff --git a/flow/shared/constants.go b/flow/shared/constants.go index 69b4dfda20..df4ed83057 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -70,3 +70,7 @@ func prependUIDToTaskQueueName(taskQueueName string) string { } return fmt.Sprintf("%s-%s", deploymentUID, taskQueueName) } + +func GetDeploymentUID() string { + return peerdbenv.PeerDBDeploymentUID() +} diff --git a/ui/app/peers/create/[peerType]/schema.ts b/ui/app/peers/create/[peerType]/schema.ts index 290eb81b35..211acd69a7 100644 --- a/ui/app/peers/create/[peerType]/schema.ts +++ b/ui/app/peers/create/[peerType]/schema.ts @@ -229,34 +229,6 @@ export const bqSchema = z.object({ .max(1024, 'DatasetID must be less than 1025 characters'), }); -const urlSchema = z - .string({ - invalid_type_error: 'URL must be a string', - required_error: 'URL is required', - }) - .min(1, { message: 'URL must be non-empty' }) - .refine((url) => url.startsWith('s3://'), { - message: 'URL must start with s3://', - }); - -const accessKeySchema = z - .string({ - invalid_type_error: 'Access Key ID must be a string', - required_error: 'Access Key ID is required', - }) - .min(1, { message: 'Access Key ID must be non-empty' }); - -const secretKeySchema = z - .string({ - invalid_type_error: 'Secret Access Key must be a string', - required_error: 'Secret Access Key is required', - }) - .min(1, { message: 'Secret Access Key must be non-empty' }); - -const regionSchema = z.string({ - invalid_type_error: 'Region must be a string', -}); - export const chSchema = z.object({ host: z .string({ @@ -294,10 +266,46 @@ export const chSchema = z.object({ }) .min(1, 'Password must be non-empty') .max(100, 'Password must be less than 100 characters'), - s3Path: urlSchema, - accessKeyId: accessKeySchema, - secretAccessKey: secretKeySchema, - region: regionSchema.min(1, { message: 'Region must be non-empty' }), + s3Path: z + .string({ invalid_type_error: 'S3 Path must be a string' }) + .optional(), + accessKeyId: z + .string({ invalid_type_error: 'Access Key ID must be a string' }) + .optional(), + secretAccessKey: z + .string({ invalid_type_error: 'Secret Access Key must be a string' }) + .optional(), + region: z + .string({ invalid_type_error: 'Region must be a string' }) + .optional(), +}); + +const urlSchema = z + .string({ + invalid_type_error: 'URL must be a string', + required_error: 'URL is required', + }) + .min(1, { message: 'URL must be non-empty' }) + .refine((url) => url.startsWith('s3://'), { + message: 'URL must start with s3://', + }); + +const accessKeySchema = z + .string({ + invalid_type_error: 'Access Key ID must be a string', + required_error: 'Access Key ID is required', + }) + .min(1, { message: 'Access Key ID must be non-empty' }); + +const secretKeySchema = z + .string({ + invalid_type_error: 'Secret Access Key must be a string', + required_error: 'Secret Access Key is required', + }) + .min(1, { message: 'Secret Access Key must be non-empty' }); + +const regionSchema = z.string({ + invalid_type_error: 'Region must be a string', }); export const s3Schema = z.object({ diff --git a/ui/components/PeerForms/ClickhouseConfig.tsx b/ui/components/PeerForms/ClickhouseConfig.tsx index 9614870a83..c1a7ab93e4 100644 --- a/ui/components/PeerForms/ClickhouseConfig.tsx +++ b/ui/components/PeerForms/ClickhouseConfig.tsx @@ -78,30 +78,12 @@ export default function ClickhouseForm({ settings, setter }: ConfigProps) { > Transient S3 Stage - {settings .filter((setting) => S3Labels.includes(setting.label)) .map((setting, id) => ( - {setting.label}{' '} - {!setting.optional && ( - - - - )} - - } + label={} action={
Date: Tue, 13 Feb 2024 05:18:25 +0530 Subject: [PATCH 2/6] lint and ui tweak for clickhouse --- flow/connectors/clickhouse/clickhouse.go | 3 ++- flow/connectors/clickhouse/qrep_avro_sync.go | 14 ++++++---- flow/connectors/utils/aws.go | 4 ++- ui/app/mirrors/create/handlers.ts | 27 ++++++++++++++++---- 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index b3598d7caf..44e931379c 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -98,7 +98,8 @@ func NewClickhouseConnector( bucketPathSuffix := fmt.Sprintf("%s/%s", deploymentUID, flowName) // Get S3 credentials from environment clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix) - if clickhouseS3Creds.AccessKeyID == "" && clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" { + if clickhouseS3Creds.AccessKeyID == "" && + clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" { // Fallback: user provided S3 credentials clickhouseS3Creds = &utils.ClickhouseS3Credentials{ AccessKeyID: config.AccessKeyId, diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index 4e02198bff..dd269de9c9 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -40,14 +40,16 @@ func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a return err } - avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, s.connector.creds.Region, avroFile.FilePath) + avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, + s.connector.creds.Region, avroFile.FilePath) if err != nil { return err } //nolint:gosec query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')", - s.config.DestinationTableIdentifier, avroFileUrl, s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey) + s.config.DestinationTableIdentifier, avroFileUrl, + s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey) _, err = s.connector.database.ExecContext(ctx, query) @@ -119,7 +121,8 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( return 0, err } - avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, s.connector.creds.Region, avroFile.FilePath) + avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, + s.connector.creds.Region, avroFile.FilePath) if err != nil { return 0, err @@ -127,7 +130,8 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( selector := strings.Join(schema.GetColumnNames(), ",") //nolint:gosec query := fmt.Sprintf("INSERT INTO %s (%s) SELECT * FROM s3('%s','%s','%s', 'Avro')", - config.DestinationTableIdentifier, selector, avroFileUrl, s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey) + config.DestinationTableIdentifier, selector, avroFileUrl, + s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey) _, err = s.connector.database.ExecContext(ctx, query) @@ -170,7 +174,7 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile( return nil, fmt.Errorf("failed to parse staging path: %w", err) } - s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName + s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) s3AvroFileKey = strings.Trim(s3AvroFileKey, "/") avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{ diff --git a/flow/connectors/utils/aws.go b/flow/connectors/utils/aws.go index b62522b28c..df1abba7f3 100644 --- a/flow/connectors/utils/aws.go +++ b/flow/connectors/utils/aws.go @@ -3,6 +3,7 @@ package utils import ( "fmt" "net/http" + "net/url" "os" "strings" "time" @@ -85,7 +86,8 @@ func GetClickhouseAWSSecrets(bucketPathSuffix string) *ClickhouseS3Credentials { awsSecret := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY") awsBucketName := os.Getenv("PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME") - awsBucketPath := fmt.Sprintf("s3://%s/%s", awsBucketName, bucketPathSuffix) + escapedPathSuffix := url.PathEscape(bucketPathSuffix) + awsBucketPath := fmt.Sprintf("s3://%s/%s", awsBucketName, escapedPathSuffix) return &ClickhouseS3Credentials{ AccessKeyID: awsKey, SecretAccessKey: awsSecret, diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index b279e2178b..224655c647 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -244,6 +244,23 @@ export const fetchSchemas = async (peerName: string) => { return schemasRes.schemas; }; +const getDefaultDestinationTable = ( + peerType: DBType | undefined, + schemaName: string, + tableName: string +) => { + if (!peerType) { + return `${schemaName}.${tableName}`; + } + if (dBTypeToJSON(peerType) == 'BIGQUERY') { + return tableName; + } + if (dBTypeToJSON(peerType) == 'CLICKHOUSE') { + return `${schemaName}_${tableName}`; + } + return `${schemaName}.${tableName}`; +}; + export const fetchTables = async ( peerName: string, schemaName: string, @@ -265,11 +282,11 @@ export const fetchTables = async ( for (const tableObject of tableRes) { // setting defaults: // for bigquery, tables are not schema-qualified - const dstName = - peerType != undefined && dBTypeToJSON(peerType) == 'BIGQUERY' - ? tableObject.tableName - : `${schemaName}.${tableObject.tableName}`; - + const dstName = getDefaultDestinationTable( + peerType, + schemaName, + tableObject.tableName + ); tables.push({ schema: schemaName, source: `${schemaName}.${tableObject.tableName}`, From 222f03f168bb509a944d7817696a8ab8a3be6faa Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 13 Feb 2024 06:04:22 +0530 Subject: [PATCH 3/6] cleanup --- flow/connectors/clickhouse/clickhouse.go | 26 +++++++++++--------- flow/connectors/utils/aws.go | 4 +-- ui/components/PeerForms/ClickhouseConfig.tsx | 2 +- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 44e931379c..83fdcc76cd 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "database/sql" "fmt" + "net/url" "github.com/ClickHouse/clickhouse-go/v2" _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -95,18 +96,21 @@ func NewClickhouseConnector( var clickhouseS3Creds *utils.ClickhouseS3Credentials deploymentUID := shared.GetDeploymentUID() flowName, _ := ctx.Value(shared.FlowNameKey).(string) - bucketPathSuffix := fmt.Sprintf("%s/%s", deploymentUID, flowName) - // Get S3 credentials from environment - clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix) + bucketPathSuffix := fmt.Sprintf("%s/%s", url.PathEscape(deploymentUID), flowName) + + // Get user provided S3 credentials + clickhouseS3Creds = &utils.ClickhouseS3Credentials{ + AccessKeyID: config.AccessKeyId, + SecretAccessKey: config.SecretAccessKey, + Region: config.Region, + BucketPath: config.S3Path, + } + if clickhouseS3Creds.AccessKeyID == "" && - clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" { - // Fallback: user provided S3 credentials - clickhouseS3Creds = &utils.ClickhouseS3Credentials{ - AccessKeyID: config.AccessKeyId, - SecretAccessKey: config.SecretAccessKey, - Region: config.Region, - BucketPath: config.S3Path, - } + clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" && + clickhouseS3Creds.BucketPath == "" { + // Fallback: Get S3 credentials from environment + clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix) } validateErr := ValidateS3(ctx, clickhouseS3Creds) diff --git a/flow/connectors/utils/aws.go b/flow/connectors/utils/aws.go index df1abba7f3..b62522b28c 100644 --- a/flow/connectors/utils/aws.go +++ b/flow/connectors/utils/aws.go @@ -3,7 +3,6 @@ package utils import ( "fmt" "net/http" - "net/url" "os" "strings" "time" @@ -86,8 +85,7 @@ func GetClickhouseAWSSecrets(bucketPathSuffix string) *ClickhouseS3Credentials { awsSecret := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY") awsBucketName := os.Getenv("PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME") - escapedPathSuffix := url.PathEscape(bucketPathSuffix) - awsBucketPath := fmt.Sprintf("s3://%s/%s", awsBucketName, escapedPathSuffix) + awsBucketPath := fmt.Sprintf("s3://%s/%s", awsBucketName, bucketPathSuffix) return &ClickhouseS3Credentials{ AccessKeyID: awsKey, SecretAccessKey: awsSecret, diff --git a/ui/components/PeerForms/ClickhouseConfig.tsx b/ui/components/PeerForms/ClickhouseConfig.tsx index c1a7ab93e4..a0e85c7076 100644 --- a/ui/components/PeerForms/ClickhouseConfig.tsx +++ b/ui/components/PeerForms/ClickhouseConfig.tsx @@ -83,7 +83,7 @@ export default function ClickhouseForm({ settings, setter }: ConfigProps) { .map((setting, id) => ( {setting.label} } + label={} action={
Date: Tue, 13 Feb 2024 06:12:44 +0530 Subject: [PATCH 4/6] minor change --- flow/connectors/clickhouse/clickhouse.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 83fdcc76cd..db9547a949 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -96,7 +96,8 @@ func NewClickhouseConnector( var clickhouseS3Creds *utils.ClickhouseS3Credentials deploymentUID := shared.GetDeploymentUID() flowName, _ := ctx.Value(shared.FlowNameKey).(string) - bucketPathSuffix := fmt.Sprintf("%s/%s", url.PathEscape(deploymentUID), flowName) + bucketPathSuffix := fmt.Sprintf("%s/%s", + url.PathEscape(deploymentUID), url.PathEscape(flowName)) // Get user provided S3 credentials clickhouseS3Creds = &utils.ClickhouseS3Credentials{ From 42747919ec86dc39bcf1b3ed16e536f216a50114 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 13 Feb 2024 06:29:27 +0530 Subject: [PATCH 5/6] move variables closer to where theyre used --- flow/connectors/clickhouse/clickhouse.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index db9547a949..84625ff706 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -2,7 +2,6 @@ package connclickhouse import ( "context" - "crypto/tls" "database/sql" "fmt" "net/url" @@ -94,11 +93,6 @@ func NewClickhouseConnector( } var clickhouseS3Creds *utils.ClickhouseS3Credentials - deploymentUID := shared.GetDeploymentUID() - flowName, _ := ctx.Value(shared.FlowNameKey).(string) - bucketPathSuffix := fmt.Sprintf("%s/%s", - url.PathEscape(deploymentUID), url.PathEscape(flowName)) - // Get user provided S3 credentials clickhouseS3Creds = &utils.ClickhouseS3Credentials{ AccessKeyID: config.AccessKeyId, @@ -110,6 +104,11 @@ func NewClickhouseConnector( if clickhouseS3Creds.AccessKeyID == "" && clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" && clickhouseS3Creds.BucketPath == "" { + deploymentUID := shared.GetDeploymentUID() + flowName, _ := ctx.Value(shared.FlowNameKey).(string) + bucketPathSuffix := fmt.Sprintf("%s/%s", + url.PathEscape(deploymentUID), url.PathEscape(flowName)) + // Fallback: Get S3 credentials from environment clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix) } @@ -137,7 +136,7 @@ func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, err Username: config.User, Password: config.Password, }, - TLS: &tls.Config{MinVersion: tls.VersionTLS13}, + // TLS: &tls.Config{MinVersion: tls.VersionTLS13}, Compression: &clickhouse.Compression{Method: clickhouse.CompressionLZ4}, ClientInfo: clickhouse.ClientInfo{ Products: []struct { From 5fdd05d365e15cad3461a7e56a7989ee49c047fd Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 13 Feb 2024 06:30:37 +0530 Subject: [PATCH 6/6] remove comment --- flow/connectors/clickhouse/clickhouse.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 84625ff706..cf0da791aa 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -2,6 +2,7 @@ package connclickhouse import ( "context" + "crypto/tls" "database/sql" "fmt" "net/url" @@ -136,7 +137,7 @@ func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, err Username: config.User, Password: config.Password, }, - // TLS: &tls.Config{MinVersion: tls.VersionTLS13}, + TLS: &tls.Config{MinVersion: tls.VersionTLS13}, Compression: &clickhouse.Compression{Method: clickhouse.CompressionLZ4}, ClientInfo: clickhouse.ClientInfo{ Products: []struct {