diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 9d581a1112..1a99c92b1f 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -428,9 +428,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns - wg := sync.WaitGroup{} - wg.Add(1) - + done := make(chan struct{}) go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating no rows */ @@ -442,7 +440,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { `, srcTableName, srcTableName)) require.NoError(s.t, err) s.t.Log("Executed a transaction touching toast columns") - wg.Done() + done <- struct{}{} }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -456,7 +454,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") env.AssertExpectations(s.t) - wg.Wait() + <-done } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { @@ -923,9 +921,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { MaxBatchSize: 100, } - wg := sync.WaitGroup{} - wg.Add(1) - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and mutate schema repeatedly. go func() { @@ -981,8 +976,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 8) s.compareTableContentsBQ("test_simple_schema_changes", "id,c1") - - wg.Done() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -994,7 +987,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - wg.Wait() env.AssertExpectations(s.t) } @@ -1034,8 +1026,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. - wg := sync.WaitGroup{} - wg.Add(1) go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table @@ -1057,11 +1047,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) require.NoError(s.t, err) - wg.Done() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - wg.Wait() // Verify workflow completes without error require.True(s.t, env.IsWorkflowCompleted()) @@ -1110,9 +1098,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { MaxBatchSize: 100, } - wg := sync.WaitGroup{} - wg.Add(1) - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { @@ -1138,11 +1123,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { err = rowsTx.Commit(context.Background()) require.NoError(s.t, err) - wg.Done() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - wg.Wait() // Verify workflow completes without error require.True(s.t, env.IsWorkflowCompleted()) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index efb2078a92..8c904a38dc 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -842,9 +842,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { MaxBatchSize: 100, } - wg := sync.WaitGroup{} - wg.Add(1) - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and mutate schema repeatedly. go func() { @@ -990,8 +987,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { require.NoError(s.t, err) require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1") - - wg.Done() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1003,7 +998,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - wg.Wait() env.AssertExpectations(s.t) } @@ -1040,9 +1034,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { MaxBatchSize: 100, } - wg := sync.WaitGroup{} - wg.Add(1) - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { @@ -1066,11 +1057,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) require.NoError(s.t, err) - wg.Done() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - wg.Wait() // Verify workflow completes without error require.True(s.t, env.IsWorkflowCompleted()) @@ -1197,8 +1186,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. - wg := sync.WaitGroup{} - wg.Add(1) go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) @@ -1218,11 +1205,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) require.NoError(s.t, err) - wg.Done() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - wg.Wait() // Verify workflow completes without error require.True(s.t, env.IsWorkflowCompleted()) @@ -1282,9 +1267,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. - wg := sync.WaitGroup{} - wg.Add(1) - go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) @@ -1304,11 +1286,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) require.NoError(s.t, err) - wg.Done() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - wg.Wait() require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 795d191ab6..63c3f75a32 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -95,17 +95,17 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment, time.Sleep(5 * time.Second) for { response, err := env.QueryWorkflow( - peerflow.CDCFlowStatusQuery, + peerflow.CDCNormFlowStatusQuery, connectionGen.FlowJobName, ) if err == nil { - var state peerflow.CDCFlowWorkflowState + var state int err = response.Get(&state) if err != nil { slog.Error(err.Error()) } - if len(state.NormalizeFlowStatuses) >= minCount { + if state >= minCount { break } } else { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 4e1fb447bb..17eff00cdb 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -19,6 +19,7 @@ import ( const ( CDCFlowStatusQuery = "q-cdc-flow-status" + CDCNormFlowStatusQuery = "q-norm-flow-status" maxSyncFlowsPerCDCFlow = 32 ) @@ -169,7 +170,6 @@ func CDCFlowWorkflowWithConfig( limits.TotalSyncFlows = maxSyncFlowsPerCDCFlow } - // Support a Query for the current state of the peer flow. err := workflow.SetQueryHandler(ctx, CDCFlowStatusQuery, func(jobName string) (CDCFlowWorkflowState, error) { return *state, nil }) diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index b2434b5d88..29a88e0fec 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -22,6 +22,13 @@ func NormalizeFlowWorkflow(ctx workflow.Context, errors := make([]string, 0) syncChan := workflow.GetSignalChannel(normalizeFlowCtx, "Sync") + err := workflow.SetQueryHandler(ctx, CDCNormFlowStatusQuery, func(jobName string) (int, error) { + return len(results), nil + }) + if err != nil { + errors = append(errors, err.Error()) + } + stopLoop := false needSync := true for {