Skip to content

Commit

Permalink
lint and ui tweak for clickhouse
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Feb 12, 2024
1 parent d995aee commit deca6fd
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 12 deletions.
3 changes: 2 additions & 1 deletion flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func NewClickhouseConnector(
bucketPathSuffix := fmt.Sprintf("%s/%s", deploymentUID, flowName)
// Get S3 credentials from environment
clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix)
if clickhouseS3Creds.AccessKeyID == "" && clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" {
if clickhouseS3Creds.AccessKeyID == "" &&
clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" {
// Fallback: user provided S3 credentials
clickhouseS3Creds = &utils.ClickhouseS3Credentials{
AccessKeyID: config.AccessKeyId,
Expand Down
14 changes: 9 additions & 5 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, a
return err
}

avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, s.connector.creds.Region, avroFile.FilePath)
avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket,
s.connector.creds.Region, avroFile.FilePath)

if err != nil {
return err
}
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')",
s.config.DestinationTableIdentifier, avroFileUrl, s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)
s.config.DestinationTableIdentifier, avroFileUrl,
s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)

_, err = s.connector.database.ExecContext(ctx, query)

Expand Down Expand Up @@ -119,15 +121,17 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
return 0, err
}

avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, s.connector.creds.Region, avroFile.FilePath)
avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket,
s.connector.creds.Region, avroFile.FilePath)

if err != nil {
return 0, err
}
selector := strings.Join(schema.GetColumnNames(), ",")
//nolint:gosec
query := fmt.Sprintf("INSERT INTO %s (%s) SELECT * FROM s3('%s','%s','%s', 'Avro')",
config.DestinationTableIdentifier, selector, avroFileUrl, s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)
config.DestinationTableIdentifier, selector, avroFileUrl,
s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)

_, err = s.connector.database.ExecContext(ctx, query)

Expand Down Expand Up @@ -170,7 +174,7 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
return nil, fmt.Errorf("failed to parse staging path: %w", err)
}

s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName
s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID)
s3AvroFileKey = strings.Trim(s3AvroFileKey, "/")

avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/utils/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package utils
import (
"fmt"
"net/http"
"net/url"
"os"
"strings"
"time"
Expand Down Expand Up @@ -85,7 +86,8 @@ func GetClickhouseAWSSecrets(bucketPathSuffix string) *ClickhouseS3Credentials {
awsSecret := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY")
awsBucketName := os.Getenv("PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME")

awsBucketPath := fmt.Sprintf("s3://%s/%s", awsBucketName, bucketPathSuffix)
escapedPathSuffix := url.PathEscape(bucketPathSuffix)
awsBucketPath := fmt.Sprintf("s3://%s/%s", awsBucketName, escapedPathSuffix)
return &ClickhouseS3Credentials{
AccessKeyID: awsKey,
SecretAccessKey: awsSecret,
Expand Down
27 changes: 22 additions & 5 deletions ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,23 @@ export const fetchSchemas = async (peerName: string) => {
return schemasRes.schemas;
};

// Computes the default destination table name for a source table, based on
// the destination peer type:
// - BIGQUERY:   tables are not schema-qualified, so just the table name.
// - CLICKHOUSE: no schema namespaces, so flatten to `${schema}_${table}`.
// - anything else (including an undefined peer): standard `${schema}.${table}`.
const getDefaultDestinationTable = (
  peerType: DBType | undefined,
  schemaName: string,
  tableName: string
) => {
  if (!peerType) {
    return `${schemaName}.${tableName}`;
  }
  // Convert the enum once and branch on the JSON name, rather than calling
  // dBTypeToJSON for each comparison.
  switch (dBTypeToJSON(peerType)) {
    case 'BIGQUERY':
      return tableName;
    case 'CLICKHOUSE':
      return `${schemaName}_${tableName}`;
    default:
      return `${schemaName}.${tableName}`;
  }
};

export const fetchTables = async (
peerName: string,
schemaName: string,
Expand All @@ -265,11 +282,11 @@ export const fetchTables = async (
for (const tableObject of tableRes) {
// setting defaults:
// the destination table name format depends on the peer type
// (see getDefaultDestinationTable: bigquery and clickhouse are not schema-qualified)
const dstName =
peerType != undefined && dBTypeToJSON(peerType) == 'BIGQUERY'
? tableObject.tableName
: `${schemaName}.${tableObject.tableName}`;

const dstName = getDefaultDestinationTable(
peerType,
schemaName,
tableObject.tableName
);
tables.push({
schema: schemaName,
source: `${schemaName}.${tableObject.tableName}`,
Expand Down

0 comments on commit deca6fd

Please sign in to comment.