Skip to content

Commit

Permalink
handles soft-delete in case insert and delete in same batch (#713)
Browse files Browse the repository at this point in the history
closes #665
  • Loading branch information
heavycrystal authored Dec 4, 2023
1 parent ad769d4 commit 9cd6874
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 32 deletions.
31 changes: 29 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,12 @@ func (p *PostgresCDCSource) consumeStream(
TableName: tableName,
PkeyColVal: compositePKeyString,
}
_, ok := tablePKeyLastSeen[tablePkeyVal]
recIndex, ok := tablePKeyLastSeen[tablePkeyVal]
if !ok {
addRecord(rec)
tablePKeyLastSeen[tablePkeyVal] = len(localRecords) - 1
} else {
oldRec := localRecords[tablePKeyLastSeen[tablePkeyVal]]
oldRec := localRecords[recIndex]
// iterate through unchanged toast cols and set them in new record
updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems())
for _, col := range updatedCols {
Expand Down Expand Up @@ -396,6 +396,33 @@ func (p *PostgresCDCSource) consumeStream(
tablePKeyLastSeen[tablePkeyVal] = len(localRecords) - 1
}
case *model.DeleteRecord:
compositePKeyString, err := p.compositePKeyToString(req, rec)
if err != nil {
return err
}

tablePkeyVal := model.TableWithPkey{
TableName: tableName,
PkeyColVal: compositePKeyString,
}
recIndex, ok := tablePKeyLastSeen[tablePkeyVal]
if ok {
latestRecord := localRecords[recIndex]
deleteRecord := rec.(*model.DeleteRecord)
deleteRecord.Items = latestRecord.GetItems()
updateRecord, ok := latestRecord.(*model.UpdateRecord)
if ok {
deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns
}
} else {
deleteRecord := rec.(*model.DeleteRecord)
// there is nothing to backfill the items in this delete record with,
// so the destination row must not be updated from this record.
// set a sentinel value in UnchangedToastColumns to prevent update statements from selecting this record
deleteRecord.UnchangedToastColumns = map[string]struct{}{
"_peerdb_not_backfilled_delete": {},
}
}
addRecord(rec)
case *model.RelationRecord:
tableSchemaDelta := r.TableSchemaDelta
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2",
"_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`,
}
result := c.generateUpdateStatement("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", allCols, unchangedToastCols)
result := c.generateUpdateStatements("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", false, allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand All @@ -47,7 +47,7 @@ func TestGenerateUpdateStatement_EmptyColumns(t *testing.T) {
THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3",
"_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`,
}
result := c.generateUpdateStatement("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", allCols, unchangedToastCols)
result := c.generateUpdateStatements("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", false, allCols, unchangedToastCols)

for i := range expected {
expected[i] = removeSpacesTabsNewlines(expected[i])
Expand Down
53 changes: 27 additions & 26 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ const (
_PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d`
getTableNametoUnchangedColsSQL = `SELECT _PEERDB_DESTINATION_TABLE_NAME,
ARRAY_AGG(DISTINCT _PEERDB_UNCHANGED_TOAST_COLUMNS) FROM %s.%s WHERE
_PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d GROUP BY _PEERDB_DESTINATION_TABLE_NAME`
_PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND _PEERDB_RECORD_TYPE != 2
GROUP BY _PEERDB_DESTINATION_TABLE_NAME`
getTableSchemaSQL = `SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA=? AND TABLE_NAME=?`

Expand Down Expand Up @@ -855,17 +856,6 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(columnName)))
}

// add soft delete and synced at columns to the list of columns
if normalizeReq.SoftDelete {
colName := normalizeReq.SoftDeleteColName
quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(colName)))
}

if normalizeReq.SyncedAtColName != "" {
colName := normalizeReq.SyncedAtColName
quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(colName)))
}

insertColumnsSQL := strings.TrimSuffix(strings.Join(quotedUpperColNames, ","), ",")

insertValuesSQLArray := make([]string, 0, len(columnNames))
Expand All @@ -874,21 +864,24 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("SOURCE.%s,", quotedUpperColumnName))
}

// add soft delete and synced at columns to the list of columns
if normalizeReq.SoftDelete {
// add false as default value for soft delete column
insertValuesSQLArray = append(insertValuesSQLArray, "FALSE,")
}
insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ""), ",")

if normalizeReq.SyncedAtColName != "" {
// add current timestamp as default value for synced at column
insertValuesSQLArray = append(insertValuesSQLArray, "CURRENT_TIMESTAMP,")
}
updateStatementsforToastCols := c.generateUpdateStatements(normalizeReq.SyncedAtColName,
normalizeReq.SoftDeleteColName, normalizeReq.SoftDelete,
columnNames, unchangedToastColumns)

insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ""), ",")
// handle the case where an insert and a delete happen in the same batch, possibly with updates in between.
// with soft-delete enabled, we want the row to be present in the destination with the soft-delete column set to TRUE.
// the existing merge statement doesn't do that, so we add another clause that inserts the DeleteRecord
if normalizeReq.SoftDelete {
softDeleteInsertColumnsSQL := strings.TrimSuffix(strings.Join(append(quotedUpperColNames,
normalizeReq.SoftDeleteColName), ","), ",")
softDeleteInsertValuesSQL := strings.Join(append(insertValuesSQLArray, "TRUE"), "")

updateStatementsforToastCols := c.generateUpdateStatement(normalizeReq.SyncedAtColName,
normalizeReq.SoftDeleteColName, columnNames, unchangedToastColumns)
updateStatementsforToastCols = append(updateStatementsforToastCols,
fmt.Sprintf("WHEN NOT MATCHED AND (SOURCE._PEERDB_RECORD_TYPE = 2) THEN INSERT (%s) VALUES(%s)",
softDeleteInsertColumnsSQL, softDeleteInsertValuesSQL))
}
updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")

pkeySelectSQLArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns))
Expand Down Expand Up @@ -1036,8 +1029,8 @@ and updating the other columns.
6. Repeat steps 1-5 for each unique set of unchanged toast column groups.
7. Return the list of generated update statements.
*/
func (c *SnowflakeConnector) generateUpdateStatement(
syncedAtCol string, softDeleteCol string,
func (c *SnowflakeConnector) generateUpdateStatements(
syncedAtCol string, softDeleteCol string, softDelete bool,
allCols []string, unchangedToastCols []string) []string {
updateStmts := make([]string, 0)

Expand All @@ -1064,6 +1057,14 @@ func (c *SnowflakeConnector) generateUpdateStatement(
(SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)
if softDelete && (softDeleteCol != "") {
tmpArray = append(tmpArray[:len(tmpArray)-1], fmt.Sprintf(`"%s" = TRUE`, softDeleteCol))
ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
(SOURCE._PEERDB_RECORD_TYPE = 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)
}
}
return updateStmts
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}
entries[7] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: "",
Value: KeysToString(typedRecord.UnchangedToastColumns),
}
tableMapping[typedRecord.DestinationTableName] += 1
default:
Expand Down
Loading

0 comments on commit 9cd6874

Please sign in to comment.