diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index eb840b86b7..820dbb7c75 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -151,7 +151,7 @@ func (c *ClickhouseConnector) NormalizeRecords(ctx context.Context, req *model.N cn := column.Name ct := column.Type - colSelector.WriteString(fmt.Sprintf("%s,", cn)) + colSelector.WriteString(fmt.Sprintf("`%s`,", cn)) colType := qvalue.QValueKind(ct) clickhouseType, err := qValueKindToClickhouseType(colType) if err != nil { @@ -161,15 +161,15 @@ func (c *ClickhouseConnector) NormalizeRecords(ctx context.Context, req *model.N clickhouseType = "String" } - projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS %s, ", cn, clickhouseType, cn)) + projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", cn, clickhouseType, cn)) } // add _peerdb_sign as _peerdb_record_type / 2 - projection.WriteString(fmt.Sprintf("intDiv(_peerdb_record_type, 2) AS %s, ", signColName)) + projection.WriteString(fmt.Sprintf("intDiv(_peerdb_record_type, 2) AS `%s`,", signColName)) colSelector.WriteString(fmt.Sprintf("%s,", signColName)) // add _peerdb_timestamp as _peerdb_version - projection.WriteString(fmt.Sprintf("_peerdb_timestamp AS %s", versionColName)) + projection.WriteString(fmt.Sprintf("_peerdb_timestamp AS `%s`", versionColName)) colSelector.WriteString(versionColName) colSelector.WriteString(") ")