From c67e5e1b8f11f8faafce9889f64d3e926230ed0f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Sat, 6 Jan 2024 18:43:24 +0530 Subject: [PATCH] fix-aliasing --- .../bigquery/merge_statement_generator.go | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 357270a0ea..47dd2b3936 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -36,17 +36,16 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { // for each column in the normalized table, generate CAST + JSON_EXTRACT_SCALAR // statement. flattenedProjs := make([]string, 0, utils.TableSchemaColumns(m.normalizedTableSchema)+3) - i := 0 - for colName, colType := range m.normalizedTableSchema.Columns { + + for i, colName := range m.normalizedTableSchema.ColumnNames { + colType := m.normalizedTableSchema.ColumnTypes[i] bqType := qValueKindToBigQueryType(colType) // CAST doesn't work for FLOAT, so rewrite it to FLOAT64. 
if bqType == bigquery.FloatFieldType { bqType = "FLOAT64" } var castStmt string - i += 1 - shortCol := fmt.Sprintf("_c%d", i) - m.shortColumn[colName] = shortCol + shortCol := m.shortColumn[colName] switch qvalue.QValueKind(colType) { case qvalue.QValueKindJSON: // if the type is JSON, then just extract JSON @@ -55,7 +54,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { // expecting data in BASE64 format case qvalue.QValueKindBytes, qvalue.QValueKindBit: castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`", - colName, colName) + colName, shortCol) case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString: castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+ @@ -110,7 +109,13 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string { ) _peerdb_ranked WHERE _peerdb_rank=1 ) SELECT * FROM _dd` - pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.normalizedTableSchema.PrimaryKeyColumns, + + shortPkeys := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns)) + for _, pkeyCol := range m.normalizedTableSchema.PrimaryKeyColumns { + shortPkeys = append(shortPkeys, m.shortColumn[pkeyCol]) + } + + pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(shortPkeys, ", '_peerdb_concat_', ")) return fmt.Sprintf(cte, pkeyColsStr) } @@ -122,9 +127,11 @@ func (m *mergeStmtGenerator) generateMergeStmt() string { backtickColNames := make([]string, 0, columnCount) shortBacktickColNames := make([]string, 0, columnCount) pureColNames := make([]string, 0, columnCount) - for colName := range m.normalizedTableSchema.Columns { + for i, colName := range m.normalizedTableSchema.ColumnNames { + shortCol := fmt.Sprintf("_c%d", i) + m.shortColumn[colName] = shortCol backtickColNames = append(backtickColNames, fmt.Sprintf("`%s`", colName)) - shortBacktickColNames = append(shortBacktickColNames, fmt.Sprintf("`%s`", 
m.shortColumn[colName])) + shortBacktickColNames = append(shortBacktickColNames, fmt.Sprintf("`%s`", shortCol)) pureColNames = append(pureColNames, colName) } csep := strings.Join(backtickColNames, ", ") @@ -147,7 +154,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() string { pkeySelectSQLArray := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns)) for _, pkeyColName := range m.normalizedTableSchema.PrimaryKeyColumns { pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_t.%s=_d.%s", - pkeyColName, pkeyColName)) + pkeyColName, m.shortColumn[pkeyColName])) } // t.<pkey1> = d.<pkey1> AND t.<pkey2> = d.<pkey2> ... pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")