From deca6fdbce3cee8e2ea3875c645720e9218656ee Mon Sep 17 00:00:00 2001
From: Amogh-Bharadwaj
Date: Tue, 13 Feb 2024 05:18:25 +0530
Subject: [PATCH] lint and ui tweak for clickhouse

---
 flow/connectors/clickhouse/clickhouse.go     |  3 ++-
 flow/connectors/clickhouse/qrep_avro_sync.go | 14 ++++++----
 flow/connectors/utils/aws.go                 |  4 ++-
 ui/app/mirrors/create/handlers.ts            | 27 ++++++++++++++++----
 4 files changed, 36 insertions(+), 12 deletions(-)

diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go
index b3598d7caf..44e931379c 100644
--- a/flow/connectors/clickhouse/clickhouse.go
+++ b/flow/connectors/clickhouse/clickhouse.go
@@ -98,7 +98,8 @@ func NewClickhouseConnector(
 	bucketPathSuffix := fmt.Sprintf("%s/%s", deploymentUID, flowName)
 	// Get S3 credentials from environment
 	clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix)
-	if clickhouseS3Creds.AccessKeyID == "" && clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" {
+	if clickhouseS3Creds.AccessKeyID == "" &&
+		clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" {
 		// Fallback: user provided S3 credentials
 		clickhouseS3Creds = &utils.ClickhouseS3Credentials{
 			AccessKeyID:     config.AccessKeyId,
diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go
index 4e02198bff..dd269de9c9 100644
--- a/flow/connectors/clickhouse/qrep_avro_sync.go
+++ b/flow/connectors/clickhouse/qrep_avro_sync.go
@@ -40,14 +40,16 @@ func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a
 		return err
 	}
 
-	avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, s.connector.creds.Region, avroFile.FilePath)
+	avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket,
+		s.connector.creds.Region, avroFile.FilePath)
 
 	if err != nil {
 		return err
 	}
 	//nolint:gosec
 	query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')",
-		s.config.DestinationTableIdentifier, avroFileUrl, s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)
+		s.config.DestinationTableIdentifier, avroFileUrl,
+		s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)
 
 	_, err = s.connector.database.ExecContext(ctx, query)
 
@@ -119,7 +121,8 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
 		return 0, err
 	}
 
-	avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, s.connector.creds.Region, avroFile.FilePath)
+	avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket,
+		s.connector.creds.Region, avroFile.FilePath)
 
 	if err != nil {
 		return 0, err
@@ -127,7 +130,8 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
 	selector := strings.Join(schema.GetColumnNames(), ",")
 	//nolint:gosec
 	query := fmt.Sprintf("INSERT INTO %s (%s) SELECT * FROM s3('%s','%s','%s', 'Avro')",
-		config.DestinationTableIdentifier, selector, avroFileUrl, s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)
+		config.DestinationTableIdentifier, selector, avroFileUrl,
+		s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)
 
 	_, err = s.connector.database.ExecContext(ctx, query)
 
@@ -170,7 +174,7 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
 		return nil, fmt.Errorf("failed to parse staging path: %w", err)
 	}
 
-	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName
+	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID)
 	s3AvroFileKey = strings.Trim(s3AvroFileKey, "/")
 
 	avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{
diff --git a/flow/connectors/utils/aws.go b/flow/connectors/utils/aws.go
index b62522b28c..df1abba7f3 100644
--- a/flow/connectors/utils/aws.go
+++ b/flow/connectors/utils/aws.go
@@ -3,6 +3,7 @@ package utils
 import (
 	"fmt"
 	"net/http"
+	"net/url"
 	"os"
 	"strings"
 	"time"
@@ -85,7 +86,8 @@ func GetClickhouseAWSSecrets(bucketPathSuffix string) *ClickhouseS3Credentials {
 	awsSecret := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY")
 	awsBucketName := os.Getenv("PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME")
 
-	awsBucketPath := fmt.Sprintf("s3://%s/%s", awsBucketName, bucketPathSuffix)
+	escapedPathSuffix := url.PathEscape(bucketPathSuffix)
+	awsBucketPath := fmt.Sprintf("s3://%s/%s", awsBucketName, escapedPathSuffix)
 	return &ClickhouseS3Credentials{
 		AccessKeyID:     awsKey,
 		SecretAccessKey: awsSecret,
diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts
index b279e2178b..224655c647 100644
--- a/ui/app/mirrors/create/handlers.ts
+++ b/ui/app/mirrors/create/handlers.ts
@@ -244,6 +244,23 @@ export const fetchSchemas = async (peerName: string) => {
   return schemasRes.schemas;
 };
 
+const getDefaultDestinationTable = (
+  peerType: DBType | undefined,
+  schemaName: string,
+  tableName: string
+) => {
+  if (!peerType) {
+    return `${schemaName}.${tableName}`;
+  }
+  if (dBTypeToJSON(peerType) == 'BIGQUERY') {
+    return tableName;
+  }
+  if (dBTypeToJSON(peerType) == 'CLICKHOUSE') {
+    return `${schemaName}_${tableName}`;
+  }
+  return `${schemaName}.${tableName}`;
+};
+
 export const fetchTables = async (
   peerName: string,
   schemaName: string,
@@ -265,11 +282,11 @@ export const fetchTables = async (
   for (const tableObject of tableRes) {
     // setting defaults:
     // for bigquery, tables are not schema-qualified
-    const dstName =
-      peerType != undefined && dBTypeToJSON(peerType) == 'BIGQUERY'
-        ? tableObject.tableName
-        : `${schemaName}.${tableObject.tableName}`;
-
+    const dstName = getDefaultDestinationTable(
+      peerType,
+      schemaName,
+      tableObject.tableName
+    );
     tables.push({
       schema: schemaName,
       source: `${schemaName}.${tableObject.tableName}`,
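
A note on the aws.go change: url.PathEscape percent-encodes every character
that cannot appear literally in a single URL path segment, and that includes
the "/" separator itself, which becomes %2F. Below is a minimal, self-contained
sketch of that behavior; the suffix and bucket values are hypothetical, only
the "<deploymentUID>/<flowName>" shape comes from the patch.

    package main

    import (
    	"fmt"
    	"net/url"
    )

    func main() {
    	// Hypothetical suffix; the patch builds it as "<deploymentUID>/<flowName>".
    	bucketPathSuffix := "deploy-uid-123/my flow"

    	// PathEscape encodes anything unsafe in a path segment,
    	// including the "/" between the two components.
    	escaped := url.PathEscape(bucketPathSuffix)
    	fmt.Println(escaped)
    	// deploy-uid-123%2Fmy%20flow

    	fmt.Printf("s3://%s/%s\n", "my-bucket", escaped)
    	// s3://my-bucket/deploy-uid-123%2Fmy%20flow
    }

Because the whole suffix is escaped in one call, the deploymentUID/flowName
slash ends up encoded as %2F in the resulting bucket path; whether that
flattening of the key hierarchy is intended depends on how the path is
consumed downstream.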