From 58de0973660e39862b1356c47c08f4e3567c3096 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Mon, 19 Feb 2024 22:36:36 +0530 Subject: [PATCH] Clickhouse: type mapping change and cleanup (#1326) - Fix syncing of Timestamp and TimestampTZ - Map Timestamps and Dates to `Nullable(type)` because epoch in most cases not a sane default. - Remove dead code --- flow/connectors/clickhouse/client.go | 35 ------------------ flow/connectors/clickhouse/normalize.go | 31 +++++++++------- flow/connectors/clickhouse/qvalue_convert.go | 38 -------------------- flow/model/qvalue/avro_converter.go | 29 ++++++--------- flow/model/qvalue/kind.go | 6 +--- 5 files changed, 31 insertions(+), 108 deletions(-) delete mode 100644 flow/connectors/clickhouse/client.go diff --git a/flow/connectors/clickhouse/client.go b/flow/connectors/clickhouse/client.go deleted file mode 100644 index 8bed8f48ed..0000000000 --- a/flow/connectors/clickhouse/client.go +++ /dev/null @@ -1,35 +0,0 @@ -package connclickhouse - -import ( - "context" - "fmt" - - "github.com/jmoiron/sqlx" - - peersql "github.com/PeerDB-io/peer-flow/connectors/sql" - "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/logger" - "github.com/PeerDB-io/peer-flow/model/qvalue" -) - -type ClickhouseClient struct { - peersql.GenericSQLQueryExecutor - Config *protos.ClickhouseConfig -} - -func NewClickhouseClient(ctx context.Context, config *protos.ClickhouseConfig) (*ClickhouseClient, error) { - databaseSql, err := connect(ctx, config) - database := sqlx.NewDb(databaseSql, "clickhouse") // Use the appropriate driver name - - if err != nil { - return nil, fmt.Errorf("failed to open connection to Snowflake peer: %w", err) - } - - genericExecutor := *peersql.NewGenericSQLQueryExecutor( - logger.LoggerFromCtx(ctx), database, clickhouseTypeToQValueKindMap, qvalue.QValueKindToSnowflakeTypeMap) - - return &ClickhouseClient{ - GenericSQLQueryExecutor: genericExecutor, - Config: config, - }, nil -} diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index a1e6e43c46..bd6869ec4b 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -3,6 +3,7 @@ package connclickhouse import ( "context" "database/sql" + "errors" "fmt" "strconv" "strings" @@ -71,7 +72,7 @@ func generateCreateTableSQLForNormalizedTable( syncedAtColName string, ) (string, error) { var stmtBuilder strings.Builder - stmtBuilder.WriteString(fmt.Sprintf("CREATE TABLE `%s` (", normalizedTable)) + stmtBuilder.WriteString(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` (", normalizedTable)) for _, column := range tableSchema.Columns { colName := column.Name @@ -81,7 +82,8 @@ func generateCreateTableSQLForNormalizedTable( return "", fmt.Errorf("error while converting column type to clickhouse type: %w", err) } - if colType == qvalue.QValueKindNumeric { + switch colType { + case qvalue.QValueKindNumeric: precision, scale := numeric.ParseNumericTypmod(column.TypeModifier) if column.TypeModifier == -1 || precision > 76 || scale > precision { precision = numeric.PeerDBClickhousePrecision @@ -89,13 +91,14 @@ func generateCreateTableSQLForNormalizedTable( } stmtBuilder.WriteString(fmt.Sprintf("`%s` DECIMAL(%d, %d), ", colName, precision, scale)) - } else { + case qvalue.QValueKindTimestamp, qvalue.QValueKindTimestampTZ, qvalue.QValueKindDate: + // 1st Jan 1970 is not a sane default which clickhouse sets, so we use nullable + stmtBuilder.WriteString(fmt.Sprintf("`%s` Nullable(%s), ", colName, clickhouseType)) + default: stmtBuilder.WriteString(fmt.Sprintf("`%s` %s, ", colName, clickhouseType)) } } - // TODO support soft delete - // synced at column will be added to all normalized tables if syncedAtColName != "" { colName := strings.ToLower(syncedAtColName) @@ -176,17 +179,21 @@ func (c *ClickhouseConnector) NormalizeRecords(ctx context.Context, req *model.N if err != nil { return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err) } - if clickhouseType == "DateTime64(6)" || clickhouseType == "UUID" { - clickhouseType = "String" - } - if clickhouseType == "Date" { + switch clickhouseType { + case "Date": projection.WriteString(fmt.Sprintf( "toDate(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'))) AS `%s`,", cn, cn, )) - } else { + case "DateTime64(6)": + projection.WriteString(fmt.Sprintf( + "parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s')) AS `%s`,", + cn, + cn, + )) + default: projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", cn, clickhouseType, cn)) } } @@ -220,7 +227,7 @@ func (c *ClickhouseConnector) NormalizeRecords(ctx context.Context, req *model.N insertIntoSelectQuery.WriteString(selectQuery.String()) q := insertIntoSelectQuery.String() - c.logger.Info(fmt.Sprintf("[clickhouse] insert into select query %s", q)) + c.logger.Info("[clickhouse] insert into select query " + q) _, err = c.database.ExecContext(ctx, q) if err != nil { @@ -269,7 +276,7 @@ func (c *ClickhouseConnector) getDistinctTableNamesInBatch( } if !tableName.Valid { - return nil, fmt.Errorf("table name is not valid") + return nil, errors.New("table name is not valid") } tableNames = append(tableNames, tableName.String) diff --git a/flow/connectors/clickhouse/qvalue_convert.go b/flow/connectors/clickhouse/qvalue_convert.go index 8ec8a52f0f..97d39221dc 100644 --- a/flow/connectors/clickhouse/qvalue_convert.go +++ b/flow/connectors/clickhouse/qvalue_convert.go @@ -4,44 +4,6 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" ) -// TODO: remove extra types from here -var clickhouseTypeToQValueKindMap = map[string]qvalue.QValueKind{ - "INT": qvalue.QValueKindInt32, - "Int64": qvalue.QValueKindInt64, - "Int16": qvalue.QValueKindInt16, - "Float64": qvalue.QValueKindFloat64, - "DOUBLE": qvalue.QValueKindFloat64, - "REAL": qvalue.QValueKindFloat64, - "VARCHAR": qvalue.QValueKindString, - "CHAR": qvalue.QValueKindString, - "TEXT": qvalue.QValueKindString, - "String": qvalue.QValueKindString, - "FixedString(1)": qvalue.QValueKindQChar, - "Bool": qvalue.QValueKindBoolean, - "DateTime": qvalue.QValueKindTimestamp, - "TIMESTAMP": qvalue.QValueKindTimestamp, - "DateTime64(6)": qvalue.QValueKindTimestamp, - "TIMESTAMP_NTZ": qvalue.QValueKindTimestamp, - "TIMESTAMP_TZ": qvalue.QValueKindTimestampTZ, - "TIME": qvalue.QValueKindTime, - "UUID": qvalue.QValueKindUUID, - "DATE": qvalue.QValueKindDate, - "BLOB": qvalue.QValueKindBytes, - "BYTEA": qvalue.QValueKindBytes, - "BINARY": qvalue.QValueKindBytes, - "FIXED": qvalue.QValueKindNumeric, - "NUMBER": qvalue.QValueKindNumeric, - "DECIMAL": qvalue.QValueKindNumeric, - "NUMERIC": qvalue.QValueKindNumeric, - "VARIANT": qvalue.QValueKindJSON, - "GEOMETRY": qvalue.QValueKindGeometry, - "GEOGRAPHY": qvalue.QValueKindGeography, - "Array(String)": qvalue.QValueKindArrayString, - "Array(Int32)": qvalue.QValueKindArrayInt32, - "Array(Int64)": qvalue.QValueKindArrayInt64, - "Array(Float64)": qvalue.QValueKindArrayFloat64, -} - func qValueKindToClickhouseType(colType qvalue.QValueKind) (string, error) { val, err := colType.ToDWHColumnType(qvalue.QDWHTypeClickhouse) if err != nil { diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 28a4da4e30..017e16962c 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -85,7 +85,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision Precision: avroNumericPrecision, Scale: avroNumericScale, }, nil - case QValueKindTime, QValueKindTimeTZ, QValueKindDate, QValueKindTimestamp, QValueKindTimestampTZ: + case QValueKindTime, QValueKindTimeTZ, QValueKindDate: if targetDWH == QDWHTypeClickhouse { if kind == QValueKindTime { return "string", nil @@ -99,6 +99,14 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision return "long", nil } return "string", nil + case QValueKindTimestamp, QValueKindTimestampTZ: + if targetDWH == QDWHTypeClickhouse { + return AvroSchemaLogical{ + Type: "long", + LogicalType: "timestamp-micros", + }, nil + } + return "string", nil case QValueKindHStore, QValueKindJSON, QValueKindStruct: return "string", nil case QValueKindArrayFloat32: @@ -179,6 +187,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { if err != nil || t == nil { return t, err } + if c.TargetDWH == QDWHTypeSnowflake { if c.Nullable { return c.processNullableUnion("string", t.(string)) @@ -235,14 +244,6 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { } } - if c.TargetDWH == QDWHTypeClickhouse { - if c.Nullable { - return c.processNullableUnion("long", t.(int64)) - } else { - return t.(int64), nil - } - } - if c.Nullable { return goavro.Union("long.timestamp-micros", t.(int64)), nil } @@ -260,14 +261,6 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { } } - if c.TargetDWH == QDWHTypeClickhouse { - if c.Nullable { - return c.processNullableUnion("long", t.(int64)) - } else { - return t.(int64), nil - } - } - if c.Nullable { return goavro.Union("long.timestamp-micros", t.(int64)), nil } @@ -396,10 +389,10 @@ func (c *QValueAvroConverter) processGoTime() (interface{}, error) { if c.TargetDWH == QDWHTypeSnowflake { return t.Format("15:04:05.999999"), nil } - if c.TargetDWH == QDWHTypeClickhouse { return t.Format("15:04:05.999999"), nil } + return t.UnixMicro(), nil } diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 78c9ece45b..9def7821f4 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -107,7 +107,7 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{ QValueKindString: "String", QValueKindJSON: "String", QValueKindTimestamp: "DateTime64(6)", - QValueKindTimestampTZ: "TIMESTAMP", + QValueKindTimestampTZ: "DateTime64(6)", QValueKindTime: "String", QValueKindDate: "Date", QValueKindBit: "Boolean", @@ -117,10 +117,6 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{ QValueKindTimeTZ: "String", QValueKindInvalid: "String", QValueKindHStore: "String", - QValueKindGeography: "GEOGRAPHY", - QValueKindGeometry: "GEOMETRY", - QValueKindPoint: "GEOMETRY", - // array types will be mapped to VARIANT QValueKindArrayFloat32: "Array(Float32)", QValueKindArrayFloat64: "Array(Float64)",