Skip to content

Commit

Permalink
Simple_Schema_Changes: WaitGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 2, 2024
1 parent 9741b6a commit fc2effb
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 0 deletions.
6 changes: 6 additions & 0 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
MaxBatchSize: 100,
}

wg := sync.WaitGroup{}
wg.Add(1)

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert and mutate schema repeatedly.
go func() {
Expand Down Expand Up @@ -894,6 +897,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 8)
s.compareTableContentsBQ("test_simple_schema_changes", "id,c1")

wg.Done()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand All @@ -905,6 +910,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

wg.Wait()
env.AssertExpectations(s.t)
}

Expand Down
6 changes: 6 additions & 0 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
MaxBatchSize: 100,
}

wg := sync.WaitGroup{}
wg.Add(1)

// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert and mutate schema repeatedly.
go func() {
Expand Down Expand Up @@ -990,6 +993,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
require.NoError(s.t, err)
require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
s.compareTableContentsSF("test_simple_schema_changes", "id,c1")

wg.Done()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand All @@ -1001,6 +1006,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

wg.Wait()
env.AssertExpectations(s.t)
}

Expand Down

0 comments on commit fc2effb

Please sign in to comment.