Skip to content

Commit

Permalink
ClickHouse CDC data types (#1210)
Browse files: browse the repository at this point in the history
  • Loading branch information
pankaj-peerdb authored Feb 7, 2024
1 parent eb7dc78 commit dc00c1a
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 17 deletions.
18 changes: 9 additions & 9 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,16 @@ func (c *ClickhouseConnector) NormalizeRecords(req *model.NormalizeRecordsReques
ct := column.Type

colSelector.WriteString(fmt.Sprintf("%s,", cn))

extractionFuction := "JSONExtractRaw"
switch qvalue.QValueKind(ct) {
case qvalue.QValueKindString:
extractionFuction = "JSONExtractString"
case qvalue.QValueKindInt64:
// TODO check if int64 is supported.
extractionFuction = "JSONExtractInt"
colType := qvalue.QValueKind(ct)
clickhouseType, err := qValueKindToClickhouseType(colType)
if err != nil {
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}
if clickhouseType == "DateTime64(6)" {
clickhouseType = "String"
}
projection.WriteString(fmt.Sprintf("%s(_peerdb_data, '%s') AS %s, ", extractionFuction, cn, cn))

projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS '%s', ", cn, clickhouseType, cn))
}

// add _peerdb_sign as _peerdb_record_type / 2
Expand Down
14 changes: 9 additions & 5 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision
case QValueKindBytes, QValueKindBit:
return "bytes", nil
case QValueKindNumeric:
if targetDWH == QDWHTypeClickhouse {
return "double", nil
}
return AvroSchemaNumeric{
Type: "bytes",
LogicalType: "decimal",
Expand All @@ -85,6 +82,9 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision
}, nil
case QValueKindTime, QValueKindTimeTZ, QValueKindDate, QValueKindTimestamp, QValueKindTimestampTZ:
if targetDWH == QDWHTypeClickhouse {
if kind == QValueKindTime {
return "string", nil
}
return "long", nil
}
return "string", nil
Expand Down Expand Up @@ -172,9 +172,9 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {

if c.TargetDWH == QDWHTypeClickhouse {
if c.Nullable {
return c.processNullableUnion("long", t.(int64))
return c.processNullableUnion("string", t.(string))
} else {
return t.(int64), nil
return t.(string), nil
}
}
if c.Nullable {
Expand Down Expand Up @@ -372,6 +372,10 @@ func (c *QValueAvroConverter) processGoTime() (interface{}, error) {
if c.TargetDWH == QDWHTypeSnowflake {
return t.Format("15:04:05.999999"), nil
}

if c.TargetDWH == QDWHTypeClickhouse {
return t.Format("15:04:05.999999"), nil
}
return t.UnixMicro(), nil
}

Expand Down
8 changes: 5 additions & 3 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{
QValueKindInt64: "Int64",
QValueKindFloat32: "Float32",
QValueKindFloat64: "Float64",
QValueKindNumeric: "Float64",
QValueKindNumeric: "Decimal128(9)",
QValueKindString: "String",
QValueKindJSON: "String",
QValueKindTimestamp: "DateTime64(6)",
QValueKindTimestampTZ: "TIMESTAMP",
QValueKindTime: "TIME",
QValueKindDate: "DATE",
QValueKindTime: "String",
QValueKindDate: "Date",
QValueKindBit: "Boolean",
QValueKindBytes: "String",
QValueKindStruct: "String",
Expand All @@ -124,6 +124,8 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{
QValueKindArrayInt32: "Array(Int32)",
QValueKindArrayInt64: "Array(Int64)",
QValueKindArrayString: "Array(String)",
QValueKindArrayBoolean: "Array(Bool)",
QValueKindArrayInt16: "Array(Int16)",
}

func (kind QValueKind) ToDWHColumnType(dwhType QDWHType) (string, error) {
Expand Down

0 comments on commit dc00c1a

Please sign in to comment.