Skip to content

Commit

Permalink
float support, quoting, cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Mar 1, 2024
1 parent 98b9c4f commit 55f8f92
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,26 +101,27 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
m.syncBatchID, m.dstTableName)
}

// This function is to support datatypes like JSON which cannot be compared by BigQuery
// This function is to support datatypes like JSON which either cannot be partitoned by or compared by BigQuery
func (m *mergeStmtGenerator) transformedPkeyStrings(forPartition bool) []string {
pkeys := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns))
transformations := map[qvalue.QValueKind]string{
qvalue.QValueKindJSON: "TO_JSON_STRING(%s)",
qvalue.QValueKindFloat32: "CAST(%s as STRING)",
qvalue.QValueKindFloat64: "CAST(%s as STRING)",
}

for _, col := range m.normalizedTableSchema.Columns {
if slices.Contains(m.normalizedTableSchema.PrimaryKeyColumns, col.Name) {
pkeyCol := col.Name
switch qvalue.QValueKind(col.Type) {
case qvalue.QValueKindJSON:
if forPartition {
pkeys = append(pkeys, fmt.Sprintf("TO_JSON_STRING(%s)", m.shortColumn[pkeyCol]))
} else {
pkeys = append(pkeys, fmt.Sprintf("TO_JSON_STRING(_t.%s)=TO_JSON_STRING(_d.%s)",
pkeyCol, m.shortColumn[pkeyCol]))
}
default:
if forPartition {
pkeys = append(pkeys, m.shortColumn[pkeyCol])
} else {
pkeys = append(pkeys, fmt.Sprintf("_t.%s=_d.%s", pkeyCol, m.shortColumn[pkeyCol]))
}
transformation, exists := transformations[qvalue.QValueKind(col.Type)]
if !exists {
transformation = "%s"
}
if forPartition {
pkeys = append(pkeys, fmt.Sprintf(transformation, m.shortColumn[pkeyCol]))
} else {
pkeys = append(pkeys, fmt.Sprintf(transformation+"="+transformation,
"_t.`"+pkeyCol+"`", "_d."+m.shortColumn[pkeyCol]))
}
}
}
Expand Down

0 comments on commit 55f8f92

Please sign in to comment.