From 69b5d339aa15f933474b60f9efe92b4b2d4fbfa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 12 Feb 2024 21:34:05 +0000 Subject: [PATCH] clickhouse: specialize Date json extract (#1260) --- flow/connectors/clickhouse/normalize.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 820dbb7c75..7ac8e21445 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -161,12 +161,16 @@ func (c *ClickhouseConnector) NormalizeRecords(ctx context.Context, req *model.N clickhouseType = "String" } - projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", cn, clickhouseType, cn)) + if clickhouseType == "Date" { + projection.WriteString(fmt.Sprintf("toDate(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'))) AS `%s`,", cn, cn)) + } else { + projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", cn, clickhouseType, cn)) + } } // add _peerdb_sign as _peerdb_record_type / 2 projection.WriteString(fmt.Sprintf("intDiv(_peerdb_record_type, 2) AS `%s`,", signColName)) - colSelector.WriteString(fmt.Sprintf("%s,", signColName)) + colSelector.WriteString(fmt.Sprintf("`%s`,", signColName)) // add _peerdb_timestamp as _peerdb_version projection.WriteString(fmt.Sprintf("_peerdb_timestamp AS `%s`", versionColName))