diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 13fc3a51f2..a06ca8d8b8 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -1279,11 +1279,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) s.NoError(err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.NormalizeFlowCountQuery(env, connectionGen, 1) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) s.NoError(err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 4) + e2e.NormalizeFlowCountQuery(env, connectionGen, 2) // since we delete stuff, create another table to compare with _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) @@ -1442,7 +1442,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) s.NoError(err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.NormalizeFlowCountQuery(env, connectionGen, 1) insertTx, err := s.pool.Begin(context.Background()) s.NoError(err) @@ -1521,18 +1521,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { } // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert, update and delete rows in the table. + // and then insert and delete rows in the table. go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) s.NoError(err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + e2e.NormalizeFlowCountQuery(env, connectionGen, 1) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) s.NoError(err) - e2e.NormalizeFlowCountQuery(env, connectionGen, 4) + e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) s.NoError(err)