From 96bb20f33ea416a3faa125219c05a5e6f1356e94 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Fri, 15 Dec 2023 16:53:08 +0530 Subject: [PATCH] fixes merge statement casting for JSONB and array types Conflicts: flow/cmd/worker.go flow/connectors/bigquery/bigquery.go flow/connectors/postgres/cdc.go --- flow/cmd/worker.go | 2 +- flow/connectors/bigquery/merge_statement_generator.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index a9e1281619..ffc0d30fde 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -43,7 +43,7 @@ func setupPyroscope(opts *WorkerOptions) { ServerAddress: opts.PyroscopeServer, // you can disable logging by setting this to nil - Logger: log.StandardLogger(), + Logger: nil, // you can provide static tags via a map: Tags: map[string]string{"hostname": os.Getenv("HOSTNAME")}, diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 427fcd7884..b038e6b46f 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -62,16 +62,16 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { switch qvalue.QValueKind(colType) { case qvalue.QValueKindJSON: //if the type is JSON, then just extract JSON - castStmt = fmt.Sprintf("CAST(JSON_EXTRACT(_peerdb_data, '$.%s') AS %s) AS `%s`", + castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`", colName, bqType, colName) // expecting data in BASE64 format case qvalue.QValueKindBytes, qvalue.QValueKindBit: - castStmt = fmt.Sprintf("FROM_BASE64(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s')) AS `%s`", + castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data, '$.%s')) AS `%s`", colName, colName) case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString: castStmt = 
fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+ - "UNNEST(CAST(JSON_EXTRACT_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element) AS `%s`", + "UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element) AS `%s`", bqType, colName, colName) // MAKE_INTERVAL(years INT64, months INT64, days INT64, hours INT64, minutes INT64, seconds INT64) // Expecting interval to be in the format of {"Microseconds":2000000,"Days":0,"Months":0,"Valid":true} @@ -89,7 +89,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { // " AS int64))) AS %s", // colName, colName) default: - castStmt = fmt.Sprintf("CAST(JSON_EXTRACT_SCALAR(_peerdb_data, '$.%s') AS %s) AS `%s`", + castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`", colName, bqType, colName) } flattenedProjs = append(flattenedProjs, castStmt)