diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 6e0bfb9710..94009cd37a 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -332,26 +332,26 @@ func (c *ClickHouseConnector) NormalizeRecords( switch clickHouseType { case "Date32", "Nullable(Date32)": projection.WriteString(fmt.Sprintf( - "toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'))) AS `%s`,", + "toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6)) AS `%s`,", colName, dstColName, )) if enablePrimaryUpdate { projectionUpdate.WriteString(fmt.Sprintf( - "toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'))) AS `%s`,", + "toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6)) AS `%s`,", colName, dstColName, )) } case "DateTime64(6)", "Nullable(DateTime64(6))": projection.WriteString(fmt.Sprintf( - "parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s')) AS `%s`,", + "parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6) AS `%s`,", colName, dstColName, )) if enablePrimaryUpdate { projectionUpdate.WriteString(fmt.Sprintf( - "parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s')) AS `%s`,", + "parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6) AS `%s`,", colName, dstColName, )) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 648a6aa7ac..c2382e5b0c 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -1,6 +1,7 @@ package qvalue import ( + "encoding/base64" "errors" "fmt" "log/slog" @@ -15,7 +16,19 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" ) +type AvroSchemaField struct { + Name string `json:"name"` + Type interface{} `json:"type"` + LogicalType string `json:"logicalType,omitempty"` +} + +type AvroSchemaLogical struct { + Type string `json:"type"` + LogicalType string `json:"logicalType,omitempty"` +} + // https://avro.apache.org/docs/1.11.0/spec.html +// please make this generic at some point type AvroSchemaArray struct { Type string `json:"type"` Items string `json:"items"` @@ -39,17 +52,6 @@ type AvroSchemaRecord struct { Fields []AvroSchemaField `json:"fields"` } -type AvroSchemaLogical struct { - Type string `json:"type"` - LogicalType string `json:"logicalType,omitempty"` -} - -type AvroSchemaField struct { - Name string `json:"name"` - Type interface{} `json:"type"` - LogicalType string `json:"logicalType,omitempty"` -} - func TruncateOrLogNumeric(num decimal.Decimal, precision int16, scale int16, targetDB protos.DBType) (decimal.Decimal, error) { if targetDB == protos.DBType_SNOWFLAKE || targetDB == protos.DBType_BIGQUERY { bidigi := datatypes.CountDigits(num.BigInt()) @@ -96,6 +98,9 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci case QValueKindBoolean: return "boolean", nil case QValueKindBytes: + if targetDWH == protos.DBType_CLICKHOUSE { + return "string", nil + } return "bytes", nil case QValueKindNumeric: avroNumericPrecision, avroNumericScale := DetermineNumericSettingForDWH(precision, scale, targetDWH) @@ -105,18 +110,20 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci Precision: avroNumericPrecision, Scale: avroNumericScale, }, nil - case QValueKindTime, QValueKindTimeTZ, QValueKindDate: + case QValueKindDate: if targetDWH == protos.DBType_CLICKHOUSE { - if kind == QValueKindTime { - return "string", nil - } - if kind == QValueKindDate { - return AvroSchemaLogical{ - Type: "int", - LogicalType: "date", - }, nil - } - return "long", nil + return AvroSchemaLogical{ + Type: "int", + LogicalType: "date", + }, nil + } + return "string", nil + case QValueKindTime, QValueKindTimeTZ: + if targetDWH == protos.DBType_CLICKHOUSE { + return AvroSchemaLogical{ + Type: "long", + LogicalType: "time-micros", + }, nil } return "string", nil case QValueKindTimestamp, QValueKindTimestampTZ: @@ -155,11 +162,29 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci Items: "boolean", }, nil case QValueKindArrayDate: + if targetDWH == protos.DBType_CLICKHOUSE { + return AvroSchemaComplexArray{ + Type: "array", + Items: AvroSchemaField{ + Type: "int", + LogicalType: "date", + }, + }, nil + } return AvroSchemaArray{ Type: "array", Items: "string", }, nil case QValueKindArrayTimestamp, QValueKindArrayTimestampTZ: + if targetDWH == protos.DBType_CLICKHOUSE { + return AvroSchemaComplexArray{ + Type: "array", + Items: AvroSchemaField{ + Type: "long", + LogicalType: "timestamp-micros", + }, + }, nil + } return AvroSchemaArray{ Type: "array", Items: "string", @@ -203,26 +228,18 @@ func QValueToAvro(value QValue, field *QField, targetDWH protos.DBType, logger l if t == nil { return nil, nil } - if c.TargetDWH == protos.DBType_SNOWFLAKE { if c.Nullable { return c.processNullableUnion("string", t.(string)) } else { return t.(string), nil } - } - - if c.TargetDWH == protos.DBType_CLICKHOUSE { + } else { if c.Nullable { - return c.processNullableUnion("string", t.(string)) - } else { - return t.(string), nil + return goavro.Union("long.time-micros", t.(int64)), nil } + return t.(int64), nil } - if c.Nullable { - return goavro.Union("long.time-micros", t.(int64)), nil - } - return t.(int64), nil case QValueTimeTZ: t := c.processGoTimeTZ(v.Val) if t == nil { @@ -234,19 +251,12 @@ func QValueToAvro(value QValue, field *QField, targetDWH protos.DBType, logger l } else { return t.(string), nil } - } - - if c.TargetDWH == protos.DBType_CLICKHOUSE { + } else { if c.Nullable { - return c.processNullableUnion("long", t.(int64)) - } else { - return t.(int64), nil + return goavro.Union("long.time-micros", t.(int64)), nil } + return t.(int64), nil } - if c.Nullable { - return goavro.Union("long.time-micros", t.(int64)), nil - } - return t.(int64), nil case QValueTimestamp: t := c.processGoTimestamp(v.Val) if t == nil { @@ -374,9 +384,6 @@ func (c *QValueAvroConverter) processGoTime(t time.Time) interface{} { if c.TargetDWH == protos.DBType_SNOWFLAKE { return t.Format("15:04:05.999999") } - if c.TargetDWH == protos.DBType_CLICKHOUSE { - return t.Format("15:04:05.999999") - } return t.UnixMicro() } @@ -455,6 +462,13 @@ func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} { } func (c *QValueAvroConverter) processBytes(byteData []byte) interface{} { + if c.TargetDWH == protos.DBType_CLICKHOUSE { + encoded := base64.StdEncoding.EncodeToString(byteData) + if c.Nullable { + return goavro.Union("string", encoded) + } + return encoded + } if c.Nullable { return goavro.Union("bytes", byteData) } diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 669105a13e..79e8f89e40 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -109,23 +109,26 @@ var QValueKindToClickHouseTypeMap = map[QValueKind]string{ QValueKindJSON: "String", QValueKindTimestamp: "DateTime64(6)", QValueKindTimestampTZ: "DateTime64(6)", - QValueKindTime: "String", + QValueKindTime: "DateTime64(6)", + QValueKindTimeTZ: "DateTime64(6)", QValueKindDate: "Date32", QValueKindBytes: "String", QValueKindStruct: "String", QValueKindUUID: "UUID", - QValueKindTimeTZ: "String", QValueKindInvalid: "String", QValueKindHStore: "String", // array types will be mapped to VARIANT - QValueKindArrayFloat32: "Array(Float32)", - QValueKindArrayFloat64: "Array(Float64)", - QValueKindArrayInt32: "Array(Int32)", - QValueKindArrayInt64: "Array(Int64)", - QValueKindArrayString: "Array(String)", - QValueKindArrayBoolean: "Array(Bool)", - QValueKindArrayInt16: "Array(Int16)", + QValueKindArrayFloat32: "Array(Float32)", + QValueKindArrayFloat64: "Array(Float64)", + QValueKindArrayInt32: "Array(Int32)", + QValueKindArrayInt64: "Array(Int64)", + QValueKindArrayString: "Array(String)", + QValueKindArrayBoolean: "Array(Bool)", + QValueKindArrayInt16: "Array(Int16)", + QValueKindArrayDate: "Array(Date)", + QValueKindArrayTimestamp: "Array(DateTime64(6))", + QValueKindArrayTimestampTZ: "Array(DateTime64(6))", } func (kind QValueKind) ToDWHColumnType(dwhType protos.DBType) (string, error) {