Skip to content

Commit

Permalink
fix-aliasing
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 6, 2024
1 parent ec78b18 commit 1b17ff9
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,16 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
// for each column in the normalized table, generate CAST + JSON_EXTRACT_SCALAR
// statement.
flattenedProjs := make([]string, 0, utils.TableSchemaColumns(m.normalizedTableSchema)+3)
i := 0
for colName, colType := range m.normalizedTableSchema.Columns {

for i, colName := range m.normalizedTableSchema.ColumnNames {
colType := m.normalizedTableSchema.ColumnTypes[i]
bqType := qValueKindToBigQueryType(colType)
// CAST doesn't work for FLOAT, so rewrite it to FLOAT64.
if bqType == bigquery.FloatFieldType {
bqType = "FLOAT64"
}
var castStmt string
i += 1
shortCol := fmt.Sprintf("_c%d", i)
m.shortColumn[colName] = shortCol
shortCol := m.shortColumn[colName]
switch qvalue.QValueKind(colType) {
case qvalue.QValueKindJSON:
// if the type is JSON, then just extract JSON
Expand All @@ -55,7 +54,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
// expecting data in BASE64 format
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`",
colName, colName)
colName, shortCol)
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64,
qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString:
castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
Expand Down Expand Up @@ -110,7 +109,13 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
) _peerdb_ranked
WHERE _peerdb_rank=1
) SELECT * FROM _dd`
pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.normalizedTableSchema.PrimaryKeyColumns,

shortPkeys := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns))
for _, pkeyCol := range m.normalizedTableSchema.PrimaryKeyColumns {
shortPkeys = append(shortPkeys, m.shortColumn[pkeyCol])
}

pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(shortPkeys,
", '_peerdb_concat_', "))
return fmt.Sprintf(cte, pkeyColsStr)
}
Expand All @@ -122,9 +127,11 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
backtickColNames := make([]string, 0, columnCount)
shortBacktickColNames := make([]string, 0, columnCount)
pureColNames := make([]string, 0, columnCount)
for colName := range m.normalizedTableSchema.Columns {
for i, colName := range m.normalizedTableSchema.ColumnNames {
shortCol := fmt.Sprintf("_c%d", i)
m.shortColumn[colName] = shortCol
backtickColNames = append(backtickColNames, fmt.Sprintf("`%s`", colName))
shortBacktickColNames = append(shortBacktickColNames, fmt.Sprintf("`%s`", m.shortColumn[colName]))
shortBacktickColNames = append(shortBacktickColNames, fmt.Sprintf("`%s`", shortCol))
pureColNames = append(pureColNames, colName)
}
csep := strings.Join(backtickColNames, ", ")
Expand All @@ -147,7 +154,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() string {
pkeySelectSQLArray := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns))
for _, pkeyColName := range m.normalizedTableSchema.PrimaryKeyColumns {
pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_t.%s=_d.%s",
pkeyColName, pkeyColName))
pkeyColName, m.shortColumn[pkeyColName]))
}
// t.<pkey1> = d.<pkey1> AND t.<pkey2> = d.<pkey2> ...
pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")
Expand Down

0 comments on commit 1b17ff9

Please sign in to comment.