Skip to content

Commit

Permalink
[clickhouse] misc type fixes (#2181)
Browse files Browse the repository at this point in the history
1. Bytes were inserted raw during initial load but base64-encoded in CDC;
both paths now use base64
2. Timestamp types were inserted with precision 3 in CDC as opposed to 6
in initial load; both paths now use precision 6
3. Time and TimeTZ are now inserted as `DateTime64(6)`
4. Fix handling of `ArrayDate` `ArrayTimestamp` and `ArrayTimestampTZ`
  • Loading branch information
heavycrystal authored Oct 24, 2024
1 parent 2c9fb2e commit 44f0031
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 59 deletions.
8 changes: 4 additions & 4 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,26 +332,26 @@ func (c *ClickHouseConnector) NormalizeRecords(
switch clickHouseType {
case "Date32", "Nullable(Date32)":
projection.WriteString(fmt.Sprintf(
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'))) AS `%s`,",
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6)) AS `%s`,",
colName,
dstColName,
))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'))) AS `%s`,",
"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6)) AS `%s`,",
colName,
dstColName,
))
}
case "DateTime64(6)", "Nullable(DateTime64(6))":
projection.WriteString(fmt.Sprintf(
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s')) AS `%s`,",
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6) AS `%s`,",
colName,
dstColName,
))
if enablePrimaryUpdate {
projectionUpdate.WriteString(fmt.Sprintf(
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s')) AS `%s`,",
"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6) AS `%s`,",
colName,
dstColName,
))
Expand Down
106 changes: 60 additions & 46 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package qvalue

import (
"encoding/base64"
"errors"
"fmt"
"log/slog"
Expand All @@ -15,7 +16,19 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
)

// AvroSchemaField describes one field of an Avro schema as it is serialized
// to JSON. It is used both for the fields of a record schema and as the item
// schema of a complex array.
type AvroSchemaField struct {
// Name is emitted under the JSON key "name".
Name string `json:"name"`
// Type is emitted under the JSON key "type". It is untyped; presumably it
// carries either a primitive type name (e.g. "int") or a nested schema
// value — confirm against callers.
Type interface{} `json:"type"`
// LogicalType is emitted under "logicalType" and omitted when empty.
LogicalType string `json:"logicalType,omitempty"`
}

// AvroSchemaLogical is the JSON form of an Avro type annotated with a logical
// type, for example {"type": "int", "logicalType": "date"}.
type AvroSchemaLogical struct {
// Type is the underlying Avro type name, emitted under "type".
Type string `json:"type"`
// LogicalType is emitted under "logicalType" and omitted when empty.
LogicalType string `json:"logicalType,omitempty"`
}

// https://avro.apache.org/docs/1.11.0/spec.html
// please make this generic at some point
type AvroSchemaArray struct {
Type string `json:"type"`
Items string `json:"items"`
Expand All @@ -39,17 +52,6 @@ type AvroSchemaRecord struct {
Fields []AvroSchemaField `json:"fields"`
}

// AvroSchemaLogical is the JSON form of an Avro type annotated with a logical
// type, for example {"type": "int", "logicalType": "date"}.
type AvroSchemaLogical struct {
// Type is the underlying Avro type name, emitted under "type".
Type string `json:"type"`
// LogicalType is emitted under "logicalType" and omitted when empty.
LogicalType string `json:"logicalType,omitempty"`
}

// AvroSchemaField describes one field of an Avro schema as it is serialized
// to JSON. It is used both for the fields of a record schema and as the item
// schema of a complex array.
type AvroSchemaField struct {
// Name is emitted under the JSON key "name".
Name string `json:"name"`
// Type is emitted under the JSON key "type". It is untyped; presumably it
// carries either a primitive type name (e.g. "int") or a nested schema
// value — confirm against callers.
Type interface{} `json:"type"`
// LogicalType is emitted under "logicalType" and omitted when empty.
LogicalType string `json:"logicalType,omitempty"`
}

func TruncateOrLogNumeric(num decimal.Decimal, precision int16, scale int16, targetDB protos.DBType) (decimal.Decimal, error) {
if targetDB == protos.DBType_SNOWFLAKE || targetDB == protos.DBType_BIGQUERY {
bidigi := datatypes.CountDigits(num.BigInt())
Expand Down Expand Up @@ -96,6 +98,9 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci
case QValueKindBoolean:
return "boolean", nil
case QValueKindBytes:
if targetDWH == protos.DBType_CLICKHOUSE {
return "string", nil
}
return "bytes", nil
case QValueKindNumeric:
avroNumericPrecision, avroNumericScale := DetermineNumericSettingForDWH(precision, scale, targetDWH)
Expand All @@ -105,18 +110,20 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci
Precision: avroNumericPrecision,
Scale: avroNumericScale,
}, nil
case QValueKindTime, QValueKindTimeTZ, QValueKindDate:
case QValueKindDate:
if targetDWH == protos.DBType_CLICKHOUSE {
if kind == QValueKindTime {
return "string", nil
}
if kind == QValueKindDate {
return AvroSchemaLogical{
Type: "int",
LogicalType: "date",
}, nil
}
return "long", nil
return AvroSchemaLogical{
Type: "int",
LogicalType: "date",
}, nil
}
return "string", nil
case QValueKindTime, QValueKindTimeTZ:
if targetDWH == protos.DBType_CLICKHOUSE {
return AvroSchemaLogical{
Type: "long",
LogicalType: "time-micros",
}, nil
}
return "string", nil
case QValueKindTimestamp, QValueKindTimestampTZ:
Expand Down Expand Up @@ -155,11 +162,29 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci
Items: "boolean",
}, nil
case QValueKindArrayDate:
if targetDWH == protos.DBType_CLICKHOUSE {
return AvroSchemaComplexArray{
Type: "array",
Items: AvroSchemaField{
Type: "int",
LogicalType: "date",
},
}, nil
}
return AvroSchemaArray{
Type: "array",
Items: "string",
}, nil
case QValueKindArrayTimestamp, QValueKindArrayTimestampTZ:
if targetDWH == protos.DBType_CLICKHOUSE {
return AvroSchemaComplexArray{
Type: "array",
Items: AvroSchemaField{
Type: "long",
LogicalType: "timestamp-micros",
},
}, nil
}
return AvroSchemaArray{
Type: "array",
Items: "string",
Expand Down Expand Up @@ -203,26 +228,18 @@ func QValueToAvro(value QValue, field *QField, targetDWH protos.DBType, logger l
if t == nil {
return nil, nil
}

if c.TargetDWH == protos.DBType_SNOWFLAKE {
if c.Nullable {
return c.processNullableUnion("string", t.(string))
} else {
return t.(string), nil
}
}

if c.TargetDWH == protos.DBType_CLICKHOUSE {
} else {
if c.Nullable {
return c.processNullableUnion("string", t.(string))
} else {
return t.(string), nil
return goavro.Union("long.time-micros", t.(int64)), nil
}
return t.(int64), nil
}
if c.Nullable {
return goavro.Union("long.time-micros", t.(int64)), nil
}
return t.(int64), nil
case QValueTimeTZ:
t := c.processGoTimeTZ(v.Val)
if t == nil {
Expand All @@ -234,19 +251,12 @@ func QValueToAvro(value QValue, field *QField, targetDWH protos.DBType, logger l
} else {
return t.(string), nil
}
}

if c.TargetDWH == protos.DBType_CLICKHOUSE {
} else {
if c.Nullable {
return c.processNullableUnion("long", t.(int64))
} else {
return t.(int64), nil
return goavro.Union("long.time-micros", t.(int64)), nil
}
return t.(int64), nil
}
if c.Nullable {
return goavro.Union("long.time-micros", t.(int64)), nil
}
return t.(int64), nil
case QValueTimestamp:
t := c.processGoTimestamp(v.Val)
if t == nil {
Expand Down Expand Up @@ -374,9 +384,6 @@ func (c *QValueAvroConverter) processGoTime(t time.Time) interface{} {
if c.TargetDWH == protos.DBType_SNOWFLAKE {
return t.Format("15:04:05.999999")
}
if c.TargetDWH == protos.DBType_CLICKHOUSE {
return t.Format("15:04:05.999999")
}

return t.UnixMicro()
}
Expand Down Expand Up @@ -455,6 +462,13 @@ func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} {
}

func (c *QValueAvroConverter) processBytes(byteData []byte) interface{} {
if c.TargetDWH == protos.DBType_CLICKHOUSE {
encoded := base64.StdEncoding.EncodeToString(byteData)
if c.Nullable {
return goavro.Union("string", encoded)
}
return encoded
}
if c.Nullable {
return goavro.Union("bytes", byteData)
}
Expand Down
21 changes: 12 additions & 9 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,26 @@ var QValueKindToClickHouseTypeMap = map[QValueKind]string{
QValueKindJSON: "String",
QValueKindTimestamp: "DateTime64(6)",
QValueKindTimestampTZ: "DateTime64(6)",
QValueKindTime: "String",
QValueKindTime: "DateTime64(6)",
QValueKindTimeTZ: "DateTime64(6)",
QValueKindDate: "Date32",
QValueKindBytes: "String",
QValueKindStruct: "String",
QValueKindUUID: "UUID",
QValueKindTimeTZ: "String",
QValueKindInvalid: "String",
QValueKindHStore: "String",

// array types are mapped to the corresponding ClickHouse Array(T) types
QValueKindArrayFloat32: "Array(Float32)",
QValueKindArrayFloat64: "Array(Float64)",
QValueKindArrayInt32: "Array(Int32)",
QValueKindArrayInt64: "Array(Int64)",
QValueKindArrayString: "Array(String)",
QValueKindArrayBoolean: "Array(Bool)",
QValueKindArrayInt16: "Array(Int16)",
QValueKindArrayFloat32: "Array(Float32)",
QValueKindArrayFloat64: "Array(Float64)",
QValueKindArrayInt32: "Array(Int32)",
QValueKindArrayInt64: "Array(Int64)",
QValueKindArrayString: "Array(String)",
QValueKindArrayBoolean: "Array(Bool)",
QValueKindArrayInt16: "Array(Int16)",
QValueKindArrayDate: "Array(Date)",
QValueKindArrayTimestamp: "Array(DateTime64(6))",
QValueKindArrayTimestampTZ: "Array(DateTime64(6))",
}

func (kind QValueKind) ToDWHColumnType(dwhType protos.DBType) (string, error) {
Expand Down

0 comments on commit 44f0031

Please sign in to comment.