diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index b655607989..c6c15f3f84 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -663,6 +663,15 @@ func (h *FlowRequestHandler) CreatePeer( } s3Config := s3ConfigObject.S3Config encodedConfig, encodingErr = proto.Marshal(s3Config) + case protos.DBType_CLICKHOUSE: + chConfigObject, ok := config.(*protos.Peer_ClickhouseConfig) + + if !ok { + return wrongConfigResponse, nil + } + + chConfig := chConfigObject.ClickhouseConfig + encodedConfig, encodingErr = proto.Marshal(chConfig) default: return wrongConfigResponse, nil } diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go new file mode 100644 index 0000000000..de51b0feb9 --- /dev/null +++ b/flow/connectors/clickhouse/clickhouse.go @@ -0,0 +1,81 @@ +package connclickhouse + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + + _ "github.com/ClickHouse/clickhouse-go/v2" + _ "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" +) + +type ClickhouseConnector struct { + ctx context.Context + database *sql.DB + tableSchemaMapping map[string]*protos.TableSchema + logger slog.Logger +} + +func NewClickhouseConnector(ctx context.Context, + clickhouseProtoConfig *protos.ClickhouseConfig, +) (*ClickhouseConnector, error) { + database, err := connect(ctx, clickhouseProtoConfig) + if err != nil { + return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err) + } + + flowName, _ := ctx.Value(shared.FlowNameKey).(string) + return &ClickhouseConnector{ + ctx: ctx, + database: database, + tableSchemaMapping: nil, + logger: *slog.With(slog.String(string(shared.FlowNameKey), flowName)), + }, nil +} + +func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, error) { + dsn := fmt.Sprintf("tcp://%s:%d?username=%s&password=%s", //&database=%s" + config.Host, config.Port, config.User, config.Password) //, config.Database + + conn, err := sql.Open("clickhouse", dsn) + if err != nil { + return nil, fmt.Errorf("failed to open connection to Clickhouse peer: %w", err) + } + + if err := conn.PingContext(ctx); err != nil { + return nil, fmt.Errorf("failed to ping to Clickhouse peer: %w", err) + } + + // Execute USE database command to select a specific database + _, err = conn.Exec(fmt.Sprintf("USE %s", config.Database)) + if err != nil { + return nil, fmt.Errorf("failed in selecting db in Clickhouse peer: %w", err) + } + + return conn, nil +} + +func (c *ClickhouseConnector) Close() error { + if c == nil || c.database == nil { + return nil + } + + err := c.database.Close() + if err != nil { + return fmt.Errorf("error while closing connection to Clickhouse peer: %w", err) + } + return nil +} + +func (c *ClickhouseConnector) ConnectionActive() error { + if c == nil || c.database == nil { + return fmt.Errorf("ClickhouseConnector is nil") + } + + // This also checks if database exists + err := c.database.PingContext(c.ctx) + return err +} diff --git a/flow/connectors/clickhouse/client.go b/flow/connectors/clickhouse/client.go new file mode 100644 index 0000000000..9aa14cd57c --- /dev/null +++ b/flow/connectors/clickhouse/client.go @@ -0,0 +1,37 @@ +package connclickhouse + +import ( + "context" + "fmt" + + peersql "github.com/PeerDB-io/peer-flow/connectors/sql" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/jmoiron/sqlx" +) + +type ClickhouseClient struct { + peersql.GenericSQLQueryExecutor + // ctx is the context. + ctx context.Context + // config is the Snowflake config. + Config *protos.ClickhouseConfig +} + +func NewClickhouseClient(ctx context.Context, config *protos.ClickhouseConfig) (*ClickhouseClient, error) { + databaseSql, err := connect(ctx, config) + database := sqlx.NewDb(databaseSql, "clickhouse") // Use the appropriate driver name + + if err != nil { + return nil, fmt.Errorf("failed to open connection to Snowflake peer: %w", err) + } + + genericExecutor := *peersql.NewGenericSQLQueryExecutor( + ctx, database, clickhouseTypeToQValueKindMap, qvalue.QValueKindToSnowflakeTypeMap) + + return &ClickhouseClient{ + GenericSQLQueryExecutor: genericExecutor, + ctx: ctx, + Config: config, + }, nil +} diff --git a/flow/connectors/clickhouse/qrep.go b/flow/connectors/clickhouse/qrep.go new file mode 100644 index 0000000000..74ffe26524 --- /dev/null +++ b/flow/connectors/clickhouse/qrep.go @@ -0,0 +1,197 @@ +package connclickhouse + +import ( + "database/sql" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/PeerDB-io/peer-flow/connectors/utils" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/shared" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "google.golang.org/protobuf/encoding/protojson" +) + +const qRepMetadataTableName = "_peerdb_query_replication_metadata" + +func (c *ClickhouseConnector) SyncQRepRecords( + config *protos.QRepConfig, + partition *protos.QRepPartition, + stream *model.QRecordStream, +) (int, error) { + // Ensure the destination table is available. + destTable := config.DestinationTableIdentifier + flowLog := slog.Group("sync_metadata", + slog.String(string(shared.PartitionIDKey), partition.PartitionId), + slog.String("destinationTable", destTable), + ) + + done, err := c.isPartitionSynced(partition.PartitionId) + if err != nil { + return 0, fmt.Errorf("failed to check if partition %s is synced: %w", partition.PartitionId, err) + } + + if done { + c.logger.Info("Partition has already been synced", flowLog) + return 0, nil + } + + tblSchema, err := c.getTableSchema(destTable) + if err != nil { + return 0, fmt.Errorf("failed to get schema of table %s: %w", destTable, err) + } + c.logger.Info("Called QRep sync function and obtained table schema", flowLog) + + avroSync := NewClickhouseAvroSyncMethod(config, c) + + return avroSync.SyncQRepRecords(config, partition, tblSchema, stream) +} + +func (c *ClickhouseConnector) createMetadataInsertStatement( + partition *protos.QRepPartition, + jobName string, + startTime time.Time, +) (string, error) { + // marshal the partition to json using protojson + pbytes, err := protojson.Marshal(partition) + if err != nil { + return "", fmt.Errorf("failed to marshal partition to json: %v", err) + } + + // convert the bytes to string + partitionJSON := string(pbytes) + + insertMetadataStmt := fmt.Sprintf( + `INSERT INTO %s + (flowJobName, partitionID, syncPartition, syncStartTime, syncFinishTime) + VALUES ('%s', '%s', '%s', '%s', NOW());`, + qRepMetadataTableName, jobName, partition.PartitionId, + partitionJSON, startTime.Format("2006-01-02 15:04:05.000000")) + + return insertMetadataStmt, nil +} + +func (c *ClickhouseConnector) getTableSchema(tableName string) ([]*sql.ColumnType, error) { + //nolint:gosec + queryString := fmt.Sprintf(`SELECT * FROM %s LIMIT 0`, tableName) + rows, err := c.database.Query(queryString) + if err != nil { + return nil, fmt.Errorf("failed to execute query: %w", err) + } + defer rows.Close() + + columnTypes, err := rows.ColumnTypes() + if err != nil { + return nil, fmt.Errorf("failed to get column types: %w", err) + } + + return columnTypes, nil +} + +func (c *ClickhouseConnector) isPartitionSynced(partitionID string) (bool, error) { + //nolint:gosec + queryString := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE partitionID = '%s'`, qRepMetadataTableName, partitionID) + + row := c.database.QueryRow(queryString) + + var count int + if err := row.Scan(&count); err != nil { + return false, fmt.Errorf("failed to execute query: %w", err) + } + return count > 0, nil +} + +func (c *ClickhouseConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error { + err := c.createQRepMetadataTable() //(createMetadataTablesTx) + if err != nil { + return err + } + + if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { + _, err = c.database.Exec(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier)) + if err != nil { + return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err) + } + } + + return nil +} + +func (c *ClickhouseConnector) createQRepMetadataTable() error { // createMetadataTableTx *sql.Tx + // Define the schema + schemaStatement := ` + CREATE TABLE IF NOT EXISTS %s ( + flowJobName String, + partitionID String, + syncPartition String, + syncStartTime DateTime64, + syncFinishTime DateTime64 + ) ENGINE = MergeTree() + ORDER BY partitionID; + ` + queryString := fmt.Sprintf(schemaStatement, qRepMetadataTableName) + _, err := c.database.Exec(queryString) + if err != nil { + c.logger.Error(fmt.Sprintf("failed to create table %s", qRepMetadataTableName), + slog.Any("error", err)) + + return fmt.Errorf("failed to create table %s: %w", qRepMetadataTableName, err) + } + c.logger.Info(fmt.Sprintf("Created table %s", qRepMetadataTableName)) + return nil +} + +func (c *ClickhouseConnector) ConsolidateQRepPartitions(config *protos.QRepConfig) error { + c.logger.Info("Consolidating partitions noop") + return nil +} + +// CleanupQRepFlow function for clickhouse connector +func (c *ClickhouseConnector) CleanupQRepFlow(config *protos.QRepConfig) error { + c.logger.Info("Cleaning up flow job") + return c.dropStage(config.StagingPath, config.FlowJobName) +} + +// dropStage drops the stage for the given job. +func (c *ClickhouseConnector) dropStage(stagingPath string, job string) error { + // if s3 we need to delete the contents of the bucket + if strings.HasPrefix(stagingPath, "s3://") { + s3o, err := utils.NewS3BucketAndPrefix(stagingPath) + if err != nil { + c.logger.Error("failed to create S3 bucket and prefix", slog.Any("error", err)) + return fmt.Errorf("failed to create S3 bucket and prefix: %w", err) + } + + c.logger.Info(fmt.Sprintf("Deleting contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job)) + + // deleting the contents of the bucket with prefix + s3svc, err := utils.CreateS3Client(utils.S3PeerCredentials{}) + if err != nil { + c.logger.Error("failed to create S3 client", slog.Any("error", err)) + return fmt.Errorf("failed to create S3 client: %w", err) + } + + // Create a list of all objects with the defined prefix in the bucket + iter := s3manager.NewDeleteListIterator(s3svc, &s3.ListObjectsInput{ + Bucket: aws.String(s3o.Bucket), + Prefix: aws.String(fmt.Sprintf("%s/%s", s3o.Prefix, job)), + }) + + // Iterate through the objects in the bucket with the prefix and delete them + s3Client := s3manager.NewBatchDeleteWithClient(s3svc) + if err := s3Client.Delete(aws.BackgroundContext(), iter); err != nil { + c.logger.Error("failed to delete objects from bucket", slog.Any("error", err)) + return fmt.Errorf("failed to delete objects from bucket: %w", err) + } + + c.logger.Info(fmt.Sprintf("Deleted contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job)) + } + + c.logger.Info(fmt.Sprintf("Dropped stage %s", stagingPath)) + return nil +} diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go new file mode 100644 index 0000000000..2adb391457 --- /dev/null +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -0,0 +1,159 @@ +package connclickhouse + +import ( + "database/sql" + "fmt" + "log/slog" + "time" + + "github.com/PeerDB-io/peer-flow/connectors/utils" + avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/shared" + "go.temporal.io/sdk/activity" +) + +type ClickhouseAvroSyncMethod struct { + config *protos.QRepConfig + connector *ClickhouseConnector +} + +func NewClickhouseAvroSyncMethod( + config *protos.QRepConfig, + connector *ClickhouseConnector, +) *ClickhouseAvroSyncMethod { + return &ClickhouseAvroSyncMethod{ + config: config, + connector: connector, + } +} + +func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( + config *protos.QRepConfig, + partition *protos.QRepPartition, + dstTableSchema []*sql.ColumnType, + stream *model.QRecordStream, +) (int, error) { + startTime := time.Now() + dstTableName := config.DestinationTableIdentifier + // s.config.StagingPath = "s3://avro-clickhouse" + + schema, err := stream.Schema() + if err != nil { + return -1, fmt.Errorf("failed to get schema from stream: %w", err) + } + + avroSchema, err := s.getAvroSchema(dstTableName, schema) + if err != nil { + return 0, err + } + + avroFile, err := s.writeToAvroFile(stream, avroSchema, partition.PartitionId, config.FlowJobName) + if err != nil { + return 0, err + } + + s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath) + if err != nil { + return 0, err + } + awsCreds, err := utils.GetAWSSecrets(utils.S3PeerCredentials{}) + avroFileUrl := fmt.Sprintf("https://%s.s3.%s.amazonaws.com%s", s3o.Bucket, awsCreds.Region, avroFile.FilePath) + + if err != nil { + return 0, err + } + //nolint:gosec + query := fmt.Sprintf("INSERT INTO %s SELECT * FROM s3('%s','%s','%s', 'Avro')", + config.DestinationTableIdentifier, avroFileUrl, awsCreds.AccessKeyID, awsCreds.SecretAccessKey) + + _, err = s.connector.database.Exec(query) + if err != nil { + return 0, err + } + + err = s.insertMetadata(partition, config.FlowJobName, startTime) + if err != nil { + return -1, err + } + + activity.RecordHeartbeat(s.connector.ctx, "finished syncing records") + + return avroFile.NumRecords, nil +} + +func (s *ClickhouseAvroSyncMethod) getAvroSchema( + dstTableName string, + schema *model.QRecordSchema, +) (*model.QRecordAvroSchemaDefinition, error) { + avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, qvalue.QDWHTypeClickhouse) + if err != nil { + return nil, fmt.Errorf("failed to define Avro schema: %w", err) + } + return avroSchema, nil +} + +func (s *ClickhouseAvroSyncMethod) writeToAvroFile( + stream *model.QRecordStream, + avroSchema *model.QRecordAvroSchemaDefinition, + partitionID string, + flowJobName string, +) (*avro.AvroFile, error) { + ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, avro.CompressZstd, + qvalue.QDWHTypeClickhouse) + s3o, err := utils.NewS3BucketAndPrefix(s.config.StagingPath) + if err != nil { + return nil, fmt.Errorf("failed to parse staging path: %w", err) + } + + s3AvroFileKey := fmt.Sprintf("%s/%s/%s.avro.zst", s3o.Prefix, flowJobName, partitionID) // s.config.FlowJobName + avroFile, err := ocfWriter.WriteRecordsToS3(s3o.Bucket, s3AvroFileKey, utils.S3PeerCredentials{}) ///utils.S3PeerCredentials{}) + if err != nil { + return nil, fmt.Errorf("failed to write records to S3: %w", err) + } + return avroFile, nil +} + +func (s *ClickhouseAvroSyncMethod) insertMetadata( + partition *protos.QRepPartition, + flowJobName string, + startTime time.Time, +) error { + partitionLog := slog.String(string(shared.PartitionIDKey), partition.PartitionId) + insertMetadataStmt, err := s.connector.createMetadataInsertStatement(partition, flowJobName, startTime) + if err != nil { + s.connector.logger.Error("failed to create metadata insert statement", + slog.Any("error", err), partitionLog) + return fmt.Errorf("failed to create metadata insert statement: %v", err) + } + + if _, err := s.connector.database.Exec(insertMetadataStmt); err != nil { + return fmt.Errorf("failed to execute metadata insert statement: %v", err) + } + + return nil +} + +type ClickhouseAvroWriteHandler struct { + connector *ClickhouseConnector + dstTableName string + stage string + copyOpts []string +} + +// NewClickhouseAvroWriteHandler creates a new ClickhouseAvroWriteHandler +func NewClickhouseAvroWriteHandler( + connector *ClickhouseConnector, + dstTableName string, + stage string, + copyOpts []string, +) *ClickhouseAvroWriteHandler { + return &ClickhouseAvroWriteHandler{ + connector: connector, + dstTableName: dstTableName, + stage: stage, + copyOpts: copyOpts, + } +} diff --git a/flow/connectors/clickhouse/qvalue_convert.go b/flow/connectors/clickhouse/qvalue_convert.go new file mode 100644 index 0000000000..30249db70a --- /dev/null +++ b/flow/connectors/clickhouse/qvalue_convert.go @@ -0,0 +1,41 @@ +package connclickhouse + +import ( + "github.com/PeerDB-io/peer-flow/model/qvalue" +) + +// TODO: remove extra types from here +var clickhouseTypeToQValueKindMap = map[string]qvalue.QValueKind{ + "INT": qvalue.QValueKindInt32, + "Int64": qvalue.QValueKindInt64, + "Int16": qvalue.QValueKindInt16, + "Float64": qvalue.QValueKindFloat64, + "DOUBLE": qvalue.QValueKindFloat64, + "REAL": qvalue.QValueKindFloat64, + "VARCHAR": qvalue.QValueKindString, + "CHAR": qvalue.QValueKindString, + "TEXT": qvalue.QValueKindString, + "String": qvalue.QValueKindString, + "Bool": qvalue.QValueKindBoolean, + "DateTime": qvalue.QValueKindTimestamp, + "TIMESTAMP": qvalue.QValueKindTimestamp, + "DateTime64(6)": qvalue.QValueKindTimestamp, + "TIMESTAMP_NTZ": qvalue.QValueKindTimestamp, + "TIMESTAMP_TZ": qvalue.QValueKindTimestampTZ, + "TIME": qvalue.QValueKindTime, + "DATE": qvalue.QValueKindDate, + "BLOB": qvalue.QValueKindBytes, + "BYTEA": qvalue.QValueKindBytes, + "BINARY": qvalue.QValueKindBytes, + "FIXED": qvalue.QValueKindNumeric, + "NUMBER": qvalue.QValueKindNumeric, + "DECIMAL": qvalue.QValueKindNumeric, + "NUMERIC": qvalue.QValueKindNumeric, + "VARIANT": qvalue.QValueKindJSON, + "GEOMETRY": qvalue.QValueKindGeometry, + "GEOGRAPHY": qvalue.QValueKindGeography, + "Array(String)": qvalue.QValueKindArrayString, + "Array(Int32)": qvalue.QValueKindArrayInt32, + "Array(Int64)": qvalue.QValueKindArrayInt64, + "Array(Float64)": qvalue.QValueKindArrayFloat64, +} diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 44a7f4a250..a9c80b8a46 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -7,6 +7,7 @@ import ( "log/slog" connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery" + connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse" conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" conns3 "github.com/PeerDB-io/peer-flow/connectors/s3" @@ -197,6 +198,8 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig()) case *protos.Peer_S3Config: return conns3.NewS3Connector(ctx, config.GetS3Config()) + case *protos.Peer_ClickhouseConfig: + return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig()) default: return nil, ErrUnsupportedFunctionality } @@ -242,6 +245,12 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) { return conns3.NewS3Connector(ctx, s3Config) // case protos.DBType_EVENTHUB: // return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig()) + case protos.DBType_CLICKHOUSE: + clickhouseConfig := peer.GetClickhouseConfig() + if clickhouseConfig == nil { + return nil, fmt.Errorf("missing clickhouse config for %s peer %s", peer.Type.String(), peer.Name) + } + return connclickhouse.NewClickhouseConnector(ctx, clickhouseConfig) default: return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String()) } @@ -254,7 +263,8 @@ func GetQRepConsolidateConnector(ctx context.Context, switch inner.(type) { case *protos.Peer_SnowflakeConfig: return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig()) - + case *protos.Peer_ClickhouseConfig: + return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig()) default: return nil, ErrUnsupportedFunctionality } diff --git a/flow/connectors/s3/qrep.go b/flow/connectors/s3/qrep.go index 4ece0356bc..16a8e222ef 100644 --- a/flow/connectors/s3/qrep.go +++ b/flow/connectors/s3/qrep.go @@ -43,7 +43,7 @@ func getAvroSchema( dstTableName string, schema *model.QRecordSchema, ) (*model.QRecordAvroSchemaDefinition, error) { - avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema) + avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, qvalue.QDWHTypeS3) if err != nil { return nil, fmt.Errorf("failed to define Avro schema: %w", err) } diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index 94d66ef416..8a5753680c 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -142,7 +142,7 @@ func TestWriteRecordsToAvroFileHappyPath(t *testing.T) { // Define sample data records, schema := generateRecords(t, true, 10, false) - avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) + avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, qvalue.QDWHTypeSnowflake) require.NoError(t, err) t.Logf("[test] avroSchema: %v", avroSchema) @@ -170,7 +170,7 @@ func TestWriteRecordsToZstdAvroFileHappyPath(t *testing.T) { // Define sample data records, schema := generateRecords(t, true, 10, false) - avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) + avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, qvalue.QDWHTypeSnowflake) require.NoError(t, err) t.Logf("[test] avroSchema: %v", avroSchema) @@ -198,7 +198,7 @@ func TestWriteRecordsToDeflateAvroFileHappyPath(t *testing.T) { // Define sample data records, schema := generateRecords(t, true, 10, false) - avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) + avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, qvalue.QDWHTypeSnowflake) require.NoError(t, err) t.Logf("[test] avroSchema: %v", avroSchema) @@ -225,7 +225,7 @@ func TestWriteRecordsToAvroFileNonNull(t *testing.T) { records, schema := generateRecords(t, false, 10, false) - avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) + avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, qvalue.QDWHTypeSnowflake) require.NoError(t, err) t.Logf("[test] avroSchema: %v", avroSchema) @@ -253,7 +253,7 @@ func TestWriteRecordsToAvroFileAllNulls(t *testing.T) { // Define sample data records, schema := generateRecords(t, true, 10, true) - avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema) + avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, qvalue.QDWHTypeSnowflake) require.NoError(t, err) t.Logf("[test] avroSchema: %v", avroSchema) diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 47484a06f8..9f81d254de 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -216,7 +216,7 @@ func (s *SnowflakeAvroSyncMethod) getAvroSchema( dstTableName string, schema *model.QRecordSchema, ) (*model.QRecordAvroSchemaDefinition, error) { - avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema) + avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, qvalue.QDWHTypeSnowflake) if err != nil { return nil, fmt.Errorf("failed to define Avro schema: %w", err) } diff --git a/flow/go.mod b/flow/go.mod index aa9da75e69..eb3eccd246 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -9,6 +9,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.2 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 + github.com/ClickHouse/clickhouse-go/v2 v2.17.1 github.com/aws/aws-sdk-go v1.49.20 github.com/cenkalti/backoff/v4 v4.2.1 github.com/cockroachdb/pebble v0.0.0-20231210175914-b4d301aeb46a @@ -42,6 +43,7 @@ require ( ) require ( + github.com/ClickHouse/ch-go v0.58.2 // indirect github.com/DataDog/zstd v1.5.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -51,18 +53,23 @@ require ( github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/getsentry/sentry-go v0.26.0 // indirect + github.com/go-faster/city v1.0.1 // indirect + github.com/go-faster/errors v0.6.1 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gorilla/websocket v1.5.1 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect + github.com/paulmach/orb v0.10.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect + github.com/segmentio/asm v1.2.0 // indirect + github.com/shopspring/decimal v1.3.1 // indirect github.com/sirupsen/logrus v1.9.3 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect diff --git a/flow/go.sum b/flow/go.sum index 9d9f7dd58d..6c865c3566 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -1,7 +1,5 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go v0.111.0 h1:YHLKNupSD1KqjDbQ3+LVdQ81h/UJbJyZG203cEfnQgM= -cloud.google.com/go v0.111.0/go.mod h1:0mibmpKP1TyOOFYQY5izo0LnT+ecvOQ0Sg3OdmMiNRU= cloud.google.com/go v0.112.0 h1:tpFCD7hpHFlQ8yPwT3x+QeXqc2T6+n6T+hmABHfDUSM= cloud.google.com/go v0.112.0/go.mod h1:3jEEVwZ/MHU4djK5t5RHuKOA/GbLddgTdVubX1qnPD4= cloud.google.com/go/bigquery v1.57.1 h1:FiULdbbzUxWD0Y4ZGPSVCDLvqRSyCIO6zKV7E2nf5uA= @@ -49,6 +47,10 @@ github.com/Azure/go-amqp v1.0.3/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51 github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 h1:DzHpqpoJVaCgOUdVHxE8QB52S6NiVdDQvGlny1qvPqA= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/ClickHouse/ch-go v0.58.2 h1:jSm2szHbT9MCAB1rJ3WuCJqmGLi5UTjlNu+f530UTS0= +github.com/ClickHouse/ch-go v0.58.2/go.mod h1:Ap/0bEmiLa14gYjCiRkYGbXvbe8vwdrfTYWhsuQ99aw= +github.com/ClickHouse/clickhouse-go/v2 v2.17.1 h1:ZCmAYWpu75IyEi7+Yrs/uaAjiCGY5wfW5kXo64exkX4= +github.com/ClickHouse/clickhouse-go/v2 v2.17.1/go.mod h1:rkGTvFDTLqLIm0ma+13xmcCfr/08Gvs7KmFt1tgiWHQ= github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= @@ -57,8 +59,6 @@ github.com/alecthomas/assert/v2 v2.4.1 h1:mwPZod/d35nlaCppr6sFP0rbCL05WH9fIo7lvs github.com/alecthomas/assert/v2 v2.4.1/go.mod h1:fw5suVxB+wfYJ3291t0hRTqtGzFYdSwstnRQdaQx2DM= github.com/alecthomas/repr v0.3.0 h1:NeYzUPfjjlqHY4KtzgKJiWd6sVq2eNUPTi34PiFGjY8= github.com/alecthomas/repr v0.3.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= -github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI= -github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= @@ -66,8 +66,6 @@ github.com/apache/arrow/go/v12 v12.0.1 h1:JsR2+hzYYjgSUkBSaahpqCetqZMr76djX80fF/ github.com/apache/arrow/go/v12 v12.0.1/go.mod h1:weuTY7JvTG/HDPtMQxEUp7pU73vkLWMLpY67QwZ/WWw= github.com/apache/thrift v0.19.0 h1:sOqkWPzMj7w6XaYbJQG7m4sGqVolaW/0D28Ln7yPzMk= github.com/apache/thrift v0.19.0/go.mod h1:SUALL216IiaOw2Oy+5Vs9lboJ/t9g40C+G07Dc0QC1I= -github.com/aws/aws-sdk-go v1.49.19 h1:oZryiqeQpeJsIcAmZlp86duMu/s/DJ43qyfwa51qmLg= -github.com/aws/aws-sdk-go v1.49.19/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/aws/aws-sdk-go v1.49.20 h1:VgEUq2/ZbUkLbqPyDcxrirfXB+PgiZUUF5XbsgWe2S0= github.com/aws/aws-sdk-go v1.49.20/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/aws/aws-sdk-go-v2 v1.24.1 h1:xAojnj+ktS95YZlDf0zxWBkbFtymPeDP+rvUQIH3uAU= @@ -165,6 +163,10 @@ github.com/getsentry/sentry-go v0.26.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= +github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= +github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI= +github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -225,6 +227,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= @@ -290,6 +293,7 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= @@ -322,12 +326,16 @@ github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpsp github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= +github.com/paulmach/orb v0.10.0 h1:guVYVqzxHE/CQ1KpfGO077TR0ATHSNjp4s6XGLn3W9s= +github.com/paulmach/orb v0.10.0/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= +github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= @@ -360,6 +368,10 @@ github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTE github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= +github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= +github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= @@ -377,6 +389,7 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -385,12 +398,17 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/twpayne/go-geos v0.15.0 h1:L8RCcbaEDfhRz/HhzOvw8fU2s7SzxBLh1sID125EneY= github.com/twpayne/go-geos v0.15.0/go.mod h1:zmBwZNTaMTB1usptcCl4n7FjIDoBi2IGtm6h6nq9G8c= github.com/urfave/cli/v3 v3.0.0-alpha8 h1:H+qxFPoCkGzdF8KUMs2fEOZl5io/1QySgUiGfar8occ= github.com/urfave/cli/v3 v3.0.0-alpha8/go.mod h1:0kK/RUFHyh+yIKSfWxwheGndfnrvYSmYFVeKCh03ZUc= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e h1:+SOyEddqYF09QP7vr7CgJ1eti3pY9Fn3LHO1M1r/0sI= github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk= github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -401,6 +419,7 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE= @@ -411,8 +430,8 @@ go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= -go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o= -go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= +go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= +go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= go.temporal.io/api v1.26.0 h1:N4V0Daqa0qqK5+9LELSZV7clBYrwB4l33iaFfKgycPk= @@ -432,12 +451,11 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20240110193028-0dcbfd608b1e h1:723BNChdd0c2Wk6WOE320qGBiPtYx0F0Bbm1kriShfE= -golang.org/x/exp v0.0.0-20240110193028-0dcbfd608b1e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu+kQvCqcsoVaQgSV60o= golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -524,8 +542,6 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA= -golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -575,6 +591,7 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index 02a52b26d6..6399e54b5d 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -70,12 +70,13 @@ type QRecordAvroSchemaDefinition struct { func GetAvroSchemaDefinition( dstTableName string, qRecordSchema *QRecordSchema, + targetDWH qvalue.QDWHType, ) (*QRecordAvroSchemaDefinition, error) { avroFields := make([]QRecordAvroField, 0, len(qRecordSchema.Fields)) nullableFields := make(map[string]struct{}) for _, qField := range qRecordSchema.Fields { - avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type) + avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type, targetDWH) if err != nil { return nil, err } diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 9fafb8a18c..4e8f0a60d3 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -31,7 +31,7 @@ type AvroSchemaNumeric struct { // // For example, QValueKindInt64 would return an AvroLogicalSchema of "long". Unsupported QValueKinds // will return an error. -func GetAvroSchemaFromQValueKind(kind QValueKind) (interface{}, error) { +func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType) (interface{}, error) { switch kind { case QValueKindString, QValueKindUUID: return "string", nil @@ -48,6 +48,9 @@ func GetAvroSchemaFromQValueKind(kind QValueKind) (interface{}, error) { case QValueKindBytes, QValueKindBit: return "bytes", nil case QValueKindNumeric: + if targetDWH == QDWHTypeClickhouse { + return "double", nil + } return AvroSchemaNumeric{ Type: "bytes", LogicalType: "decimal", @@ -55,6 +58,9 @@ func GetAvroSchemaFromQValueKind(kind QValueKind) (interface{}, error) { Scale: 9, }, nil case QValueKindTime, QValueKindTimeTZ, QValueKindDate, QValueKindTimestamp, QValueKindTimestampTZ: + if targetDWH == QDWHTypeClickhouse { + return "long", nil + } return "string", nil case QValueKindHStore, QValueKindJSON, QValueKindStruct: return "string", nil @@ -122,6 +128,14 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { return t.(string), nil } } + + if c.TargetDWH == QDWHTypeClickhouse { + if c.Nullable { + return c.processNullableUnion("long", t.(int64)) + } else { + return t.(int64), nil + } + } if c.Nullable { return goavro.Union("long.timestamp-micros", t.(int64)), nil } else { @@ -233,6 +247,18 @@ func (c *QValueAvroConverter) processBytes() (interface{}, error) { return nil, nil } + if c.TargetDWH == QDWHTypeClickhouse { + bigNum, ok := c.Value.Value.(*big.Rat) + if !ok { + return nil, fmt.Errorf("invalid Numeric value: expected float64, got %T", c.Value.Value) + } + num, ok := bigNum.Float64() + if !ok { + return nil, fmt.Errorf("not able to convert bigNum to float64 %+v", bigNum) + } + return goavro.Union("double", num), nil + } + byteData, ok := c.Value.Value.([]byte) if !ok { return nil, fmt.Errorf("invalid Bytes value") diff --git a/flow/model/qvalue/dwh_type.go b/flow/model/qvalue/dwh_type.go index 806c469f77..293e396847 100644 --- a/flow/model/qvalue/dwh_type.go +++ b/flow/model/qvalue/dwh_type.go @@ -3,6 +3,8 @@ package qvalue type QDWHType int const ( - QDWHTypeSnowflake QDWHType = 2 - QDWHTypeBigQuery QDWHType = 3 + QDWHTypeS3 QDWHType = 1 + QDWHTypeSnowflake QDWHType = 2 + QDWHTypeBigQuery QDWHType = 3 + QDWHTypeClickhouse QDWHType = 4 ) diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 7099d418c0..175430f92e 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -78,6 +78,39 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindArrayString: "VARIANT", } +var QValueKindToClickhouseTypeMap = map[QValueKind]string{ + QValueKindBoolean: "Bool", + QValueKindInt16: "Int16", + QValueKindInt32: "Int32", + QValueKindInt64: "Int64", + QValueKindFloat32: "Float32", + QValueKindFloat64: "Float64", + QValueKindNumeric: "Float64", + QValueKindString: "String", + QValueKindJSON: "String", + QValueKindTimestamp: "DateTime64(6)", + QValueKindTimestampTZ: "TIMESTAMP", + QValueKindTime: "TIME", + QValueKindDate: "DATE", + QValueKindBit: "Boolean", + QValueKindBytes: "String", + QValueKindStruct: "String", + QValueKindUUID: "String", + QValueKindTimeTZ: "String", + QValueKindInvalid: "String", + QValueKindHStore: "String", + QValueKindGeography: "GEOGRAPHY", + QValueKindGeometry: "GEOMETRY", + QValueKindPoint: "GEOMETRY", + + // array types will be mapped to VARIANT + QValueKindArrayFloat32: "Array(Float32)", + QValueKindArrayFloat64: "Array(Float64)", + QValueKindArrayInt32: "Array(Int32)", + QValueKindArrayInt64: "Array(Int64)", + QValueKindArrayString: "Array(String)", +} + func (kind QValueKind) ToDWHColumnType(dwhType QDWHType) (string, error) { if dwhType != QDWHTypeSnowflake { return "", fmt.Errorf("unsupported DWH type: %v", dwhType)