Refactor s3 wiring for clickhouse (#1265)
ClickHouse peers now require one of the two:
1. Env variables for S3 credentials
2. S3 credentials from the clickhouse peer, which are now optional in the UI

For the former, the bucket path is constructed as
bucket-name/deployment-uid/flowname
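
As a quick illustration of the fallback case, here is a minimal, self-contained Go sketch (not part of this commit's diff) of how the staging path gets assembled. The env variable name comes from flow/connectors/utils/aws.go below; the bucket, deployment UID, and flow name values are hypothetical.

package main

import (
	"fmt"
	"net/url"
	"os"
)

func main() {
	// Set when relying on env-based credentials, e.g. "my-bucket".
	bucket := os.Getenv("PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME")
	deploymentUID := "deployment-uid" // hypothetical deployment UID
	flowName := "flowname"            // hypothetical flow job name

	// Mirrors the construction described above: bucket-name/deployment-uid/flowname.
	stagingPath := fmt.Sprintf("s3://%s/%s/%s", bucket,
		url.PathEscape(deploymentUID), url.PathEscape(flowName))
	fmt.Println(stagingPath) // e.g. s3://my-bucket/deployment-uid/flowname
}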
Amogh-Bharadwaj authored Feb 13, 2024
1 parent 02ea4ce commit 516e384
Showing 8 changed files with 167 additions and 83 deletions.
flow/connectors/clickhouse/cdc.go (2 changes: 1 addition & 1 deletion)

@@ -87,7 +87,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
 	}

 	qrepConfig := &protos.QRepConfig{
-		StagingPath:                c.config.S3Path,
+		StagingPath:                c.creds.BucketPath,
 		FlowJobName:                req.FlowJobName,
 		DestinationTableIdentifier: strings.ToLower(rawTableIdentifier),
 	}
flow/connectors/clickhouse/clickhouse.go (65 changes: 58 additions & 7 deletions)

@@ -5,6 +5,7 @@ import (
 	"crypto/tls"
 	"database/sql"
 	"fmt"
+	"net/url"

 	"github.com/ClickHouse/clickhouse-go/v2"
 	_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
@@ -15,6 +16,7 @@ import (
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/logger"
+	"github.com/PeerDB-io/peer-flow/shared"
 )

 type ClickhouseConnector struct {
@@ -23,24 +25,53 @@ type ClickhouseConnector struct {
 	tableSchemaMapping map[string]*protos.TableSchema
 	logger             log.Logger
 	config             *protos.ClickhouseConfig
-	creds              utils.S3PeerCredentials
+	creds              *utils.ClickhouseS3Credentials
 }

-func ValidateS3(ctx context.Context, bucketUrl string, creds utils.S3PeerCredentials) error {
+func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error {
 	// for validation purposes
-	s3Client, err := utils.CreateS3Client(creds)
+	s3Client, err := utils.CreateS3Client(utils.S3PeerCredentials{
+		AccessKeyID:     creds.AccessKeyID,
+		SecretAccessKey: creds.SecretAccessKey,
+		Region:          creds.Region,
+	})
 	if err != nil {
 		return fmt.Errorf("failed to create S3 client: %w", err)
 	}

-	validErr := conns3.ValidCheck(ctx, s3Client, bucketUrl, nil)
+	validErr := conns3.ValidCheck(ctx, s3Client, creds.BucketPath, nil)
 	if validErr != nil {
 		return validErr
 	}

 	return nil
 }

+// Creates and drops a dummy table to validate the peer
+func ValidateClickhouse(ctx context.Context, conn *sql.DB) error {
+	validateDummyTableName := fmt.Sprintf("peerdb_validation_%s", shared.RandomString(4))
+	// create a table
+	_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory",
+		validateDummyTableName))
+	if err != nil {
+		return fmt.Errorf("failed to create validation table %s: %w", validateDummyTableName, err)
+	}
+
+	// insert a row
+	_, err = conn.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s VALUES (1)", validateDummyTableName))
+	if err != nil {
+		return fmt.Errorf("failed to insert into validation table %s: %w", validateDummyTableName, err)
+	}
+
+	// drop the table
+	_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", validateDummyTableName))
+	if err != nil {
+		return fmt.Errorf("failed to drop validation table %s: %w", validateDummyTableName, err)
+	}
+
+	return nil
+}
+
 func NewClickhouseConnector(
 	ctx context.Context,
 	config *protos.ClickhouseConfig,
@@ -51,19 +82,39 @@ func NewClickhouseConnector(
 		return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
 	}

+	err = ValidateClickhouse(ctx, database)
+	if err != nil {
+		return nil, fmt.Errorf("invalidated Clickhouse peer: %w", err)
+	}
+
 	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx)
 	if err != nil {
 		logger.Error("failed to create postgres metadata store", "error", err)
 		return nil, err
 	}

-	s3PeerCreds := utils.S3PeerCredentials{
+	var clickhouseS3Creds *utils.ClickhouseS3Credentials
+	// Get user provided S3 credentials
+	clickhouseS3Creds = &utils.ClickhouseS3Credentials{
 		AccessKeyID:     config.AccessKeyId,
 		SecretAccessKey: config.SecretAccessKey,
 		Region:          config.Region,
+		BucketPath:      config.S3Path,
 	}

+	if clickhouseS3Creds.AccessKeyID == "" &&
+		clickhouseS3Creds.SecretAccessKey == "" && clickhouseS3Creds.Region == "" &&
+		clickhouseS3Creds.BucketPath == "" {
+		deploymentUID := shared.GetDeploymentUID()
+		flowName, _ := ctx.Value(shared.FlowNameKey).(string)
+		bucketPathSuffix := fmt.Sprintf("%s/%s",
+			url.PathEscape(deploymentUID), url.PathEscape(flowName))
+
+		// Fallback: Get S3 credentials from environment
+		clickhouseS3Creds = utils.GetClickhouseAWSSecrets(bucketPathSuffix)
+	}
+
-	validateErr := ValidateS3(ctx, config.S3Path, s3PeerCreds)
+	validateErr := ValidateS3(ctx, clickhouseS3Creds)
 	if validErr != nil {
 		return nil, fmt.Errorf("failed to validate S3 bucket: %w", validateErr)
 	}
@@ -73,7 +124,7 @@ func NewClickhouseConnector(
 		pgMetadata:         pgMetadata,
 		tableSchemaMapping: nil,
 		config:             config,
-		creds:              s3PeerCreds,
+		creds:              clickhouseS3Creds,
 		logger:             logger,
 	}, nil
 }
flow/connectors/clickhouse/qrep_avro_sync.go (38 changes: 19 additions & 19 deletions)

@@ -34,23 +34,22 @@ func NewClickhouseAvroSyncMethod(
 }

 func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, avroFile *avro.AvroFile) error {
-	stagingPath := s.config.StagingPath
-	if stagingPath == "" {
-		stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse"
-	}
+	stagingPath := s.connector.creds.BucketPath
 	s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
 	if err != nil {
 		return err
 	}
-	awsCreds, err := utils.GetAWSSecrets(s.connector.creds)
-	avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)
+
+	avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket,
+		s.connector.creds.Region, avroFile.FilePath)

 	if err != nil {
 		return err
 	}
 	//nolint:gosec
 	query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')",
-		s.config.DestinationTableIdentifier, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
+		s.config.DestinationTableIdentifier, avroFileUrl,
+		s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)

 	_, err = s.connector.database.ExecContext(ctx, query)

@@ -102,9 +101,7 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
 ) (int, error) {
 	startTime := time.Now()
 	dstTableName := config.DestinationTableIdentifier
-
-	stagingPath := s.config.DestinationPeer.GetClickhouseConfig().S3Path
-
+	stagingPath := s.connector.creds.BucketPath
 	schema, err := stream.Schema()
 	if err != nil {
 		return -1, fmt.Errorf("failed to get schema from stream: %w", err)
@@ -123,16 +120,18 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
 	if err != nil {
 		return 0, err
 	}
-	awsCreds, err := utils.GetAWSSecrets(s.connector.creds)
-	avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)
+
+	avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket,
+		s.connector.creds.Region, avroFile.FilePath)

 	if err != nil {
 		return 0, err
 	}
 	selector := strings.Join(schema.GetColumnNames(), ",")
 	//nolint:gosec
 	query := fmt.Sprintf("INSERT INTO %s (%s) SELECT * FROM s3('%s','%s','%s', 'Avro')",
-		config.DestinationTableIdentifier, selector, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
+		config.DestinationTableIdentifier, selector, avroFileUrl,
+		s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)

 	_, err = s.connector.database.ExecContext(ctx, query)

@@ -168,20 +167,21 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
 	partitionID string,
 	flowJobName string,
 ) (*avro.AvroFile, error) {
-	stagingPath := s.config.StagingPath // "s3://avro-clickhouse"
-	if stagingPath == "" {
-		stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Path // "s3://avro-clickhouse"
-	}
+	stagingPath := s.connector.creds.BucketPath
 	ocfWriter := avro.NewPeerDBOCFWriter(stream, avroSchema, avro.CompressZstd, qvalue.QDWHTypeClickhouse)
 	s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse staging path: %w", err)
 	}

-	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName
+	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID)
 	s3AvroFileKey = strings.Trim(s3AvroFileKey, "/")

-	avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, s.connector.creds)
+	avroFile, err := ocfWriter.WriteRecordsToS3(ctx, s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{
+		AccessKeyID:     s.connector.creds.AccessKeyID,
+		SecretAccessKey: s.connector.creds.SecretAccessKey,
+		Region:          s.connector.creds.Region,
+	})
 	if err != nil {
 		return nil, fmt.Errorf("failed to write records to S3: %w", err)
 	}
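
For context, a hedged sketch of the statement shape CopyStageToDestination and SyncQRepRecords now issue, with the resolved credentials inlined; the table name, file URL, and credentials below are hypothetical placeholders.

package main

import "fmt"

func main() {
	// Hypothetical stand-ins for the resolved credentials and the Avro file
	// staged by writeToAvroFile.
	destinationTable := "public_events"
	avroFileUrl := "https://my-bucket.s3.us-east-1.amazonaws.com/deployment-uid/flowname/partition-0.avro.zst"
	accessKeyID, secretAccessKey := "AKIAEXAMPLE", "secret"

	// Same shape as the query built above: ClickHouse reads the staged Avro
	// file directly from S3 via its s3() table function.
	query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')",
		destinationTable, avroFileUrl, accessKeyID, secretAccessKey)
	fmt.Println(query)
}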
flow/connectors/utils/aws.go (22 changes: 22 additions & 0 deletions)

@@ -29,6 +29,13 @@ type S3PeerCredentials struct {
 	Endpoint        string `json:"endpoint"`
 }

+type ClickhouseS3Credentials struct {
+	AccessKeyID     string `json:"accessKeyId"`
+	SecretAccessKey string `json:"secretAccessKey"`
+	Region          string `json:"region"`
+	BucketPath      string `json:"bucketPath"`
+}
+
 func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) {
 	awsRegion := creds.Region
 	if awsRegion == "" {
@@ -72,6 +79,21 @@ func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) {
 	}, nil
 }

+func GetClickhouseAWSSecrets(bucketPathSuffix string) *ClickhouseS3Credentials {
+	awsRegion := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_REGION")
+	awsKey := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_ACCESS_KEY_ID")
+	awsSecret := os.Getenv("PEERDB_CLICKHOUSE_AWS_CREDENTIALS_AWS_SECRET_ACCESS_KEY")
+	awsBucketName := os.Getenv("PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME")
+
+	awsBucketPath := fmt.Sprintf("s3://%s/%s", awsBucketName, bucketPathSuffix)
+	return &ClickhouseS3Credentials{
+		AccessKeyID:     awsKey,
+		SecretAccessKey: awsSecret,
+		Region:          awsRegion,
+		BucketPath:      awsBucketPath,
+	}
+}
+
 type S3BucketAndPrefix struct {
 	Bucket string
 	Prefix string
flow/shared/constants.go (4 changes: 4 additions & 0 deletions)

@@ -70,3 +70,7 @@ func prependUIDToTaskQueueName(taskQueueName string) string {
 	}
 	return fmt.Sprintf("%s-%s", deploymentUID, taskQueueName)
 }
+
+func GetDeploymentUID() string {
+	return peerdbenv.PeerDBDeploymentUID()
+}
ui/app/mirrors/create/handlers.ts (27 changes: 22 additions & 5 deletions)

@@ -244,6 +244,23 @@ export const fetchSchemas = async (peerName: string) => {
   return schemasRes.schemas;
 };

+const getDefaultDestinationTable = (
+  peerType: DBType | undefined,
+  schemaName: string,
+  tableName: string
+) => {
+  if (!peerType) {
+    return `${schemaName}.${tableName}`;
+  }
+  if (dBTypeToJSON(peerType) == 'BIGQUERY') {
+    return tableName;
+  }
+  if (dBTypeToJSON(peerType) == 'CLICKHOUSE') {
+    return `${schemaName}_${tableName}`;
+  }
+  return `${schemaName}.${tableName}`;
+};
+
 export const fetchTables = async (
   peerName: string,
   schemaName: string,
@@ -265,11 +282,11 @@ export const fetchTables = async (
   for (const tableObject of tableRes) {
     // setting defaults:
     // for bigquery, tables are not schema-qualified
-    const dstName =
-      peerType != undefined && dBTypeToJSON(peerType) == 'BIGQUERY'
-        ? tableObject.tableName
-        : `${schemaName}.${tableObject.tableName}`;
-
+    const dstName = getDefaultDestinationTable(
+      peerType,
+      schemaName,
+      tableObject.tableName
+    );
     tables.push({
       schema: schemaName,
       source: `${schemaName}.${tableObject.tableName}`,
ui/app/peers/create/[peerType]/schema.ts (72 changes: 40 additions & 32 deletions)

@@ -229,34 +229,6 @@ export const bqSchema = z.object({
     .max(1024, 'DatasetID must be less than 1025 characters'),
 });

-const urlSchema = z
-  .string({
-    invalid_type_error: 'URL must be a string',
-    required_error: 'URL is required',
-  })
-  .min(1, { message: 'URL must be non-empty' })
-  .refine((url) => url.startsWith('s3://'), {
-    message: 'URL must start with s3://',
-  });
-
-const accessKeySchema = z
-  .string({
-    invalid_type_error: 'Access Key ID must be a string',
-    required_error: 'Access Key ID is required',
-  })
-  .min(1, { message: 'Access Key ID must be non-empty' });
-
-const secretKeySchema = z
-  .string({
-    invalid_type_error: 'Secret Access Key must be a string',
-    required_error: 'Secret Access Key is required',
-  })
-  .min(1, { message: 'Secret Access Key must be non-empty' });
-
-const regionSchema = z.string({
-  invalid_type_error: 'Region must be a string',
-});
-
 export const chSchema = z.object({
   host: z
     .string({
@@ -294,10 +266,46 @@ export const chSchema = z.object({
     })
     .min(1, 'Password must be non-empty')
     .max(100, 'Password must be less than 100 characters'),
-  s3Path: urlSchema,
-  accessKeyId: accessKeySchema,
-  secretAccessKey: secretKeySchema,
-  region: regionSchema.min(1, { message: 'Region must be non-empty' }),
+  s3Path: z
+    .string({ invalid_type_error: 'S3 Path must be a string' })
+    .optional(),
+  accessKeyId: z
+    .string({ invalid_type_error: 'Access Key ID must be a string' })
+    .optional(),
+  secretAccessKey: z
+    .string({ invalid_type_error: 'Secret Access Key must be a string' })
+    .optional(),
+  region: z
+    .string({ invalid_type_error: 'Region must be a string' })
+    .optional(),
 });

+const urlSchema = z
+  .string({
+    invalid_type_error: 'URL must be a string',
+    required_error: 'URL is required',
+  })
+  .min(1, { message: 'URL must be non-empty' })
+  .refine((url) => url.startsWith('s3://'), {
+    message: 'URL must start with s3://',
+  });
+
+const accessKeySchema = z
+  .string({
+    invalid_type_error: 'Access Key ID must be a string',
+    required_error: 'Access Key ID is required',
+  })
+  .min(1, { message: 'Access Key ID must be non-empty' });
+
+const secretKeySchema = z
+  .string({
+    invalid_type_error: 'Secret Access Key must be a string',
+    required_error: 'Secret Access Key is required',
+  })
+  .min(1, { message: 'Secret Access Key must be non-empty' });
+
+const regionSchema = z.string({
+  invalid_type_error: 'Region must be a string',
+});
+
 export const s3Schema = z.object({