diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go index 213c137649..c27fc974c8 100644 --- a/flow/connectors/bigquery/merge_stmt_generator.go +++ b/flow/connectors/bigquery/merge_stmt_generator.go @@ -101,26 +101,27 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { m.syncBatchID, m.dstTableName) } -// This function is to support datatypes like JSON which cannot be compared by BigQuery +// This function is to support datatypes like JSON which either cannot be partitoned by or compared by BigQuery func (m *mergeStmtGenerator) transformedPkeyStrings(forPartition bool) []string { pkeys := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns)) + transformations := map[qvalue.QValueKind]string{ + qvalue.QValueKindJSON: "TO_JSON_STRING(%s)", + qvalue.QValueKindFloat32: "CAST(%s as STRING)", + qvalue.QValueKindFloat64: "CAST(%s as STRING)", + } + for _, col := range m.normalizedTableSchema.Columns { if slices.Contains(m.normalizedTableSchema.PrimaryKeyColumns, col.Name) { pkeyCol := col.Name - switch qvalue.QValueKind(col.Type) { - case qvalue.QValueKindJSON: - if forPartition { - pkeys = append(pkeys, fmt.Sprintf("TO_JSON_STRING(%s)", m.shortColumn[pkeyCol])) - } else { - pkeys = append(pkeys, fmt.Sprintf("TO_JSON_STRING(_t.%s)=TO_JSON_STRING(_d.%s)", - pkeyCol, m.shortColumn[pkeyCol])) - } - default: - if forPartition { - pkeys = append(pkeys, m.shortColumn[pkeyCol]) - } else { - pkeys = append(pkeys, fmt.Sprintf("_t.%s=_d.%s", pkeyCol, m.shortColumn[pkeyCol])) - } + transformation, exists := transformations[qvalue.QValueKind(col.Type)] + if !exists { + transformation = "%s" + } + if forPartition { + pkeys = append(pkeys, fmt.Sprintf(transformation, m.shortColumn[pkeyCol])) + } else { + pkeys = append(pkeys, fmt.Sprintf(transformation+"="+transformation, + "_t.`"+pkeyCol+"`", "_d."+m.shortColumn[pkeyCol])) } } }