Skip to content

Commit

Permalink
adds fix for pg, adds comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 21, 2023
1 parent f78bec3 commit eac5b85
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 1 deletion.
4 changes: 4 additions & 0 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
(_peerdb_deduped._peerdb_record_type != 2) AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)

// generates update statements for the case where updates and deletes happen in the same branch
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") {
tmpArray = append(tmpArray[:len(tmpArray)-1],
fmt.Sprintf("`%s` = TRUE", peerdbCols.SoftDeleteColName))
Expand Down
15 changes: 14 additions & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string,
peerdbCols.SyncedAtColName))
}
// set soft-deleted to false, tackles insert after soft-delete
if peerdbCols.SoftDeleteColName != "" {
if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") {
tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = FALSE`,
peerdbCols.SoftDeleteColName))
}
Expand All @@ -771,6 +771,19 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string,
src._peerdb_record_type=1 AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)

// generates update statements for the case where updates and deletes happen in the same branch
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") {
tmpArray = append(tmpArray[:len(tmpArray)-1],
fmt.Sprintf(`"%s" = TRUE`, peerdbCols.SoftDeleteColName))
ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
src._peerdb_record_type = 2 AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)
}
}
return updateStmts
}
Expand Down
4 changes: 4 additions & 0 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,10 @@ func (c *SnowflakeConnector) generateUpdateStatements(
(SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)

// generates update statements for the case where updates and deletes happen in the same branch
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if softDelete && (softDeleteCol != "") {
tmpArray = append(tmpArray[:len(tmpArray)-1], fmt.Sprintf(`"%s" = TRUE`, softDeleteCol))
ssep := strings.Join(tmpArray, ", ")
Expand Down

0 comments on commit eac5b85

Please sign in to comment.