diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index de3f2bcfeb..3c19feec0c 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -299,12 +299,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		TableMappings: input.FlowConnectionConfigs.TableMappings,
 		StagingPath:   input.FlowConnectionConfigs.CdcStagingPath,
 	})
-	res.RelationMessageMapping = input.RelationMessageMapping
 	if err != nil {
 		slog.Warn("failed to push records", slog.Any("error", err))
 		a.Alerter.LogFlowError(ctx, flowName, err)
 		return nil, fmt.Errorf("failed to push records: %w", err)
 	}
+	res.RelationMessageMapping = input.RelationMessageMapping
 
 	err = errGroup.Wait()
 	if err != nil {
diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go
new file mode 100644
index 0000000000..f867d9d96f
--- /dev/null
+++ b/flow/connectors/clickhouse/cdc.go
@@ -0,0 +1,198 @@
+package connclickhouse
+
+import (
+	"database/sql"
+	"fmt"
+	"log/slog"
+	"regexp"
+	"strings"
+
+	_ "github.com/ClickHouse/clickhouse-go/v2"
+	_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+
+	"github.com/PeerDB-io/peer-flow/connectors/utils"
+	"github.com/PeerDB-io/peer-flow/generated/protos"
+	"github.com/PeerDB-io/peer-flow/model"
+)
+
+const (
+	checkIfTableExistsSQL     = `SELECT exists(SELECT 1 FROM system.tables WHERE database = ? AND name = ?) AS table_exists;`
+	mirrorJobsTableIdentifier = "PEERDB_MIRROR_JOBS"
+)
+
+// getRawTableName returns the raw table name for the given flow job.
+func (c *ClickhouseConnector) getRawTableName(flowJobName string) string {
+	// replace all non-alphanumeric characters with _
+	flowJobName = regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(flowJobName, "_")
+	return fmt.Sprintf("_peerdb_raw_%s", flowJobName)
+}
+
+func (c *ClickhouseConnector) checkIfTableExists(databaseName string, tableIdentifier string) (bool, error) {
+	var result sql.NullInt32
+	err := c.database.QueryRowContext(c.ctx, checkIfTableExistsSQL, databaseName, tableIdentifier).Scan(&result)
+	if err != nil {
+		return false, fmt.Errorf("error while reading result row: %w", err)
+	}
+
+	if !result.Valid {
+		return false, fmt.Errorf("[clickhouse] checkIfTableExists: result is not valid")
+	}
+
+	return result.Int32 == 1, nil
+}
+
+type MirrorJobRow struct {
+	MirrorJobName    string
+	Offset           int
+	SyncBatchID      int
+	NormalizeBatchID int
+}
+
+func (c *ClickhouseConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
+	rawTableName := c.getRawTableName(req.FlowJobName)
+
+	createRawTableSQL := `CREATE TABLE IF NOT EXISTS %s (
+		_peerdb_uid String NOT NULL,
+		_peerdb_timestamp Int64 NOT NULL,
+		_peerdb_destination_table_name String NOT NULL,
+		_peerdb_data String NOT NULL,
+		_peerdb_record_type Int NOT NULL,
+		_peerdb_match_data String,
+		_peerdb_batch_id Int,
+		_peerdb_unchanged_toast_columns String
+	) ENGINE = ReplacingMergeTree ORDER BY _peerdb_uid;`
+
+	_, err := c.database.ExecContext(c.ctx,
+		fmt.Sprintf(createRawTableSQL, rawTableName))
+	if err != nil {
+		return nil, fmt.Errorf("unable to create raw table: %w", err)
+	}
+	return &protos.CreateRawTableOutput{
+		TableIdentifier: rawTableName,
+	}, nil
+}
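For reference, a minimal test sketch (not part of the diff; it assumes a hypothetical `cdc_test.go` in this package) showing how `getRawTableName` sanitizes an arbitrary flow-job name into a valid ClickHouse identifier. The flow-job name is made up for illustration.

```go
package connclickhouse

import "testing"

// Sketch only: getRawTableName replaces every non-alphanumeric run with "_"
// and prefixes the result with "_peerdb_raw_".
func TestGetRawTableName(t *testing.T) {
	c := &ClickhouseConnector{} // the method only uses its argument, no DB connection needed
	got := c.getRawTableName("my-flow.job 42")
	want := "_peerdb_raw_my_flow_job_42"
	if got != want {
		t.Fatalf("got %q, want %q", got, want)
	}
}
```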
+
+func (c *ClickhouseConnector) syncRecordsViaAvro(
+	req *model.SyncRecordsRequest,
+	rawTableIdentifier string,
+	syncBatchID int64,
+) (*model.SyncResponse, error) {
+	tableNameRowsMapping := make(map[string]uint32)
+	streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
+	streamRes, err := utils.RecordsToRawTableStream(streamReq)
+	if err != nil {
+		return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
+	}
+
+	qrepConfig := &protos.QRepConfig{
+		StagingPath:                c.config.S3Integration,
+		FlowJobName:                req.FlowJobName,
+		DestinationTableIdentifier: strings.ToLower(rawTableIdentifier),
+	}
+	avroSyncer := NewClickhouseAvroSyncMethod(qrepConfig, c)
+	destinationTableSchema, err := c.getTableSchema(qrepConfig.DestinationTableIdentifier)
+	if err != nil {
+		return nil, err
+	}
+
+	numRecords, err := avroSyncer.SyncRecords(destinationTableSchema, streamRes.Stream, req.FlowJobName)
+	if err != nil {
+		return nil, err
+	}
+
+	err = c.ReplayTableSchemaDeltas(req.FlowJobName, req.Records.SchemaDeltas)
+	if err != nil {
+		return nil, fmt.Errorf("failed to sync schema changes: %w", err)
+	}
+
+	lastCheckpoint, err := req.Records.GetLastCheckpoint()
+	if err != nil {
+		return nil, err
+	}
+
+	return &model.SyncResponse{
+		LastSyncedCheckpointID: lastCheckpoint,
+		NumRecordsSynced:       int64(numRecords),
+		CurrentSyncBatchID:     syncBatchID,
+		TableNameRowsMapping:   tableNameRowsMapping,
+		TableSchemaDeltas:      req.Records.SchemaDeltas,
+	}, nil
+}
+
+func (c *ClickhouseConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
+	rawTableName := c.getRawTableName(req.FlowJobName)
+	c.logger.Info(fmt.Sprintf("pushing records to Clickhouse table %s", rawTableName))
+
+	res, err := c.syncRecordsViaAvro(req, rawTableName, req.SyncBatchID)
+	if err != nil {
+		return nil, err
+	}
+
+	lastCheckpoint, err := req.Records.GetLastCheckpoint()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
+	}
+
+	err = c.SetLastOffset(req.FlowJobName, lastCheckpoint)
+	if err != nil {
+		c.logger.Error("failed to update last offset for s3 cdc", slog.Any("error", err))
+		return nil, err
+	}
+	err = c.pgMetadata.IncrementID(req.FlowJobName)
+	if err != nil {
+		c.logger.Error("failed to increment id", slog.Any("error", err))
+		return nil, err
+	}
+
+	return res, nil
+}
+
+func (c *ClickhouseConnector) SyncFlowCleanup(jobName string) error {
+	err := c.pgMetadata.DropMetadata(jobName)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// ReplayTableSchemaDeltas changes a destination table to match the schema at source.
+// This could involve adding or dropping multiple columns.
+func (c *ClickhouseConnector) ReplayTableSchemaDeltas(flowJobName string,
+	schemaDeltas []*protos.TableSchemaDelta,
+) error {
+	return nil
+}
+
+// methods backed by the external (Postgres) metadata store
+func (c *ClickhouseConnector) NeedsSetupMetadataTables() bool {
+	return c.pgMetadata.NeedsSetupMetadata()
+}
+
+func (c *ClickhouseConnector) SetupMetadataTables() error {
+	err := c.pgMetadata.SetupMetadata()
+	if err != nil {
+		c.logger.Error("failed to setup metadata tables", slog.Any("error", err))
+		return err
+	}
+
+	return nil
+}
+
+func (c *ClickhouseConnector) GetLastSyncBatchID(jobName string) (int64, error) {
+	return c.pgMetadata.GetLastBatchID(jobName)
+}
+
+func (c *ClickhouseConnector) GetLastOffset(jobName string) (int64, error) {
+	return c.pgMetadata.FetchLastOffset(jobName)
+}
+
+// update offset for a job
+func (c *ClickhouseConnector) SetLastOffset(jobName string, offset int64) error {
+	err := c.pgMetadata.UpdateLastOffset(jobName, offset)
+	if err != nil {
+		c.logger.Error("failed to update last offset", slog.Any("error", err))
+		return err
+	}
+
+	return nil
+}
diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go
index 6b3740935a..51dd545f7f 100644
--- a/flow/connectors/clickhouse/clickhouse.go
+++ b/flow/connectors/clickhouse/clickhouse.go
@@ -9,6 +9,7 @@ import (
 	_ "github.com/ClickHouse/clickhouse-go/v2"
 	_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
 
+	metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/shared"
 )
@@ -16,8 +17,10 @@ import (
 type ClickhouseConnector struct {
 	ctx                context.Context
 	database           *sql.DB
+	pgMetadata         *metadataStore.PostgresMetadataStore
 	tableSchemaMapping map[string]*protos.TableSchema
 	logger             slog.Logger
+	config             *protos.ClickhouseConfig
 }
 
 func NewClickhouseConnector(ctx context.Context,
@@ -28,12 +31,22 @@ func NewClickhouseConnector(ctx context.Context,
 		return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err)
 	}
 
+	metadataSchemaName := "peerdb_s3_metadata" // #nosec G101
+	pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx,
+		clickhouseProtoConfig.GetMetadataDb(), metadataSchemaName)
+	if err != nil {
+		slog.ErrorContext(ctx, "failed to create postgres metadata store", slog.Any("error", err))
+		return nil, err
+	}
+
 	flowName, _ := ctx.Value(shared.FlowNameKey).(string)
 	return &ClickhouseConnector{
 		ctx:                ctx,
 		database:           database,
+		pgMetadata:         pgMetadata,
 		tableSchemaMapping: nil,
 		logger:             *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
+		config:             clickhouseProtoConfig,
 	}, nil
 }
diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
new file mode 100644
index 0000000000..f0aac33fd2
--- /dev/null
+++ b/flow/connectors/clickhouse/normalize.go
@@ -0,0 +1,267 @@
+package connclickhouse
+
+import (
+	"database/sql"
+	"fmt"
+	"strconv"
+	"strings"
+
+	"github.com/PeerDB-io/peer-flow/generated/protos"
+	"github.com/PeerDB-io/peer-flow/model"
+	"github.com/PeerDB-io/peer-flow/model/qvalue"
+)
+
+const (
+	signColName    = "_peerdb_is_deleted"
+	signColType    = "Int8"
+	versionColName = "_peerdb_version"
+	versionColType = "Int64"
+)
+
+func (c *ClickhouseConnector) SetupNormalizedTables(
+	req *protos.SetupNormalizedTableBatchInput,
+) (*protos.SetupNormalizedTableBatchOutput, error) {
+	tableExistsMapping := make(map[string]bool)
+	for tableIdentifier, tableSchema := range req.TableNameSchemaMapping {
+		tableAlreadyExists, err := c.checkIfTableExists(c.config.Database, tableIdentifier)
+		if err != nil {
+			return nil, fmt.Errorf("error occurred while checking if normalized table exists: %w", err)
+		}
+		if tableAlreadyExists {
+			tableExistsMapping[tableIdentifier] = true
+			continue
+		}
+
+		normalizedTableCreateSQL, err := generateCreateTableSQLForNormalizedTable(
+			tableIdentifier,
+			tableSchema,
+			req.SoftDeleteColName,
+			req.SyncedAtColName,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("error while generating create table sql for normalized table: %w", err)
+		}
+
+		_, err = c.database.ExecContext(c.ctx, normalizedTableCreateSQL)
+		if err != nil {
+			return nil, fmt.Errorf("[clickhouse] error while creating normalized table: %w", err)
+		}
+		tableExistsMapping[tableIdentifier] = false
+	}
+
+	return &protos.SetupNormalizedTableBatchOutput{
+		TableExistsMapping: tableExistsMapping,
+	}, nil
+}
+
+func generateCreateTableSQLForNormalizedTable(
+	normalizedTable string,
+	tableSchema *protos.TableSchema,
+	_ string, // softDeleteColName
+	syncedAtColName string,
+) (string, error) {
+	var stmtBuilder strings.Builder
+	stmtBuilder.WriteString(fmt.Sprintf("CREATE TABLE `%s` (", normalizedTable))
+
+	nc := len(tableSchema.ColumnNames)
+	for i := 0; i < nc; i++ {
+		colName := tableSchema.ColumnNames[i]
+		colType := qvalue.QValueKind(tableSchema.ColumnTypes[i])
+		clickhouseType, err := qValueKindToClickhouseType(colType)
+		if err != nil {
+			return "", fmt.Errorf("error while converting column type to clickhouse type: %w", err)
+		}
+		stmtBuilder.WriteString(fmt.Sprintf("`%s` %s, ", colName, clickhouseType))
+	}
+
+	// TODO support soft delete
+
+	// synced at column will be added to all normalized tables
+	if syncedAtColName != "" {
+		colName := strings.ToLower(syncedAtColName)
+		stmtBuilder.WriteString(fmt.Sprintf("`%s` %s, ", colName, "DateTime64(9) DEFAULT now()"))
+	}
+
+	// add sign and version columns
+	stmtBuilder.WriteString(fmt.Sprintf("`%s` %s, ", signColName, signColType))
+	stmtBuilder.WriteString(fmt.Sprintf("`%s` %s", versionColName, versionColType))
+
+	stmtBuilder.WriteString(fmt.Sprintf(") ENGINE = ReplacingMergeTree(`%s`) ", versionColName))
+
+	pkeys := tableSchema.PrimaryKeyColumns
+	if len(pkeys) > 0 {
+		pkeyStr := strings.Join(pkeys, ",")
+
+		stmtBuilder.WriteString("PRIMARY KEY (")
+		stmtBuilder.WriteString(pkeyStr)
+		stmtBuilder.WriteString(") ")
+
+		stmtBuilder.WriteString("ORDER BY (")
+		stmtBuilder.WriteString(pkeyStr)
+		stmtBuilder.WriteString(")")
+	}
+
+	return stmtBuilder.String(), nil
+}
+
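For orientation, a test sketch (not part of the diff; table and column names are hypothetical, and the exact type strings depend on `QValueKindToClickhouseTypeMap`) showing the DDL this helper renders:

```go
package connclickhouse

import (
	"strings"
	"testing"

	"github.com/PeerDB-io/peer-flow/generated/protos"
	"github.com/PeerDB-io/peer-flow/model/qvalue"
)

// Sketch only: renders the DDL for a hypothetical two-column table with a single
// primary key. The output should look roughly like:
//
//	CREATE TABLE `public_users` (`id` Int64, `name` String,
//	  `_peerdb_synced_at` DateTime64(9) DEFAULT now(),
//	  `_peerdb_is_deleted` Int8, `_peerdb_version` Int64)
//	  ENGINE = ReplacingMergeTree(`_peerdb_version`) PRIMARY KEY (id) ORDER BY (id)
func TestGenerateNormalizedTableDDL(t *testing.T) {
	ddl, err := generateCreateTableSQLForNormalizedTable(
		"public_users",
		&protos.TableSchema{
			ColumnNames:       []string{"id", "name"},
			ColumnTypes:       []string{string(qvalue.QValueKindInt64), string(qvalue.QValueKindString)},
			PrimaryKeyColumns: []string{"id"},
		},
		"",                  // soft-delete column name, currently ignored
		"_peerdb_synced_at", // synced-at column name
	)
	if err != nil {
		t.Fatal(err)
	}
	if !strings.Contains(ddl, "ENGINE = ReplacingMergeTree(`_peerdb_version`)") {
		t.Fatalf("unexpected DDL: %s", ddl)
	}
	t.Log(ddl)
}
```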
+func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
+	normBatchID, err := c.GetLastNormalizeBatchID(req.FlowJobName)
+	if err != nil {
+		c.logger.ErrorContext(c.ctx, "[clickhouse] error while getting last normalize batch id", err)
+		return nil, err
+	}
+
+	// normalize has caught up with sync, chill until more records are loaded.
+	if normBatchID >= req.SyncBatchID {
+		return &model.NormalizeResponse{
+			Done:         false,
+			StartBatchID: normBatchID,
+			EndBatchID:   req.SyncBatchID,
+		}, nil
+	}
+
+	destinationTableNames, err := c.getDistinctTableNamesInBatch(
+		req.FlowJobName,
+		req.SyncBatchID,
+		normBatchID,
+	)
+	if err != nil {
+		c.logger.ErrorContext(c.ctx, "[clickhouse] error while getting distinct table names in batch", err)
+		return nil, err
+	}
+
+	rawTbl := c.getRawTableName(req.FlowJobName)
+
+	// model the raw table data as inserts.
+	for _, tbl := range destinationTableNames {
+		// SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id
+		selectQuery := strings.Builder{}
+		selectQuery.WriteString("SELECT ")
+
+		colSelector := strings.Builder{}
+		colSelector.WriteString("(")
+
+		schema := req.TableNameSchemaMapping[tbl]
+		numCols := len(schema.ColumnNames)
+
+		projection := strings.Builder{}
+
+		for i := 0; i < numCols; i++ {
+			cn := schema.ColumnNames[i]
+			ct := schema.ColumnTypes[i]
+
+			colSelector.WriteString(fmt.Sprintf("%s,", cn))
+
+			extractionFunction := "JSONExtractRaw"
+			switch qvalue.QValueKind(ct) {
+			case qvalue.QValueKindString:
+				extractionFunction = "JSONExtractString"
+			case qvalue.QValueKindInt64:
+				// TODO check if int64 is supported.
+				extractionFunction = "JSONExtractInt"
+			}
+			projection.WriteString(fmt.Sprintf("%s(_peerdb_data, '%s') AS %s, ", extractionFunction, cn, cn))
+		}
+
+		// add _peerdb_sign as _peerdb_record_type / 2
+		projection.WriteString(fmt.Sprintf("intDiv(_peerdb_record_type, 2) AS %s, ", signColName))
+		colSelector.WriteString(fmt.Sprintf("%s,", signColName))
+
+		// add _peerdb_timestamp as _peerdb_version
+		projection.WriteString(fmt.Sprintf("_peerdb_timestamp AS %s", versionColName))
+		colSelector.WriteString(versionColName)
+		colSelector.WriteString(") ")
+
+		selectQuery.WriteString(projection.String())
+		selectQuery.WriteString(" FROM ")
+		selectQuery.WriteString(rawTbl)
+		selectQuery.WriteString(" WHERE _peerdb_batch_id > ")
+		selectQuery.WriteString(strconv.FormatInt(normBatchID, 10))
+		selectQuery.WriteString(" AND _peerdb_batch_id <= ")
+		selectQuery.WriteString(strconv.FormatInt(req.SyncBatchID, 10))
+		selectQuery.WriteString(" AND _peerdb_destination_table_name = '")
+		selectQuery.WriteString(tbl)
+		selectQuery.WriteString("'")
+
+		selectQuery.WriteString(" ORDER BY _peerdb_timestamp")
+
+		insertIntoSelectQuery := strings.Builder{}
+		insertIntoSelectQuery.WriteString("INSERT INTO ")
+		insertIntoSelectQuery.WriteString(tbl)
+		insertIntoSelectQuery.WriteString(colSelector.String())
+		insertIntoSelectQuery.WriteString(selectQuery.String())
+
+		q := insertIntoSelectQuery.String()
+		c.logger.InfoContext(c.ctx, fmt.Sprintf("[clickhouse] insert into select query %s", q))
+
+		_, err = c.database.ExecContext(c.ctx, q)
+		if err != nil {
+			return nil, fmt.Errorf("error while inserting into normalized table: %w", err)
+		}
+	}
+
+	endNormalizeBatchId := normBatchID + 1
+	err = c.pgMetadata.UpdateNormalizeBatchID(req.FlowJobName, endNormalizeBatchId)
+	if err != nil {
+		c.logger.ErrorContext(c.ctx, "[clickhouse] error while updating normalize batch id", err)
+		return nil, err
+	}
+
+	return &model.NormalizeResponse{
+		Done:         true,
+		StartBatchID: endNormalizeBatchId,
+		EndBatchID:   req.SyncBatchID,
+	}, nil
+}
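An illustration (not part of the diff) of the statement the loop above builds, assuming a hypothetical flow `my_flow`, a destination table `public_events` with columns `id` (int64) and `data` (string), and batch range (3, 7]:

```go
package connclickhouse

// Illustration only: the INSERT ... SELECT that NormalizeRecords would issue for the
// hypothetical inputs described above. intDiv(_peerdb_record_type, 2) collapses the
// record type into the 0/1 _peerdb_is_deleted flag, and _peerdb_timestamp doubles as
// the ReplacingMergeTree version column.
const exampleNormalizeQuery = `INSERT INTO public_events (id,data,_peerdb_is_deleted,_peerdb_version) ` +
	`SELECT JSONExtractInt(_peerdb_data, 'id') AS id, ` +
	`JSONExtractString(_peerdb_data, 'data') AS data, ` +
	`intDiv(_peerdb_record_type, 2) AS _peerdb_is_deleted, ` +
	`_peerdb_timestamp AS _peerdb_version ` +
	`FROM _peerdb_raw_my_flow ` +
	`WHERE _peerdb_batch_id > 3 AND _peerdb_batch_id <= 7 ` +
	`AND _peerdb_destination_table_name = 'public_events' ` +
	`ORDER BY _peerdb_timestamp`
```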
+
+func (c *ClickhouseConnector) getDistinctTableNamesInBatch(
+	flowJobName string,
+	syncBatchID int64,
+	normalizeBatchID int64,
+) ([]string, error) {
+	rawTbl := c.getRawTableName(flowJobName)
+
+	//nolint:gosec
+	q := fmt.Sprintf(
+		`SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id > %d AND _peerdb_batch_id <= %d`,
+		rawTbl, normalizeBatchID, syncBatchID)
+
+	rows, err := c.database.QueryContext(c.ctx, q)
+	if err != nil {
+		return nil, fmt.Errorf("error while querying raw table for distinct table names in batch: %w", err)
+	}
+	defer rows.Close()
+	var tableNames []string
+	for rows.Next() {
+		var tableName sql.NullString
+		err = rows.Scan(&tableName)
+		if err != nil {
+			return nil, fmt.Errorf("error while scanning table name: %w", err)
+		}
+
+		if !tableName.Valid {
+			return nil, fmt.Errorf("table name is not valid")
+		}
+
+		tableNames = append(tableNames, tableName.String)
+	}
+
+	err = rows.Err()
+	if err != nil {
+		return nil, fmt.Errorf("failed to read rows: %w", err)
+	}
+
+	return tableNames, nil
+}
+
+func (c *ClickhouseConnector) GetLastNormalizeBatchID(flowJobName string) (int64, error) {
+	normalizeBatchID, err := c.pgMetadata.GetLastNormalizeBatchID(flowJobName)
+	if err != nil {
+		return 0, fmt.Errorf("error while getting last normalize batch id: %w", err)
+	}
+
+	return normalizeBatchID, nil
+}
diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go
index 84c5d2eb89..68129a98d5 100644
--- a/flow/connectors/clickhouse/qrep_avro_sync.go
+++ b/flow/connectors/clickhouse/qrep_avro_sync.go
@@ -4,6 +4,7 @@ import (
 	"database/sql"
 	"fmt"
 	"log/slog"
+	"strings"
 	"time"
 
 	"go.temporal.io/sdk/activity"
@@ -31,6 +32,65 @@ func NewClickhouseAvroSyncMethod(
 	}
 }
 
+func (s *ClickhouseAvroSyncMethod) CopyStageToDestination(avroFile *avro.AvroFile) error {
+	stagingPath := s.config.StagingPath
+	if stagingPath == "" {
+		stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Integration // "s3://avro-clickhouse"
+	}
+	s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
+	if err != nil {
+		return err
+	}
+	awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{})
+	avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)
+
+	if err != nil {
+		return err
+	}
+	//nolint:gosec
+	query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')",
+		s.config.DestinationTableIdentifier, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
+
+	_, err = s.connector.database.Exec(query)
+
+	return err
+}
+
+func (s *ClickhouseAvroSyncMethod) SyncRecords(
+	dstTableSchema []*sql.ColumnType,
+	stream *model.QRecordStream,
+	flowJobName string,
+) (int, error) {
+	tableLog := slog.String("destinationTable", s.config.DestinationTableIdentifier)
+	dstTableName := s.config.DestinationTableIdentifier
+
+	schema, err := stream.Schema()
+	if err != nil {
+		return -1, fmt.Errorf("failed to get schema from stream: %w", err)
+	}
+
+	s.connector.logger.Info("sync function called and schema acquired", tableLog)
+
+	avroSchema, err := s.getAvroSchema(dstTableName, schema)
+	if err != nil {
+		return 0, err
+	}
+
+	partitionID := shared.RandomString(16)
+	avroFile, err := s.writeToAvroFile(stream, avroSchema, partitionID, flowJobName)
+	if err != nil {
+		return 0, err
+	}
+	defer avroFile.Cleanup()
+	s.connector.logger.Info(fmt.Sprintf("written %d records to Avro file", avroFile.NumRecords), tableLog)
+	err = s.CopyStageToDestination(avroFile)
+	if err != nil {
+		return 0, err
+	}
+
+	return avroFile.NumRecords, nil
+}
+
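The load path stages records as zstd-compressed Avro on S3 and then has ClickHouse pull the file itself through the `s3` table function. A sketch (not part of the diff) of the query shape `CopyStageToDestination` builds, with a made-up bucket, region, and object key and credentials elided:

```go
package connclickhouse

// Illustration only: the bucket, region, and object key come from the peer's
// s3_integration setting and the generated Avro file path; credentials are
// substituted from the AWS secrets at runtime.
const exampleS3InsertQuery = `INSERT INTO _peerdb_raw_my_flow ` +
	`SELECT * FROM s3('https://my-bucket.s3.us-east-1.amazonaws.com/prefix/my_flow/abc123.avro.zst',` +
	`'<access_key_id>','<secret_access_key>', 'Avro')`
```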
 func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
 	config *protos.QRepConfig,
 	partition *protos.QRepPartition,
@@ -39,13 +99,13 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
 ) (int, error) {
 	startTime := time.Now()
 	dstTableName := config.DestinationTableIdentifier
-	// s.config.StagingPath = "s3://avro-clickhouse"
+
+	stagingPath := s.config.DestinationPeer.GetClickhouseConfig().S3Integration
 	schema, err := stream.Schema()
 	if err != nil {
 		return -1, fmt.Errorf("failed to get schema from stream: %w", err)
 	}
-
 	avroSchema, err := s.getAvroSchema(dstTableName, schema)
 	if err != nil {
 		return 0, err
@@ -56,21 +116,23 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
 		return 0, err
 	}
 
-	s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath)
+	s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
 	if err != nil {
 		return 0, err
 	}
 
 	awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{})
-	avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)
+	avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath)
 
 	if err != nil {
 		return 0, err
 	}
+	selector := strings.Join(schema.GetColumnNames(), ",")
 	//nolint:gosec
-	query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')",
-		config.DestinationTableIdentifier, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
+	query := fmt.Sprintf("INSERT INTO %s (%s) SELECT * FROM s3('%s','%s','%s', 'Avro')",
+		config.DestinationTableIdentifier, selector, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey)
 
 	_, err = s.connector.database.Exec(query)
+
 	if err != nil {
 		return 0, err
 	}
@@ -102,14 +164,20 @@ func (s *ClickhouseAvroSyncMethod) writeToAvroFile(
 	partitionID string,
 	flowJobName string,
 ) (*avro.AvroFile, error) {
+	stagingPath := s.config.StagingPath // "s3://avro-clickhouse"
+	if stagingPath == "" {
+		stagingPath = s.config.DestinationPeer.GetClickhouseConfig().S3Integration // "s3://avro-clickhouse"
+	}
 	ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd,
 		qvalue.QDWHTypeClickhouse)
-	s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath)
+	s3o, err := utils.NewS3BucketAndPrefix(stagingPath)
 	if err != nil {
 		return nil, fmt.Errorf("failed to parse staging path: %w", err)
 	}
 
-	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName
+	s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName
+	s3AvroFileKey = strings.Trim(s3AvroFileKey, "/")
+
 	avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{})
 	if err != nil {
 		return nil, fmt.Errorf("failed to write records to S3: %w", err)
diff --git a/flow/connectors/clickhouse/qvalue_convert.go b/flow/connectors/clickhouse/qvalue_convert.go
index 30249db70a..d09eeced65 100644
--- a/flow/connectors/clickhouse/qvalue_convert.go
+++ b/flow/connectors/clickhouse/qvalue_convert.go
@@ -39,3 +39,12 @@ var clickhouseTypeToQValueKindMap = map[string]qvalue.QValueKind{
 	"Array(Int64)":   qvalue.QValueKindArrayInt64,
 	"Array(Float64)": qvalue.QValueKindArrayFloat64,
 }
+
+func qValueKindToClickhouseType(colType qvalue.QValueKind) (string, error) {
+	val, err := colType.ToDWHColumnType(qvalue.QDWHTypeClickhouse)
+	if err != nil {
+		return "", err
+	}
+
+	return val, err
+}
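A sketch (not part of the diff) of how the new helper resolves a column kind through `ToDWHColumnType`; the expected `Int64` value assumes `QValueKindToClickhouseTypeMap` carries the same entries as the reverse map above, and unmapped kinds fall back to `String`:

```go
package connclickhouse

import (
	"testing"

	"github.com/PeerDB-io/peer-flow/model/qvalue"
)

// Sketch only: kinds with an explicit map entry resolve to their ClickHouse type;
// anything unmapped falls back to "String" (see QValueKind.ToDWHColumnType).
func TestQValueKindToClickhouseType(t *testing.T) {
	typ, err := qValueKindToClickhouseType(qvalue.QValueKindInt64)
	if err != nil {
		t.Fatal(err)
	}
	if typ != "Int64" { // assumes an Int64 entry in QValueKindToClickhouseTypeMap
		t.Fatalf("unexpected ClickHouse type: %s", typ)
	}
}
```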
diff --git a/flow/connectors/core.go b/flow/connectors/core.go
index 1e28822181..b3a93f9b90 100644
--- a/flow/connectors/core.go
+++ b/flow/connectors/core.go
@@ -158,6 +158,8 @@ func GetCDCSyncConnector(ctx context.Context, config *protos.Peer) (CDCSyncConne
 		return conneventhub.NewEventHubConnector(ctx, config.GetEventhubGroupConfig())
 	case *protos.Peer_S3Config:
 		return conns3.NewS3Connector(ctx, config.GetS3Config())
+	case *protos.Peer_ClickhouseConfig:
+		return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig())
 	default:
 		return nil, ErrUnsupportedFunctionality
 	}
@@ -174,6 +176,8 @@ func GetCDCNormalizeConnector(ctx context.Context,
 		return connbigquery.NewBigQueryConnector(ctx, config.GetBigqueryConfig())
 	case *protos.Peer_SnowflakeConfig:
 		return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
+	case *protos.Peer_ClickhouseConfig:
+		return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig())
 	default:
 		return nil, ErrUnsupportedFunctionality
 	}
diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go
index 73557ad300..ddf4da7108 100644
--- a/flow/connectors/external_metadata/store.go
+++ b/flow/connectors/external_metadata/store.go
@@ -122,7 +122,8 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
 			job_name TEXT PRIMARY KEY NOT NULL,
 			last_offset BIGINT NOT NULL,
 			updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
-			sync_batch_id BIGINT NOT NULL
+			sync_batch_id BIGINT NOT NULL,
+			normalize_batch_id BIGINT
 		)
 	`)
 	if err != nil && !utils.IsUniqueError(err) {
@@ -171,7 +172,7 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
 			return 0, nil
 		}
 
-		slog.Error("failed to get last offset", slog.Any("error", err))
+		p.logger.Error("failed to get last sync batch id", slog.Any("error", err))
 		return 0, err
 	}
 	p.logger.Info("got last batch id for job", slog.Int64("batch id", syncBatchID.Int64))
@@ -179,6 +180,29 @@ func (p *PostgresMetadataStore) GetLastBatchID(jobName string) (int64, error) {
 	return syncBatchID.Int64, nil
 }
 
+func (p *PostgresMetadataStore) GetLastNormalizeBatchID(jobName string) (int64, error) {
+	rows := p.conn.QueryRow(p.ctx, `
+		SELECT normalize_batch_id
+		FROM `+p.schemaName+`.`+lastSyncStateTableName+`
+		WHERE job_name = $1
+	`, jobName)
+
+	var normalizeBatchID pgtype.Int8
+	err := rows.Scan(&normalizeBatchID)
+	if err != nil {
+		// if the job doesn't exist, return 0
+		if err.Error() == "no rows in result set" {
+			return 0, nil
+		}
+
+		p.logger.Error("failed to get last normalize batch id", slog.Any("error", err))
+		return 0, err
+	}
+	p.logger.Info("got last normalize batch id for job", slog.Int64("batch id", normalizeBatchID.Int64))
+
+	return normalizeBatchID.Int64, nil
+}
+
 // update offset for a job
 func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) error {
 	// start a transaction
@@ -213,7 +237,7 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
 	return nil
 }
 
-// update offset for a job
+// update the sync batch id for a job.
 func (p *PostgresMetadataStore) IncrementID(jobName string) error {
 	p.logger.Info("incrementing sync batch id for job")
 	_, err := p.conn.Exec(p.ctx, `
@@ -228,6 +252,20 @@
 	return nil
 }
 
+func (p *PostgresMetadataStore) UpdateNormalizeBatchID(jobName string, batchID int64) error {
+	p.logger.Info("updating normalize batch id for job")
+	_, err := p.conn.Exec(p.ctx, `
+		UPDATE `+p.schemaName+`.`+lastSyncStateTableName+`
+		SET normalize_batch_id=$2 WHERE job_name=$1
+	`, jobName, batchID)
+	if err != nil {
+		p.logger.Error("failed to update normalize batch id", slog.Any("error", err))
+		return err
+	}
+
+	return nil
+}
+
 func (p *PostgresMetadataStore) DropMetadata(jobName string) error {
 	_, err := p.conn.Exec(p.ctx, `
 	DELETE FROM `+p.QualifyTable(lastSyncStateTableName)+`
diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go
index 5c6b069639..5c1c9271e8 100644
--- a/flow/model/qvalue/kind.go
+++ b/flow/model/qvalue/kind.go
@@ -127,13 +127,20 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{
 }
 
 func (kind QValueKind) ToDWHColumnType(dwhType QDWHType) (string, error) {
-	if dwhType != QDWHTypeSnowflake {
-		return "", fmt.Errorf("unsupported DWH type: %v", dwhType)
-	}
-
-	if val, ok := QValueKindToSnowflakeTypeMap[kind]; ok {
-		return val, nil
-	} else {
-		return "STRING", nil
+	switch dwhType {
+	case QDWHTypeSnowflake:
+		if val, ok := QValueKindToSnowflakeTypeMap[kind]; ok {
+			return val, nil
+		} else {
+			return "STRING", nil
+		}
+	case QDWHTypeClickhouse:
+		if val, ok := QValueKindToClickhouseTypeMap[kind]; ok {
+			return val, nil
+		} else {
+			return "String", nil
+		}
+	default:
+		return "", fmt.Errorf("unknown dwh type: %v", dwhType)
 	}
 }
diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs
index 6602782e72..d2856cc381 100644
--- a/nexus/analyzer/src/lib.rs
+++ b/nexus/analyzer/src/lib.rs
@@ -789,6 +789,9 @@ fn parse_db_options(
             Some(config)
         }
         DbType::Clickhouse => {
+            let conn_str = opts.get("metadata_db");
+            let metadata_db = parse_metadata_db_info(conn_str.copied())?;
+
             let s3_int = opts
                 .get("s3_integration")
                 .map(|s| s.to_string())
@@ -814,6 +817,7 @@ fn parse_db_options(
                 .context("no default database specified")?
                 .to_string(),
             s3_integration: s3_int,
+            metadata_db,
         };
         let config = Config::ClickhouseConfig(clickhouse_config);
         Some(config)
diff --git a/protos/peers.proto b/protos/peers.proto
index 372c02936b..351b11b8ce 100644
--- a/protos/peers.proto
+++ b/protos/peers.proto
@@ -97,6 +97,7 @@ message ClickhouseConfig{
   string password = 4;
   string database = 5;
   string s3_integration = 6; // staging to store avro files
+  PostgresConfig metadata_db = 7;
 }
 
 message SqlServerConfig {
diff --git a/ui/app/api/peers/getTruePeer.ts b/ui/app/api/peers/getTruePeer.ts
index 1af4155dec..3cef249e48 100644
--- a/ui/app/api/peers/getTruePeer.ts
+++ b/ui/app/api/peers/getTruePeer.ts
@@ -1,6 +1,7 @@
 import { CatalogPeer } from '@/app/dto/PeersDTO';
 import {
   BigqueryConfig,
+  ClickhouseConfig,
   EventHubConfig,
   EventHubGroupConfig,
   Peer,
@@ -23,7 +24,8 @@ export const getTruePeer = (peer: CatalogPeer) => {
     | EventHubConfig
     | S3Config
     | SqlServerConfig
-    | EventHubGroupConfig;
+    | EventHubGroupConfig
+    | ClickhouseConfig;
   switch (peer.type) {
     case 0:
       config = BigqueryConfig.decode(options);
@@ -53,6 +55,10 @@ export const getTruePeer = (peer: CatalogPeer) => {
       config = EventHubGroupConfig.decode(options);
       newPeer.eventhubGroupConfig = config;
       break;
+    case 8:
+      config = ClickhouseConfig.decode(options);
+      newPeer.clickhouseConfig = config;
+      break;
     default:
       return newPeer;
   }
diff --git a/ui/app/peers/create/[peerType]/helpers/ch.ts b/ui/app/peers/create/[peerType]/helpers/ch.ts
index 57eee3c39a..ff8615267a 100644
--- a/ui/app/peers/create/[peerType]/helpers/ch.ts
+++ b/ui/app/peers/create/[peerType]/helpers/ch.ts
@@ -57,4 +57,5 @@ export const blankClickhouseSetting: ClickhouseConfig = {
   password: '',
   database: '',
   s3Integration: '',
+  metadataDb: undefined,
 };
diff --git a/ui/components/PeerComponent.tsx b/ui/components/PeerComponent.tsx
index 318aff7529..81782f46e0 100644
--- a/ui/components/PeerComponent.tsx
+++ b/ui/components/PeerComponent.tsx
@@ -19,6 +19,7 @@ export const DBTypeToImageMapping = (peerType: DBType | string) => {
     case 'S3':
       return '/svgs/aws.svg';
     case 'CLICKHOUSE':
+    case DBType.CLICKHOUSE:
       return '/svgs/ch.svg';
     case DBType.EVENTHUB_GROUP:
     case DBType.EVENTHUB: