diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index fbc31704e2..45f5239c14 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -367,6 +367,10 @@ func (p *PostgresCDCSource) consumeStream( latestRecord := localRecords[tablePKeyLastSeen[tablePkeyVal]] deleteRecord := rec.(*model.DeleteRecord) deleteRecord.Items = latestRecord.GetItems() + updateRecord, ok := latestRecord.(*model.UpdateRecord) + if ok { + deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns + } } addRecord(rec) case *model.RelationRecord: diff --git a/flow/connectors/snowflake/merge_stmt_generator_test.go b/flow/connectors/snowflake/merge_stmt_generator_test.go index e80307ed04..c4eb2e973e 100644 --- a/flow/connectors/snowflake/merge_stmt_generator_test.go +++ b/flow/connectors/snowflake/merge_stmt_generator_test.go @@ -25,7 +25,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`, } - result := c.generateUpdateStatements("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", allCols, unchangedToastCols) + result := c.generateUpdateStatements("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", false, allCols, unchangedToastCols) for i := range expected { expected[i] = removeSpacesTabsNewlines(expected[i]) @@ -47,7 +47,7 @@ func TestGenerateUpdateStatement_EmptyColumns(t *testing.T) { THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3", "_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`, } - result := c.generateUpdateStatements("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", allCols, unchangedToastCols) + result := c.generateUpdateStatements("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", false, allCols, unchangedToastCols) for i := range expected { expected[i] = removeSpacesTabsNewlines(expected[i]) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 032c3322ff..7a0af874c7 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -863,7 +863,8 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ""), ",") updateStatementsforToastCols := c.generateUpdateStatements(normalizeReq.SyncedAtColName, - normalizeReq.SoftDeleteColName, columnNames, unchangedToastColumns) + normalizeReq.SoftDeleteColName, normalizeReq.SoftDelete, + columnNames, unchangedToastColumns) // handling the case when an insert and delete happen in the same batch, with updates in the middle // with soft-delete, we want the row to be in the destination with SOFT_DELETE true @@ -1025,7 +1026,7 @@ and updating the other columns. 7. Return the list of generated update statements. */ func (c *SnowflakeConnector) generateUpdateStatements( - syncedAtCol string, softDeleteCol string, + syncedAtCol string, softDeleteCol string, softDelete bool, allCols []string, unchangedToastCols []string) []string { updateStmts := make([]string, 0) @@ -1052,6 +1053,14 @@ func (c *SnowflakeConnector) generateUpdateStatements( (SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s' THEN UPDATE SET %s `, cols, ssep) updateStmts = append(updateStmts, updateStmt) + if softDelete && (softDeleteCol != "") { + tmpArray = append(tmpArray[:len(tmpArray)-1], fmt.Sprintf(`"%s" = TRUE`, softDeleteCol)) + ssep := strings.Join(tmpArray, ", ") + updateStmt := fmt.Sprintf(`WHEN MATCHED AND + (SOURCE._PEERDB_RECORD_TYPE = 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s' + THEN UPDATE SET %s `, cols, ssep) + updateStmts = append(updateStmts, updateStmt) + } } return updateStmts } diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go index 9359c1565f..2ef78d33ce 100644 --- a/flow/connectors/utils/stream.go +++ b/flow/connectors/utils/stream.go @@ -169,7 +169,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor } entries[7] = qvalue.QValue{ Kind: qvalue.QValueKindString, - Value: "", + Value: KeysToString(typedRecord.UnchangedToastColumns), } tableMapping[typedRecord.DestinationTableName] += 1 default: diff --git a/flow/model/model.go b/flow/model/model.go index 987ce80223..297313b0f2 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -288,6 +288,8 @@ type DeleteRecord struct { CheckPointID int64 // Items is a map of column name to value. Items *RecordItems + // unchanged toast columns, filled from latest UpdateRecord + UnchangedToastColumns map[string]struct{} } // Implement Record interface for DeleteRecord.