diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 531affbd74..7a03f25188 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -1275,7 +1275,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_softdel"), + FlowJobName: cmpTableName, } config := &protos.FlowConnectionConfigs{ @@ -1312,23 +1312,23 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize update", srcTableName, dstTableName, "id,c1,c2,t") - // since we delete stuff, create another table to compare with - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize delete", srcTableName, dstTableName, "id,c1,c2,t") + e2e.EnvWaitForEqualTablesWithNames( + env, + s, + "normalize delete", + srcTableName, + dstTableName+" WHERE NOT _PEERDB_IS_DELETED", + "id,c1,c2,t", + ) env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - // verify our updates and delete happened - e2e.RequireEqualTables(s, "test_softdel", "id,c1,c2,t") - newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) @@ -1480,16 +1480,19 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - // since we delete stuff, create another table to compare with - _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) - e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize transaction", srcTableName, dstTableName, "id,c1,c2,t") + e2e.EnvWaitForEqualTablesWithNames( + env, + s, + "normalize transaction", + srcTableName, + dstTableName+" WHERE NOT _PEERDB_IS_DELETED", + "id,c1,c2,t", + ) env.CancelWorkflow() }() @@ -1557,7 +1560,15 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.EnvWaitForEqualTablesWithNames(env, s, "normalize delete", srcTableName, dstTableName, "id,c1,c2,t") + e2e.EnvWaitForEqualTablesWithNames( + env, + s, + "normalize delete", + srcTableName, + dstTableName+" WHERE NOT _PEERDB_IS_DELETED", + "id,c1,c2,t", + ) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) e2e.EnvNoError(s.t, env, err)