Skip to content

Commit

Permalink
sets soft-delete back to false when inserting
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 24, 2023
1 parent 963b6f1 commit f4e9e10
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
21 changes: 12 additions & 9 deletions flow/connectors/snowflake/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {

expected := []string{
`WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS=''
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3"`,
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3",
"_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`,
`WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='col2,col3'
THEN UPDATE SET "COL1" = SOURCE."COL1"`,
THEN UPDATE SET "COL1" = SOURCE."COL1",
"_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`,
`WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='col2'
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL3" = SOURCE."COL3"`,
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL3" = SOURCE."COL3",
"_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`,
`WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='col3'
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2"`,
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2",
"_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`,
}

result := c.generateUpdateStatement("", allCols, unchangedToastCols)
result := c.generateUpdateStatement("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand All @@ -41,10 +44,10 @@ func TestGenerateUpdateStatement_EmptyColumns(t *testing.T) {

expected := []string{
`WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS=''
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3"`,
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3",
"_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`,
}

result := c.generateUpdateStatement("", allCols, unchangedToastCols)
result := c.generateUpdateStatement("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand Down
13 changes: 10 additions & 3 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,8 +882,8 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(

insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ""), ",")

updateStatementsforToastCols := c.generateUpdateStatement(
normalizeReq.SyncedAtColName, columnNames, unchangedToastColumns)
updateStatementsforToastCols := c.generateUpdateStatement(normalizeReq.SyncedAtColName,
normalizeReq.SoftDeleteColName, columnNames, unchangedToastColumns)
updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")

pkeySelectSQLArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns))
Expand Down Expand Up @@ -1013,6 +1013,8 @@ This function generates UPDATE statements for a MERGE operation based on the pro
Inputs:
1. allCols: An array of all column names.
2. unchangedToastCols: An array capturing unique sets of unchanged toast column groups.
3. softDeleteCol: just set to false in the case we see an insert after a soft-deleted column
4. syncedAtCol: set to the CURRENT_TIMESTAMP
Algorithm:
1. Iterate over each unique set of unchanged toast column groups.
Expand All @@ -1030,7 +1032,8 @@ and updating the other columns.
7. Return the list of generated update statements.
*/
func (c *SnowflakeConnector) generateUpdateStatement(
syncedAtCol string, allCols []string, unchangedToastCols []string) []string {
syncedAtCol string, softDeleteCol string,
allCols []string, unchangedToastCols []string) []string {
updateStmts := make([]string, 0)

for _, cols := range unchangedToastCols {
Expand All @@ -1046,6 +1049,10 @@ func (c *SnowflakeConnector) generateUpdateStatement(
if syncedAtCol != "" {
tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = CURRENT_TIMESTAMP`, syncedAtCol))
}
// set soft-deleted to false, tackles insert after soft-delete
if softDeleteCol != "" {
tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = FALSE`, softDeleteCol))
}

ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
Expand Down

0 comments on commit f4e9e10

Please sign in to comment.