diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go index cca68cdda7..a6eb395026 100644 --- a/flow/connectors/bigquery/merge_stmt_generator.go +++ b/flow/connectors/bigquery/merge_stmt_generator.go @@ -101,16 +101,26 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string { m.syncBatchID, m.dstTableName) } -func (m *mergeStmtGenerator) transformedPkeyStrings() []string { +// transformedPkeyStrings returns one SQL fragment per primary key column: when forPartition is true, the short column expression itself; otherwise an equality predicate between the _t column and the _d short column. JSON-typed keys are wrapped in TO_JSON_STRING in both forms. +func (m *mergeStmtGenerator) transformedPkeyStrings(forPartition bool) []string { pkeys := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns)) for _, col := range m.normalizedTableSchema.Columns { if slices.Contains(m.normalizedTableSchema.PrimaryKeyColumns, col.Name) { pkeyCol := col.Name switch qvalue.QValueKind(col.Type) { case qvalue.QValueKindJSON: - pkeys = append(pkeys, fmt.Sprintf("TO_JSON_STRING(%s)", m.shortColumn[pkeyCol])) + if forPartition { + pkeys = append(pkeys, fmt.Sprintf("TO_JSON_STRING(%s)", m.shortColumn[pkeyCol])) + } else { + pkeys = append(pkeys, fmt.Sprintf("TO_JSON_STRING(_t.%s)=TO_JSON_STRING(_d.%s)", + pkeyCol, m.shortColumn[pkeyCol])) + } default: - pkeys = append(pkeys, m.shortColumn[pkeyCol]) + if forPartition { + pkeys = append(pkeys, m.shortColumn[pkeyCol]) + } else { + pkeys = append(pkeys, fmt.Sprintf("_t.%s=_d.%s", pkeyCol, m.shortColumn[pkeyCol])) + } } } } @@ -128,9 +138,8 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string { WHERE _peerdb_rank=1 ) SELECT * FROM _dd` - shortPkeys := m.transformedPkeyStrings() - pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(shortPkeys, - ", '_peerdb_concat_', ")) + shortPkeys := m.transformedPkeyStrings(true) + pkeyColsStr := strings.Join(shortPkeys, ", '_peerdb_concat_' ") return fmt.Sprintf(cte, pkeyColsStr) } @@ -164,11 +173,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) s } updateStringToastCols := strings.Join(updateStatementsforToastCols, " ") - 
pkeySelectSQLArray := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns)) - for _, pkeyColName := range m.normalizedTableSchema.PrimaryKeyColumns { - pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_t.%s=_d.%s", - pkeyColName, m.shortColumn[pkeyColName])) - } + pkeySelectSQLArray := m.transformedPkeyStrings(false) // t.<pkey1> = d.<pkey1> AND t.<pkey2> = d.<pkey2> ... pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")