Skip to content

Commit

Permalink
more fixes to soft delete behaviour + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 29, 2023
1 parent d1bcdf6 commit 4a4cae6
Show file tree
Hide file tree
Showing 3 changed files with 335 additions and 1 deletion.
8 changes: 8 additions & 0 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,14 @@ func (p *PostgresCDCSource) consumeStream(
if ok {
deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns
}
} else {
deleteRecord := rec.(*model.DeleteRecord)
// there is nothing to backfill the items in the delete record with,
// so don't update the row with this record
// add sentinel value to prevent update statements from selecting
deleteRecord.UnchangedToastColumns = map[string]struct{}{
"_peerdb_not_backfilled_delete": {},
}
}
addRecord(rec)
case *model.RelationRecord:
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ const (
_PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d`
getTableNametoUnchangedColsSQL = `SELECT _PEERDB_DESTINATION_TABLE_NAME,
ARRAY_AGG(DISTINCT _PEERDB_UNCHANGED_TOAST_COLUMNS) FROM %s.%s WHERE
_PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d GROUP BY _PEERDB_DESTINATION_TABLE_NAME`
_PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND _PEERDB_RECORD_TYPE != 2
GROUP BY _PEERDB_DESTINATION_TABLE_NAME`
getTableSchemaSQL = `SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA=? AND TABLE_NAME=?`

Expand Down
325 changes: 325 additions & 0 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1231,3 +1231,328 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
s.Equal(4, len(sfRows.Schema.Fields))
s.Equal(10, len(sfRows.Records))
}

func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

cmpTableName := s.attachSchemaSuffix("test_softdel")
srcTableName := fmt.Sprintf("%s_src", cmpTableName)
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
c1 INT,
c2 INT,
t TEXT
);
`, srcTableName))
s.NoError(err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.sfHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
}

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 6,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
s.NoError(err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
s.NoError(err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
// since we delete stuff, create another table to compare with
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
s.NoError(err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
s.NoError(err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()
s.Error(err)
s.Contains(err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsSF("test_softdel", "id,c1,c2,t", false)

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
s.NoError(err)
s.Equal(1, numNewRows)
}

func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

cmpTableName := s.attachSchemaSuffix("test_softdel_iud")
srcTableName := fmt.Sprintf("%s_src", cmpTableName)
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_iud")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
c1 INT,
c2 INT,
t TEXT
);
`, srcTableName))
s.NoError(err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel_iud"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.sfHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
}

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 2,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

insertTx, err := s.pool.Begin(context.Background())
s.NoError(err)

_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
s.NoError(err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
s.NoError(err)
// since we delete stuff, create another table to compare with
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
s.NoError(err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
s.NoError(err)

s.NoError(insertTx.Commit(context.Background()))
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()
s.Error(err)
s.Contains(err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsSF("test_softdel_iud", "id,c1,c2,t", false)

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
s.NoError(err)
s.Equal(1, numNewRows)
}

func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

cmpTableName := s.attachSchemaSuffix("test_softdel_ud")
srcTableName := fmt.Sprintf("%s_src", cmpTableName)
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_ud")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
c1 INT,
c2 INT,
t TEXT
);
`, srcTableName))
s.NoError(err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel_ud"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.sfHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
}

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 4,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
s.NoError(err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)

insertTx, err := s.pool.Begin(context.Background())
s.NoError(err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName))
s.NoError(err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
s.NoError(err)
// since we delete stuff, create another table to compare with
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
s.NoError(err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
s.NoError(err)

s.NoError(insertTx.Commit(context.Background()))
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()
s.Error(err)
s.Contains(err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsSF("test_softdel_ud", "id,c1,c2,t", false)

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
s.NoError(err)
s.Equal(1, numNewRows)
}

func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

srcTableName := s.attachSchemaSuffix("test_softdel_iad")
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_iad")

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
c1 INT,
c2 INT,
t TEXT
);
`, srcTableName))
s.NoError(err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel_iad"),
}

config := &protos.FlowConnectionConfigs{
FlowJobName: connectionGen.FlowJobName,
Destination: s.sfHelper.Peer,
TableMappings: []*protos.TableMapping{
{
SourceTableIdentifier: srcTableName,
DestinationTableIdentifier: dstTableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
}

limits := peerflow.CDCFlowLimits{
TotalSyncFlows: 6,
MaxBatchSize: 100,
}

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
s.NoError(err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
s.NoError(err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName))
s.NoError(err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
s.True(env.IsWorkflowCompleted())
err = env.GetWorkflowError()
s.Error(err)
s.Contains(err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsSF("test_softdel_iad", "id,c1,c2,t", false)

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
s.NoError(err)
s.Equal(0, numNewRows)
}

0 comments on commit 4a4cae6

Please sign in to comment.