From 9cd6874b9b1980cf7d397be9d41a843cb007ff24 Mon Sep 17 00:00:00 2001
From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com>
Date: Mon, 4 Dec 2023 12:56:42 +0530
Subject: [PATCH] handle soft-delete when an insert and delete occur in the same batch (#713)

closes #665
---
 flow/connectors/postgres/cdc.go               |  31 +-
 .../snowflake/merge_stmt_generator_test.go    |   4 +-
 flow/connectors/snowflake/snowflake.go        |  53 +--
 flow/connectors/utils/stream.go               |   2 +-
 flow/e2e/snowflake/peer_flow_sf_test.go       | 325 ++++++++++++++++++
 flow/model/model.go                           |   4 +-
 6 files changed, 387 insertions(+), 32 deletions(-)

diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index cfb0425515..d0f2028055 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -362,12 +362,12 @@ func (p *PostgresCDCSource) consumeStream(
                 TableName:  tableName,
                 PkeyColVal: compositePKeyString,
             }
-            _, ok := tablePKeyLastSeen[tablePkeyVal]
+            recIndex, ok := tablePKeyLastSeen[tablePkeyVal]
             if !ok {
                 addRecord(rec)
                 tablePKeyLastSeen[tablePkeyVal] = len(localRecords) - 1
             } else {
-                oldRec := localRecords[tablePKeyLastSeen[tablePkeyVal]]
+                oldRec := localRecords[recIndex]
                 // iterate through unchanged toast cols and set them in new record
                 updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems())
                 for _, col := range updatedCols {
@@ -396,6 +396,33 @@ func (p *PostgresCDCSource) consumeStream(
                 tablePKeyLastSeen[tablePkeyVal] = len(localRecords) - 1
             }
         case *model.DeleteRecord:
+            compositePKeyString, err := p.compositePKeyToString(req, rec)
+            if err != nil {
+                return err
+            }
+
+            tablePkeyVal := model.TableWithPkey{
+                TableName:  tableName,
+                PkeyColVal: compositePKeyString,
+            }
+            recIndex, ok := tablePKeyLastSeen[tablePkeyVal]
+            if ok {
+                latestRecord := localRecords[recIndex]
+                deleteRecord := rec.(*model.DeleteRecord)
+                deleteRecord.Items = latestRecord.GetItems()
+                updateRecord, ok := latestRecord.(*model.UpdateRecord)
+                if ok {
+                    deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns
+                }
+            } else {
+                deleteRecord := rec.(*model.DeleteRecord)
+                // there is nothing to backfill the items in the delete record with,
+                // so don't update the row with this record.
+                // add a sentinel value to prevent the generated update statements from matching it
+                deleteRecord.UnchangedToastColumns = map[string]struct{}{
+                    "_peerdb_not_backfilled_delete": {},
+                }
+            }
             addRecord(rec)
         case *model.RelationRecord:
             tableSchemaDelta := r.TableSchemaDelta
diff --git a/flow/connectors/snowflake/merge_stmt_generator_test.go b/flow/connectors/snowflake/merge_stmt_generator_test.go
index 7db2ef6542..c4eb2e973e 100644
--- a/flow/connectors/snowflake/merge_stmt_generator_test.go
+++ b/flow/connectors/snowflake/merge_stmt_generator_test.go
@@ -25,7 +25,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
         THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2",
         "_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`,
     }
-    result := c.generateUpdateStatement("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", allCols, unchangedToastCols)
+    result := c.generateUpdateStatements("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", false, allCols, unchangedToastCols)

     for i := range expected {
         expected[i] = removeSpacesTabsNewlines(expected[i])
@@ -47,7 +47,7 @@ func TestGenerateUpdateStatement_EmptyColumns(t *testing.T) {
         THEN UPDATE SET "COL1" = SOURCE."COL1", "COL2" = SOURCE."COL2", "COL3" = SOURCE."COL3",
         "_PEERDB_SYNCED_AT" = CURRENT_TIMESTAMP, "_PEERDB_SOFT_DELETE" = FALSE`,
     }
-    result := c.generateUpdateStatement("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", allCols, unchangedToastCols)
+    result := c.generateUpdateStatements("_PEERDB_SYNCED_AT", "_PEERDB_SOFT_DELETE", false, allCols, unchangedToastCols)

     for i := range expected {
         expected[i] = removeSpacesTabsNewlines(expected[i])
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index ba78223a42..c624234923 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -57,7 +57,8 @@ const (
         _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d`
     getTableNametoUnchangedColsSQL = `SELECT _PEERDB_DESTINATION_TABLE_NAME, ARRAY_AGG(DISTINCT _PEERDB_UNCHANGED_TOAST_COLUMNS) FROM %s.%s WHERE
-        _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d GROUP BY _PEERDB_DESTINATION_TABLE_NAME`
+        _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND _PEERDB_RECORD_TYPE != 2
+        GROUP BY _PEERDB_DESTINATION_TABLE_NAME`

     getTableSchemaSQL = `SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS
         WHERE TABLE_SCHEMA=? AND TABLE_NAME=?`
@@ -855,17 +856,6 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
         quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(columnName)))
     }
-    // add soft delete and synced at columns to the list of columns
-    if normalizeReq.SoftDelete {
-        colName := normalizeReq.SoftDeleteColName
-        quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(colName)))
-    }
-
-    if normalizeReq.SyncedAtColName != "" {
-        colName := normalizeReq.SyncedAtColName
-        quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(colName)))
-    }
-
     insertColumnsSQL := strings.TrimSuffix(strings.Join(quotedUpperColNames, ","), ",")

     insertValuesSQLArray := make([]string, 0, len(columnNames))
@@ -874,21 +864,24 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
         insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("SOURCE.%s,", quotedUpperColumnName))
     }
-    // add soft delete and synced at columns to the list of columns
-    if normalizeReq.SoftDelete {
-        // add false as default value for soft delete column
-        insertValuesSQLArray = append(insertValuesSQLArray, "FALSE,")
-    }
+    insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ""), ",")
-    if normalizeReq.SyncedAtColName != "" {
-        // add current timestamp as default value for synced at column
-        insertValuesSQLArray = append(insertValuesSQLArray, "CURRENT_TIMESTAMP,")
-    }
+    updateStatementsforToastCols := c.generateUpdateStatements(normalizeReq.SyncedAtColName,
+        normalizeReq.SoftDeleteColName, normalizeReq.SoftDelete,
+        columnNames, unchangedToastColumns)
-    insertValuesSQL := strings.TrimSuffix(strings.Join(insertValuesSQLArray, ""), ",")
+    // handling the case when an insert and delete happen in the same batch, with updates in the middle
+    // with soft-delete, we want the row to be in the destination with SOFT_DELETE true
+    // the current merge statement doesn't do that, so we add another case to insert the DeleteRecord
+    if normalizeReq.SoftDelete {
+        softDeleteInsertColumnsSQL := strings.TrimSuffix(strings.Join(append(quotedUpperColNames,
+            normalizeReq.SoftDeleteColName), ","), ",")
+        softDeleteInsertValuesSQL := strings.Join(append(insertValuesSQLArray, "TRUE"), "")
-    updateStatementsforToastCols := c.generateUpdateStatement(normalizeReq.SyncedAtColName,
-        normalizeReq.SoftDeleteColName, columnNames, unchangedToastColumns)
+        updateStatementsforToastCols = append(updateStatementsforToastCols,
+            fmt.Sprintf("WHEN NOT MATCHED AND (SOURCE._PEERDB_RECORD_TYPE = 2) THEN INSERT (%s) VALUES(%s)",
+                softDeleteInsertColumnsSQL, softDeleteInsertValuesSQL))
+    }
     updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")

     pkeySelectSQLArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns))
@@ -1036,8 +1029,8 @@ and updating the other columns.
 6. Repeat steps 1-5 for each unique set of unchanged toast column groups.
 7. Return the list of generated update statements.
 */
-func (c *SnowflakeConnector) generateUpdateStatement(
-    syncedAtCol string, softDeleteCol string,
+func (c *SnowflakeConnector) generateUpdateStatements(
+    syncedAtCol string, softDeleteCol string, softDelete bool,
     allCols []string, unchangedToastCols []string) []string {
     updateStmts := make([]string, 0)

@@ -1064,6 +1057,14 @@
         (SOURCE._PEERDB_RECORD_TYPE != 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s'
         THEN UPDATE SET %s `, cols, ssep)
         updateStmts = append(updateStmts, updateStmt)
+        if softDelete && (softDeleteCol != "") {
+            tmpArray = append(tmpArray[:len(tmpArray)-1], fmt.Sprintf(`"%s" = TRUE`, softDeleteCol))
+            ssep := strings.Join(tmpArray, ", ")
+            updateStmt := fmt.Sprintf(`WHEN MATCHED AND
+            (SOURCE._PEERDB_RECORD_TYPE = 2) AND _PEERDB_UNCHANGED_TOAST_COLUMNS='%s'
+            THEN UPDATE SET %s `, cols, ssep)
+            updateStmts = append(updateStmts, updateStmt)
+        }
     }
     return updateStmts
 }
diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go
index 9359c1565f..2ef78d33ce 100644
--- a/flow/connectors/utils/stream.go
+++ b/flow/connectors/utils/stream.go
@@ -169,7 +169,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
         }
         entries[7] = qvalue.QValue{
             Kind:  qvalue.QValueKindString,
-            Value: "",
+            Value: KeysToString(typedRecord.UnchangedToastColumns),
         }
         tableMapping[typedRecord.DestinationTableName] += 1
     default:
diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go
index e5b7c588f9..a06ca8d8b8 100644
--- a/flow/e2e/snowflake/peer_flow_sf_test.go
+++ b/flow/e2e/snowflake/peer_flow_sf_test.go
@@ -1228,3 +1228,328 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
     s.Equal(4, len(sfRows.Schema.Fields))
     s.Equal(10, len(sfRows.Records))
 }
+
+func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() {
+    env := s.NewTestWorkflowEnvironment()
+    e2e.RegisterWorkflowsAndActivities(env)
+
+    cmpTableName := s.attachSchemaSuffix("test_softdel")
+    srcTableName := fmt.Sprintf("%s_src", cmpTableName)
+    dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel")
+
+    _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+        CREATE TABLE IF NOT EXISTS %s (
+            id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
+            c1 INT,
+            c2 INT,
+            t TEXT
+        );
+    `, srcTableName))
+    s.NoError(err)
+
+    connectionGen := e2e.FlowConnectionGenerationConfig{
+        FlowJobName: s.attachSuffix("test_softdel"),
+    }
+
+    config := &protos.FlowConnectionConfigs{
+        FlowJobName: connectionGen.FlowJobName,
+        Destination: s.sfHelper.Peer,
+        TableMappings: []*protos.TableMapping{
+            {
+                SourceTableIdentifier:      srcTableName,
+                DestinationTableIdentifier: dstTableName,
+            },
+        },
+        Source:            e2e.GeneratePostgresPeer(e2e.PostgresPort),
+        CdcStagingPath:    connectionGen.CdcStagingPath,
+        SoftDelete:        true,
+        SoftDeleteColName: "_PEERDB_IS_DELETED",
+    }
+
+    limits := peerflow.CDCFlowLimits{
+        ExitAfterRecords: 3,
+        MaxBatchSize:     100,
+    }
+
+    // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+    // and then insert, update and delete rows in the table.
+    go func() {
+        e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+
+        _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+            INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
+        s.NoError(err)
+        e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
+        _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+            UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
+        s.NoError(err)
+        e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
+        // since we delete stuff, create another table to compare with
+        _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+            CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
+        s.NoError(err)
+        _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+            DELETE FROM %s WHERE id=1`, srcTableName))
+        s.NoError(err)
+    }()
+
+    env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
+    s.True(env.IsWorkflowCompleted())
+    err = env.GetWorkflowError()
+    s.Error(err)
+    s.Contains(err.Error(), "continue as new")
+
+    // verify our updates and delete happened
+    s.compareTableContentsSF("test_softdel", "id,c1,c2,t", false)
+
+    newerSyncedAtQuery := fmt.Sprintf(`
+        SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
+    numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
+    s.NoError(err)
+    s.Equal(1, numNewRows)
+}
+
+func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() {
+    env := s.NewTestWorkflowEnvironment()
+    e2e.RegisterWorkflowsAndActivities(env)
+
+    cmpTableName := s.attachSchemaSuffix("test_softdel_iud")
+    srcTableName := fmt.Sprintf("%s_src", cmpTableName)
+    dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_iud")
+
+    _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+        CREATE TABLE IF NOT EXISTS %s (
+            id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
+            c1 INT,
+            c2 INT,
+            t TEXT
+        );
+    `, srcTableName))
+    s.NoError(err)
+
+    connectionGen := e2e.FlowConnectionGenerationConfig{
+        FlowJobName: s.attachSuffix("test_softdel_iud"),
+    }
+
+    config := &protos.FlowConnectionConfigs{
+        FlowJobName: connectionGen.FlowJobName,
+        Destination: s.sfHelper.Peer,
+        TableMappings: []*protos.TableMapping{
+            {
+                SourceTableIdentifier:      srcTableName,
+                DestinationTableIdentifier: dstTableName,
+            },
+        },
+        Source:            e2e.GeneratePostgresPeer(e2e.PostgresPort),
+        CdcStagingPath:    connectionGen.CdcStagingPath,
+        SoftDelete:        true,
+        SoftDeleteColName: "_PEERDB_IS_DELETED",
+    }
+
+    limits := peerflow.CDCFlowLimits{
+        ExitAfterRecords: 3,
+        MaxBatchSize:     100,
+    }
+
+    // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+    // and then insert, update and delete rows in the table.
+    go func() {
+        e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+
+        insertTx, err := s.pool.Begin(context.Background())
+        s.NoError(err)
+
+        _, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+            INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
+        s.NoError(err)
+        _, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+            UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
+        s.NoError(err)
+        // since we delete stuff, create another table to compare with
+        _, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+            CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
+        s.NoError(err)
+        _, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+            DELETE FROM %s WHERE id=1`, srcTableName))
+        s.NoError(err)
+
+        s.NoError(insertTx.Commit(context.Background()))
+    }()
+
+    env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
+    s.True(env.IsWorkflowCompleted())
+    err = env.GetWorkflowError()
+    s.Error(err)
+    s.Contains(err.Error(), "continue as new")
+
+    // verify our updates and delete happened
+    s.compareTableContentsSF("test_softdel_iud", "id,c1,c2,t", false)
+
+    newerSyncedAtQuery := fmt.Sprintf(`
+        SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
+    numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
+    s.NoError(err)
+    s.Equal(1, numNewRows)
+}
+
+func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() {
+    env := s.NewTestWorkflowEnvironment()
+    e2e.RegisterWorkflowsAndActivities(env)
+
+    cmpTableName := s.attachSchemaSuffix("test_softdel_ud")
+    srcTableName := fmt.Sprintf("%s_src", cmpTableName)
+    dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_ud")
+
+    _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+        CREATE TABLE IF NOT EXISTS %s (
+            id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
+            c1 INT,
+            c2 INT,
+            t TEXT
+        );
+    `, srcTableName))
+    s.NoError(err)
+
+    connectionGen := e2e.FlowConnectionGenerationConfig{
+        FlowJobName: s.attachSuffix("test_softdel_ud"),
+    }
+
+    config := &protos.FlowConnectionConfigs{
+        FlowJobName: connectionGen.FlowJobName,
+        Destination: s.sfHelper.Peer,
+        TableMappings: []*protos.TableMapping{
+            {
+                SourceTableIdentifier:      srcTableName,
+                DestinationTableIdentifier: dstTableName,
+            },
+        },
+        Source:            e2e.GeneratePostgresPeer(e2e.PostgresPort),
+        CdcStagingPath:    connectionGen.CdcStagingPath,
+        SoftDelete:        true,
+        SoftDeleteColName: "_PEERDB_IS_DELETED",
+    }
+
+    limits := peerflow.CDCFlowLimits{
+        ExitAfterRecords: 4,
+        MaxBatchSize:     100,
+    }
+
+    // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+    // and then insert, update and delete rows in the table.
+    go func() {
+        e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+
+        _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+            INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
+        s.NoError(err)
+        e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
+
+        insertTx, err := s.pool.Begin(context.Background())
+        s.NoError(err)
+        _, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+            UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName))
+        s.NoError(err)
+        _, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+            UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
+        s.NoError(err)
+        // since we delete stuff, create another table to compare with
+        _, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+            CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
+        s.NoError(err)
+        _, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
+            DELETE FROM %s WHERE id=1`, srcTableName))
+        s.NoError(err)
+
+        s.NoError(insertTx.Commit(context.Background()))
+    }()
+
+    env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
+    s.True(env.IsWorkflowCompleted())
+    err = env.GetWorkflowError()
+    s.Error(err)
+    s.Contains(err.Error(), "continue as new")
+
+    // verify our updates and delete happened
+    s.compareTableContentsSF("test_softdel_ud", "id,c1,c2,t", false)
+
+    newerSyncedAtQuery := fmt.Sprintf(`
+        SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
+    numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
+    s.NoError(err)
+    s.Equal(1, numNewRows)
+}
+
+func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() {
+    env := s.NewTestWorkflowEnvironment()
+    e2e.RegisterWorkflowsAndActivities(env)
+
+    srcTableName := s.attachSchemaSuffix("test_softdel_iad")
+    dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_iad")
+
+    _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+        CREATE TABLE IF NOT EXISTS %s (
+            id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
+            c1 INT,
+            c2 INT,
+            t TEXT
+        );
+    `, srcTableName))
+    s.NoError(err)
+
+    connectionGen := e2e.FlowConnectionGenerationConfig{
+        FlowJobName: s.attachSuffix("test_softdel_iad"),
+    }
+
+    config := &protos.FlowConnectionConfigs{
+        FlowJobName: connectionGen.FlowJobName,
+        Destination: s.sfHelper.Peer,
+        TableMappings: []*protos.TableMapping{
+            {
+                SourceTableIdentifier:      srcTableName,
+                DestinationTableIdentifier: dstTableName,
+            },
+        },
+        Source:            e2e.GeneratePostgresPeer(e2e.PostgresPort),
+        CdcStagingPath:    connectionGen.CdcStagingPath,
+        SoftDelete:        true,
+        SoftDeleteColName: "_PEERDB_IS_DELETED",
+    }
+
+    limits := peerflow.CDCFlowLimits{
+        ExitAfterRecords: 3,
+        MaxBatchSize:     100,
+    }
+
+    // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+    // and then insert and delete rows in the table.
+    go func() {
+        e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+
+        _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+            INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
+        s.NoError(err)
+        e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
+        _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+            DELETE FROM %s WHERE id=1`, srcTableName))
+        s.NoError(err)
+        e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
+        _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+            INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName))
+        s.NoError(err)
+    }()
+
+    env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
+    s.True(env.IsWorkflowCompleted())
+    err = env.GetWorkflowError()
+    s.Error(err)
+    s.Contains(err.Error(), "continue as new")
+
+    // verify our updates and delete happened
+    s.compareTableContentsSF("test_softdel_iad", "id,c1,c2,t", false)
+
+    newerSyncedAtQuery := fmt.Sprintf(`
+        SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
+    numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
+    s.NoError(err)
+    s.Equal(0, numNewRows)
+}
diff --git a/flow/model/model.go b/flow/model/model.go
index b91dbaa901..297313b0f2 100644
--- a/flow/model/model.go
+++ b/flow/model/model.go
@@ -288,6 +288,8 @@ type DeleteRecord struct {
     CheckPointID int64
     // Items is a map of column name to value.
     Items *RecordItems
+    // unchanged toast columns, filled from latest UpdateRecord
+    UnchangedToastColumns map[string]struct{}
 }

 // Implement Record interface for DeleteRecord.
@@ -296,7 +298,7 @@ func (r *DeleteRecord) GetCheckPointID() int64 {
 }

 func (r *DeleteRecord) GetTableName() string {
-    return r.SourceTableName
+    return r.DestinationTableName
 }

 func (r *DeleteRecord) GetItems() *RecordItems {
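Note: the cdc.go change above hinges on the per-batch dedup map (tablePKeyLastSeen) that tracks the most recent record seen for each table/pkey pair. A minimal sketch of the idea follows, using simplified hypothetical stand-in types rather than the connector's actual model package:

package main

import "fmt"

// simplified stand-ins for the connector's record types (hypothetical,
// not the real model package)
type record struct {
	kind                  string            // "insert", "update" or "delete"
	items                 map[string]string // column name -> value
	unchangedToastColumns map[string]struct{}
}

func main() {
	var localRecords []*record
	tablePKeyLastSeen := map[string]int{} // "table|pkey" -> index into localRecords

	addRecord := func(key string, r *record) {
		localRecords = append(localRecords, r)
		tablePKeyLastSeen[key] = len(localRecords) - 1
	}

	// an INSERT followed by a DELETE of the same pkey, within one batch
	addRecord("t1|1", &record{kind: "insert", items: map[string]string{"c1": "1"}})

	del := &record{kind: "delete"}
	if idx, ok := tablePKeyLastSeen["t1|1"]; ok {
		// backfill the delete with the latest items seen for this pkey, so
		// the destination can soft-delete a fully populated row
		del.items = localRecords[idx].items
	} else {
		// nothing to backfill with: tag the record with a sentinel so the
		// generated update statements never match it
		del.unchangedToastColumns = map[string]struct{}{
			"_peerdb_not_backfilled_delete": {},
		}
	}
	localRecords = append(localRecords, del)

	fmt.Println(del.items) // map[c1:1]
}

Because the delete record carries the backfilled column values, it can be inserted into the destination as a complete row when the original insert never made it there, which is exactly the case the new WHEN NOT MATCHED merge clause handles.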
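Note: on the Snowflake side, the net effect is that delete records (_PEERDB_RECORD_TYPE = 2) now drive two extra clause shapes in the MERGE. The sketch below prints rough approximations of those clauses; the column names are assumptions borrowed from the e2e tests above, and the real generator additionally partitions the WHEN MATCHED clauses by _PEERDB_UNCHANGED_TOAST_COLUMNS, which is elided here:

package main

import (
	"fmt"
	"strings"
)

func main() {
	cols := []string{"ID", "C1"}          // assumed destination columns
	softDeleteCol := "_PEERDB_IS_DELETED" // name used by the e2e tests above

	setters := make([]string, 0, len(cols))
	values := make([]string, 0, len(cols))
	for _, c := range cols {
		setters = append(setters, fmt.Sprintf(`"%s" = SOURCE."%s"`, c, c))
		values = append(values, fmt.Sprintf(`SOURCE."%s"`, c))
	}

	// regular upsert path: non-delete records reset the flag to FALSE
	fmt.Printf("WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE != 2) THEN UPDATE SET %s, %q = FALSE\n",
		strings.Join(setters, ", "), softDeleteCol)

	// delete of a row that already reached the destination: flag it TRUE
	// instead of removing it
	fmt.Printf("WHEN MATCHED AND (SOURCE._PEERDB_RECORD_TYPE = 2) THEN UPDATE SET %s, %q = TRUE\n",
		strings.Join(setters, ", "), softDeleteCol)

	// insert and delete in the same batch: the row never reached the
	// destination, so the (backfilled) delete record is inserted with the
	// flag already TRUE
	fmt.Printf("WHEN NOT MATCHED AND (SOURCE._PEERDB_RECORD_TYPE = 2) THEN INSERT (\"%s\",%q) VALUES (%s,TRUE)\n",
		strings.Join(cols, `","`), softDeleteCol, strings.Join(values, ","))
}

This also explains the SQL change to getTableNametoUnchangedColsSQL: delete records are excluded (_PEERDB_RECORD_TYPE != 2) from the unchanged-toast-column grouping, since their sentinel value must not spawn update clauses of its own.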