Skip to content

Commit

Permalink
Clickhouse: type mapping change and cleanup (#1326)
Browse files Browse the repository at this point in the history
- Fix syncing of Timestamp and TimestampTZ
- Map Timestamps and Dates to `Nullable(type)` because epoch in most
cases not a sane default.
- Remove dead code
  • Loading branch information
Amogh-Bharadwaj authored Feb 19, 2024
1 parent 457de35 commit 58de097
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 108 deletions.
35 changes: 0 additions & 35 deletions flow/connectors/clickhouse/client.go

This file was deleted.

31 changes: 19 additions & 12 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connclickhouse
import (
"context"
"database/sql"
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -71,7 +72,7 @@ func generateCreateTableSQLForNormalizedTable(
syncedAtColName string,
) (string, error) {
var stmtBuilder strings.Builder
stmtBuilder.WriteString(fmt.Sprintf("CREATE TABLE `%s` (", normalizedTable))
stmtBuilder.WriteString(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` (", normalizedTable))

for _, column := range tableSchema.Columns {
colName := column.Name
Expand All @@ -81,21 +82,23 @@ func generateCreateTableSQLForNormalizedTable(
return "", fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}

if colType == qvalue.QValueKindNumeric {
switch colType {
case qvalue.QValueKindNumeric:
precision, scale := numeric.ParseNumericTypmod(column.TypeModifier)
if column.TypeModifier == -1 || precision > 76 || scale > precision {
precision = numeric.PeerDBClickhousePrecision
scale = numeric.PeerDBClickhouseScale
}
stmtBuilder.WriteString(fmt.Sprintf("`%s` DECIMAL(%d, %d), ",
colName, precision, scale))
} else {
case qvalue.QValueKindTimestamp, qvalue.QValueKindTimestampTZ, qvalue.QValueKindDate:
// 1st Jan 1970 is not a sane default which clickhouse sets, so we use nullable
stmtBuilder.WriteString(fmt.Sprintf("`%s` Nullable(%s), ", colName, clickhouseType))
default:
stmtBuilder.WriteString(fmt.Sprintf("`%s` %s, ", colName, clickhouseType))
}
}

// TODO support soft delete

// synced at column will be added to all normalized tables
if syncedAtColName != "" {
colName := strings.ToLower(syncedAtColName)
Expand Down Expand Up @@ -176,17 +179,21 @@ func (c *ClickhouseConnector) NormalizeRecords(ctx context.Context, req *model.N
if err != nil {
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}
if clickhouseType == "DateTime64(6)" || clickhouseType == "UUID" {
clickhouseType = "String"
}

if clickhouseType == "Date" {
switch clickhouseType {
case "Date":
projection.WriteString(fmt.Sprintf(
"toDate(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'))) AS `%s`,",
cn,
cn,
))
} else {
case "DateTime64(6)":
projection.WriteString(fmt.Sprintf(
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s')) AS `%s`,",
cn,
cn,
))
default:
projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", cn, clickhouseType, cn))
}
}
Expand Down Expand Up @@ -220,7 +227,7 @@ func (c *ClickhouseConnector) NormalizeRecords(ctx context.Context, req *model.N
insertIntoSelectQuery.WriteString(selectQuery.String())

q := insertIntoSelectQuery.String()
c.logger.Info(fmt.Sprintf("[clickhouse] insert into select query %s", q))
c.logger.Info("[clickhouse] insert into select query " + q)

_, err = c.database.ExecContext(ctx, q)
if err != nil {
Expand Down Expand Up @@ -269,7 +276,7 @@ func (c *ClickhouseConnector) getDistinctTableNamesInBatch(
}

if !tableName.Valid {
return nil, fmt.Errorf("table name is not valid")
return nil, errors.New("table name is not valid")
}

tableNames = append(tableNames, tableName.String)
Expand Down
38 changes: 0 additions & 38 deletions flow/connectors/clickhouse/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,6 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

// TODO: remove extra types from here
var clickhouseTypeToQValueKindMap = map[string]qvalue.QValueKind{
"INT": qvalue.QValueKindInt32,
"Int64": qvalue.QValueKindInt64,
"Int16": qvalue.QValueKindInt16,
"Float64": qvalue.QValueKindFloat64,
"DOUBLE": qvalue.QValueKindFloat64,
"REAL": qvalue.QValueKindFloat64,
"VARCHAR": qvalue.QValueKindString,
"CHAR": qvalue.QValueKindString,
"TEXT": qvalue.QValueKindString,
"String": qvalue.QValueKindString,
"FixedString(1)": qvalue.QValueKindQChar,
"Bool": qvalue.QValueKindBoolean,
"DateTime": qvalue.QValueKindTimestamp,
"TIMESTAMP": qvalue.QValueKindTimestamp,
"DateTime64(6)": qvalue.QValueKindTimestamp,
"TIMESTAMP_NTZ": qvalue.QValueKindTimestamp,
"TIMESTAMP_TZ": qvalue.QValueKindTimestampTZ,
"TIME": qvalue.QValueKindTime,
"UUID": qvalue.QValueKindUUID,
"DATE": qvalue.QValueKindDate,
"BLOB": qvalue.QValueKindBytes,
"BYTEA": qvalue.QValueKindBytes,
"BINARY": qvalue.QValueKindBytes,
"FIXED": qvalue.QValueKindNumeric,
"NUMBER": qvalue.QValueKindNumeric,
"DECIMAL": qvalue.QValueKindNumeric,
"NUMERIC": qvalue.QValueKindNumeric,
"VARIANT": qvalue.QValueKindJSON,
"GEOMETRY": qvalue.QValueKindGeometry,
"GEOGRAPHY": qvalue.QValueKindGeography,
"Array(String)": qvalue.QValueKindArrayString,
"Array(Int32)": qvalue.QValueKindArrayInt32,
"Array(Int64)": qvalue.QValueKindArrayInt64,
"Array(Float64)": qvalue.QValueKindArrayFloat64,
}

func qValueKindToClickhouseType(colType qvalue.QValueKind) (string, error) {
val, err := colType.ToDWHColumnType(qvalue.QDWHTypeClickhouse)
if err != nil {
Expand Down
29 changes: 11 additions & 18 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision
Precision: avroNumericPrecision,
Scale: avroNumericScale,
}, nil
case QValueKindTime, QValueKindTimeTZ, QValueKindDate, QValueKindTimestamp, QValueKindTimestampTZ:
case QValueKindTime, QValueKindTimeTZ, QValueKindDate:
if targetDWH == QDWHTypeClickhouse {
if kind == QValueKindTime {
return "string", nil
Expand All @@ -99,6 +99,14 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision
return "long", nil
}
return "string", nil
case QValueKindTimestamp, QValueKindTimestampTZ:
if targetDWH == QDWHTypeClickhouse {
return AvroSchemaLogical{
Type: "long",
LogicalType: "timestamp-micros",
}, nil
}
return "string", nil
case QValueKindHStore, QValueKindJSON, QValueKindStruct:
return "string", nil
case QValueKindArrayFloat32:
Expand Down Expand Up @@ -179,6 +187,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
if err != nil || t == nil {
return t, err
}

if c.TargetDWH == QDWHTypeSnowflake {
if c.Nullable {
return c.processNullableUnion("string", t.(string))
Expand Down Expand Up @@ -235,14 +244,6 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
}
}

if c.TargetDWH == QDWHTypeClickhouse {
if c.Nullable {
return c.processNullableUnion("long", t.(int64))
} else {
return t.(int64), nil
}
}

if c.Nullable {
return goavro.Union("long.timestamp-micros", t.(int64)), nil
}
Expand All @@ -260,14 +261,6 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
}
}

if c.TargetDWH == QDWHTypeClickhouse {
if c.Nullable {
return c.processNullableUnion("long", t.(int64))
} else {
return t.(int64), nil
}
}

if c.Nullable {
return goavro.Union("long.timestamp-micros", t.(int64)), nil
}
Expand Down Expand Up @@ -396,10 +389,10 @@ func (c *QValueAvroConverter) processGoTime() (interface{}, error) {
if c.TargetDWH == QDWHTypeSnowflake {
return t.Format("15:04:05.999999"), nil
}

if c.TargetDWH == QDWHTypeClickhouse {
return t.Format("15:04:05.999999"), nil
}

return t.UnixMicro(), nil
}

Expand Down
6 changes: 1 addition & 5 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{
QValueKindString: "String",
QValueKindJSON: "String",
QValueKindTimestamp: "DateTime64(6)",
QValueKindTimestampTZ: "TIMESTAMP",
QValueKindTimestampTZ: "DateTime64(6)",
QValueKindTime: "String",
QValueKindDate: "Date",
QValueKindBit: "Boolean",
Expand All @@ -117,10 +117,6 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{
QValueKindTimeTZ: "String",
QValueKindInvalid: "String",
QValueKindHStore: "String",
QValueKindGeography: "GEOGRAPHY",
QValueKindGeometry: "GEOMETRY",
QValueKindPoint: "GEOMETRY",

// array types will be mapped to VARIANT
QValueKindArrayFloat32: "Array(Float32)",
QValueKindArrayFloat64: "Array(Float64)",
Expand Down

0 comments on commit 58de097

Please sign in to comment.