Skip to content

Commit

Permalink
fixing delete after update in same batch
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 27, 2023
1 parent ef50c46 commit 3b61783
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 5 deletions.
4 changes: 4 additions & 0 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,10 @@ func (p *PostgresCDCSource) consumeStream(
latestRecord := localRecords[tablePKeyLastSeen[tablePkeyVal]]
deleteRecord := rec.(*model.DeleteRecord)
deleteRecord.Items = latestRecord.GetItems()
updateRecord, ok := latestRecord.(*model.UpdateRecord)
if ok {
deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns
}
}
addRecord(rec)
case *model.RelationRecord:
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2",
"_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`,
}
result := c.generateUpdateStatements("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", allCols, unchangedToastCols)
result := c.generateUpdateStatements("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", false, allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand All @@ -47,7 +47,7 @@ func TestGenerateUpdateStatement_EmptyColumns(t *testing.T) {
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3",
"_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`,
}
result := c.generateUpdateStatements("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", allCols, unchangedToastCols)
result := c.generateUpdateStatements("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", false, allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand Down
13 changes: 11 additions & 2 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,8 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ""), ",")

updateStatementsforToastCols := c.generateUpdateStatements(normalizeReq.SyncedAtColName,
normalizeReq.SoftDeleteColName, columnNames, unchangedToastColumns)
normalizeReq.SoftDeleteColName, normalizeReq.SoftDelete,
columnNames, unchangedToastColumns)

// handling the case when an insert and delete happen in the same batch, with updates in the middle
// with soft-delete, we want the row to be in the destination with SOFT_DELETE true
Expand Down Expand Up @@ -1025,7 +1026,7 @@ and updating the other columns.
7. Return the list of generated update statements.
*/
func (c *SnowflakeConnector) generateUpdateStatements(
syncedAtCol string, softDeleteCol string,
syncedAtCol string, softDeleteCol string, softDelete bool,
allCols []string, unchangedToastCols []string) []string {
updateStmts := make([]string, 0)

Expand All @@ -1052,6 +1053,14 @@ func (c *SnowflakeConnector) generateUpdateStatements(
(SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)
if softDelete && (softDeleteCol != "") {
tmpArray = append(tmpArray[:len(tmpArray)-1], fmt.Sprintf(`"%s" = TRUE`, softDeleteCol))
ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
(SOURCE._PEERDB_RECORD_TYPE = 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)
}
}
return updateStmts
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}
entries[7] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: "",
Value: KeysToString(typedRecord.UnchangedToastColumns),
}
tableMapping[typedRecord.DestinationTableName] += 1
default:
Expand Down
2 changes: 2 additions & 0 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ type DeleteRecord struct {
CheckPointID int64
// Items is a map of column name to value.
Items *RecordItems
// unchanged toast columns, filled from latest UpdateRecord
UnchangedToastColumns map[string]struct{}
}

// Implement Record interface for DeleteRecord.
Expand Down

0 comments on commit 3b61783

Please sign in to comment.