diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index fda539be30..ae72f38817 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -149,16 +149,16 @@ func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsReques ct := column.Type colSelector.WriteString(fmt.Sprintf("%s,", cn)) - - extractionFuction := "JSONExtractRaw" - switch qvalue.QValueKind(ct) { - case qvalue.QValueKindString: - extractionFuction = "JSONExtractString" - case qvalue.QValueKindInt64: - // TODO check if int64 is supported. - extractionFuction = "JSONExtractInt" + colType := qvalue.QValueKind(ct) + clickhouseType, err := qValueKindToClickhouseType(colType) + if err != nil { + return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err) + } + if clickhouseType == "DateTime64(6)" { + clickhouseType = "String" } - projection.WriteString(fmt.Sprintf("%s(_peerdb_data, '%s') AS %s, ", extractionFuction, cn, cn)) + + projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS '%s', ", cn, clickhouseType, cn)) } // add _peerdb_sign as _peerdb_record_type / 2 diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index f76a98e87b..70d9e12b85 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -74,9 +74,6 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision case QValueKindBytes, QValueKindBit: return "bytes", nil case QValueKindNumeric: - if targetDWH == QDWHTypeClickhouse { - return "double", nil - } return AvroSchemaNumeric{ Type: "bytes", LogicalType: "decimal", @@ -85,6 +82,9 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision }, nil case QValueKindTime, QValueKindTimeTZ, QValueKindDate, QValueKindTimestamp, QValueKindTimestampTZ: if targetDWH == QDWHTypeClickhouse { + if kind == QValueKindTime { + return "string", nil + } return "long", nil } return "string", nil @@ -172,9 +172,9 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { if c.TargetDWH == QDWHTypeClickhouse { if c.Nullable { - return c.processNullableUnion("long", t.(int64)) + return c.processNullableUnion("string", t.(string)) } else { - return t.(int64), nil + return t.(string), nil } } if c.Nullable { @@ -372,6 +372,10 @@ func (c *QValueAvroConverter) processGoTime() (interface{}, error) { if c.TargetDWH == QDWHTypeSnowflake { return t.Format("15:04:05.999999"), nil } + + if c.TargetDWH == QDWHTypeClickhouse { + return t.Format("15:04:05.999999"), nil + } return t.UnixMicro(), nil } diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 5c1c9271e8..6f2bc9c976 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -100,13 +100,13 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{ QValueKindInt64: "Int64", QValueKindFloat32: "Float32", QValueKindFloat64: "Float64", - QValueKindNumeric: "Float64", + QValueKindNumeric: "Decimal128(9)", QValueKindString: "String", QValueKindJSON: "String", QValueKindTimestamp: "DateTime64(6)", QValueKindTimestampTZ: "TIMESTAMP", - QValueKindTime: "TIME", - QValueKindDate: "DATE", + QValueKindTime: "String", + QValueKindDate: "Date", QValueKindBit: "Boolean", QValueKindBytes: "String", QValueKindStruct: "String", @@ -124,6 +124,8 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{ QValueKindArrayInt32: "Array(Int32)", QValueKindArrayInt64: "Array(Int64)", QValueKindArrayString: "Array(String)", + QValueKindArrayBoolean: "Array(Bool)", + QValueKindArrayInt16: "Array(Int16)", } func (kind QValueKind) ToDWHColumnType(dwhType QDWHType) (string, error) {