Skip to content

Commit

Permalink
try fixing soft delete mixup
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 5, 2024
1 parent 276c577 commit fb22a59
Showing 1 changed file with 26 additions and 15 deletions.
41 changes: 26 additions & 15 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1275,7 +1275,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() {
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_softdel"),
FlowJobName: cmpTableName,
}

config := &protos.FlowConnectionConfigs{
Expand Down Expand Up @@ -1312,23 +1312,23 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() {
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize update", srcTableName, dstTableName, "id,c1,c2,t")
// since we delete stuff, create another table to compare with
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
e2e.EnvNoError(s.t, env, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize delete", srcTableName, dstTableName, "id,c1,c2,t")
e2e.EnvWaitForEqualTablesWithNames(
env,
s,
"normalize delete",
srcTableName,
dstTableName+" WHERE NOT _PEERDB_IS_DELETED",
"id,c1,c2,t",
)

env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)

// verify our updates and delete happened
e2e.RequireEqualTables(s, "test_softdel", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED`, dstTableName)
numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
Expand Down Expand Up @@ -1480,16 +1480,19 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() {
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
// since we delete stuff, create another table to compare with
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
e2e.EnvNoError(s.t, env, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)

e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background()))
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize transaction", srcTableName, dstTableName, "id,c1,c2,t")
e2e.EnvWaitForEqualTablesWithNames(
env,
s,
"normalize transaction",
srcTableName,
dstTableName+" WHERE NOT _PEERDB_IS_DELETED",
"id,c1,c2,t",
)

env.CancelWorkflow()
}()
Expand Down Expand Up @@ -1557,7 +1560,15 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize delete", srcTableName, dstTableName, "id,c1,c2,t")
e2e.EnvWaitForEqualTablesWithNames(
env,
s,
"normalize delete",
srcTableName,
dstTableName+" WHERE NOT _PEERDB_IS_DELETED",
"id,c1,c2,t",
)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName))
e2e.EnvNoError(s.t, env, err)
Expand Down

0 comments on commit fb22a59

Please sign in to comment.