diff --git a/flow/connectors/snowflake/merge_stmt_generator_test.go b/flow/connectors/snowflake/merge_stmt_generator_test.go index e12d55e69..7db2ef654 100644 --- a/flow/connectors/snowflake/merge_stmt_generator_test.go +++ b/flow/connectors/snowflake/merge_stmt_generator_test.go @@ -13,16 +13,19 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { expected := []string{ `WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='' - THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3"`, + THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3", + "_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`, `WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='col2,col3' - THEN UPDATE SET "COL1" = SOURCE."COL1"`, + THEN UPDATE SET "COL1" = SOURCE."COL1", + "_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`, `WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='col2' - THEN UPDATE SET "COL1" = SOURCE."COL1", "COL3" = SOURCE."COL3"`, + THEN UPDATE SET "COL1" = SOURCE."COL1", "COL3" = SOURCE."COL3", + "_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`, `WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='col3' - THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2"`, + THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", + "_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`, } - - result := c.generateUpdateStatement("", allCols, unchangedToastCols) + result := c.generateUpdateStatement("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", allCols, unchangedToastCols) for i := range expected { expected[i] = removeSpacesTabsNewlines(expected[i]) @@ -41,10 +44,10 @@ func TestGenerateUpdateStatement_EmptyColumns(t *testing.T) { expected := []string{ `WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='' - THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3"`, + THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3", + "_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`, } - - result := c.generateUpdateStatement("", allCols, unchangedToastCols) + result := c.generateUpdateStatement("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", allCols, unchangedToastCols) for i := range expected { expected[i] = removeSpacesTabsNewlines(expected[i]) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index f04b9be38..661b92621 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -882,8 +882,8 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ""), ",") - updateStatementsforToastCols := c.generateUpdateStatement( - normalizeReq.SyncedAtColName, columnNames, unchangedToastColumns) + updateStatementsforToastCols := c.generateUpdateStatement(normalizeReq.SyncedAtColName, + normalizeReq.SoftDeleteColName, columnNames, unchangedToastColumns) updateStringToastCols := strings.Join(updateStatementsforToastCols, " ") pkeySelectSQLArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns)) @@ -1013,6 +1013,8 @@ This function generates UPDATE statements for a MERGE operation based on the pro Inputs: 1. allCols: An array of all column names. 2. unchangedToastCols: An array capturing unique sets of unchanged toast column groups. +3. softDeleteCol: just set to false in the case we see an insert after a soft-deleted column +4. syncedAtCol: set to the CURRENT_TIMESTAMP Algorithm: 1. Iterate over each unique set of unchanged toast column groups. @@ -1030,7 +1032,8 @@ and updating the other columns. 7. Return the list of generated update statements. */ func (c *SnowflakeConnector) generateUpdateStatement( - syncedAtCol string, allCols []string, unchangedToastCols []string) []string { + syncedAtCol string, softDeleteCol string, + allCols []string, unchangedToastCols []string) []string { updateStmts := make([]string, 0) for _, cols := range unchangedToastCols { @@ -1046,6 +1049,10 @@ func (c *SnowflakeConnector) generateUpdateStatement( if syncedAtCol != "" { tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = CURRENT_TIMESTAMP`, syncedAtCol)) } + // set soft-deleted to false, tackles insert after soft-delete + if softDeleteCol != "" { + tmpArray = append(tmpArray, fmt.Sprintf(`"%s" = FALSE`, softDeleteCol)) + } ssep := strings.Join(tmpArray, ", ") updateStmt := fmt.Sprintf(`WHEN MATCHED AND