diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 820dbb7c75..e3ad6777f0 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -157,7 +157,7 @@ func (c *ClickhouseConnector) NormalizeRecords(ctx context.Context, req *model.N if err != nil { return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err) } - if clickhouseType == "DateTime64(6)" { + if clickhouseType == "DateTime64(6)" || clickhouseType == "Date" || clickhouseType == "TIMESTAMP" { clickhouseType = "String" } @@ -166,7 +166,7 @@ func (c *ClickhouseConnector) NormalizeRecords(ctx context.Context, req *model.N // add _peerdb_sign as _peerdb_record_type / 2 projection.WriteString(fmt.Sprintf("intDiv(_peerdb_record_type, 2) AS `%s`,", signColName)) - colSelector.WriteString(fmt.Sprintf("%s,", signColName)) + colSelector.WriteString(fmt.Sprintf("`%s`,", signColName)) // add _peerdb_timestamp as _peerdb_version projection.WriteString(fmt.Sprintf("_peerdb_timestamp AS `%s`", versionColName))