Skip to content

Commit

Permalink
cast to string for comparison for json pkey
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Mar 1, 2024
1 parent ef2cef5 commit 0305953
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,26 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
m.syncBatchID, m.dstTableName)
}

func (m *mergeStmtGenerator) transformedPkeyStrings() []string {
// This function is to handle
func (m *mergeStmtGenerator) transformedPkeyStrings(forPartition bool) []string {
pkeys := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns))
for _, col := range m.normalizedTableSchema.Columns {
if slices.Contains(m.normalizedTableSchema.PrimaryKeyColumns, col.Name) {
pkeyCol := col.Name
switch qvalue.QValueKind(col.Type) {
case qvalue.QValueKindJSON:
pkeys = append(pkeys, fmt.Sprintf("TO_JSON_STRING(%s)", m.shortColumn[pkeyCol]))
if forPartition {
pkeys = append(pkeys, fmt.Sprintf("TO_JSON_STRING(%s)", m.shortColumn[pkeyCol]))
} else {
pkeys = append(pkeys, fmt.Sprintf("TO_JSON_STRING(_t.%s)=TO_JSON_STRING(_d.%s)",
pkeyCol, m.shortColumn[pkeyCol]))
}
default:
pkeys = append(pkeys, m.shortColumn[pkeyCol])
if forPartition {
pkeys = append(pkeys, m.shortColumn[pkeyCol])
} else {
pkeys = append(pkeys, fmt.Sprintf("_t.%s=_d.%s", pkeyCol, m.shortColumn[pkeyCol]))
}
}
}
}
Expand All @@ -128,9 +138,8 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
WHERE _peerdb_rank=1
) SELECT * FROM _dd`

shortPkeys := m.transformedPkeyStrings()
pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(shortPkeys,
", '_peerdb_concat_', "))
shortPkeys := m.transformedPkeyStrings(true)
pkeyColsStr := strings.Join(shortPkeys, ", '_peerdb_concat_' ")
return fmt.Sprintf(cte, pkeyColsStr)
}

Expand Down Expand Up @@ -164,11 +173,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(unchangedToastColumns []string) s
}
updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")

pkeySelectSQLArray := make([]string, 0, len(m.normalizedTableSchema.PrimaryKeyColumns))
for _, pkeyColName := range m.normalizedTableSchema.PrimaryKeyColumns {
pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_t.%s=_d.%s",
pkeyColName, m.shortColumn[pkeyColName]))
}
pkeySelectSQLArray := m.transformedPkeyStrings(false)
// t.<pkey1> = d.<pkey1> AND t.<pkey2> = d.<pkey2> ...
pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")

Expand Down

0 comments on commit 0305953

Please sign in to comment.