diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 5ff80769fb..c71f97cad8 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -842,6 +842,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { MaxBatchSize: 100, } + wg := sync.WaitGroup{} + wg.Add(1) + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and mutate schema repeatedly. go func() { @@ -897,6 +900,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 8) s.compareTableContentsBQ("test_simple_schema_changes", "id,c1") + + wg.Done() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -908,6 +913,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") + wg.Wait() env.AssertExpectations(s.t) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 314c92d2b5..80d7a3596e 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -844,6 +844,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { MaxBatchSize: 100, } + wg := sync.WaitGroup{} + wg.Add(1) + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and mutate schema repeatedly. go func() { @@ -960,6 +963,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1") + + wg.Done() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -971,6 +976,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") + wg.Wait() env.AssertExpectations(s.t) }