From 3117c8203999894e6137b00dd858d095f54fcceb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 28 Dec 2023 21:20:06 +0000 Subject: [PATCH] Create e2e.GoWorkflow utility function to always couple goroutine & workflow execution Also tune timeouts on mock workflow environment --- .editorconfig | 15 +++ flow/e2e/bigquery/peer_flow_bq_test.go | 145 +++++--------------- flow/e2e/postgres/peer_flow_pg_test.go | 40 ++---- flow/e2e/s3/cdc_s3_test.go | 12 +- flow/e2e/snowflake/peer_flow_sf_test.go | 171 ++++++------------------ flow/e2e/snowflake/qrep_flow_sf_test.go | 2 +- flow/e2e/test_utils.go | 17 ++- 7 files changed, 124 insertions(+), 278 deletions(-) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000000..d9c97c37b3 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,15 @@ +root = true + +[*] +charset = utf-8 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true + +[*.rs] +indent_style = space +indent_size = 4 + +[package.json] +indent_style = space +indent_size = 2 \ No newline at end of file diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index cc0352fae3..2860aba200 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -5,7 +5,6 @@ import ( "fmt" "log/slog" "strings" - "sync" "testing" "time" @@ -293,9 +292,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 10 rows into the source table - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -307,9 +304,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { require.NoError(s.t, err) } s.t.Log("Inserted 10 rows into the source table") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -361,9 +356,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* Executing a transaction which @@ -381,9 +374,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { `, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) s.t.Log("Executed a transaction touching toast columns") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -429,10 +420,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - done := make(chan struct{}) - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating no rows */ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -443,10 +431,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { `, srcTableName, srcTableName)) require.NoError(s.t, err) s.t.Log("Executed a transaction touching toast columns") - done <- struct{}{} - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -457,7 +442,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") env.AssertExpectations(s.t) - <-done } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { @@ -493,9 +477,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -519,9 +501,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) s.t.Log("Executed a transaction touching toast columns") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -566,9 +546,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -586,9 +564,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) s.t.Log("Executed a transaction touching toast columns") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -634,9 +610,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating a single row @@ -653,9 +627,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { `, srcTableName, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) s.t.Log("Executed a transaction touching toast columns") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -701,9 +673,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* test inserting various types*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -721,9 +691,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { ARRAY['hello','bye']; `, srcTableName)) require.NoError(s.t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -778,9 +746,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* inserting across multiple tables*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -789,9 +755,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { `, srcTable1Name, srcTable2Name)) require.NoError(s.t, err) s.t.Log("Executed an insert on two tables") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error require.True(s.t, env.IsWorkflowCompleted()) @@ -840,9 +804,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert and mutate schema repeatedly. - go func() { + e2e.GoWorkflow(func() { // insert first row. e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -895,9 +857,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 8) s.compareTableContentsBQ("test_simple_schema_changes", "id,c1") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -943,9 +903,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -966,9 +924,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) require.NoError(s.t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -1017,9 +973,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) require.NoError(s.t, err) @@ -1042,9 +996,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { err = rowsTx.Commit(context.Background()) require.NoError(s.t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -1094,9 +1046,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table @@ -1115,9 +1065,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) require.NoError(s.t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -1163,7 +1111,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { MaxBatchSize: 100, } - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 1 row into the source table testKey := fmt.Sprintf("test_key_%d", 1) @@ -1178,9 +1126,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { DELETE FROM %s WHERE id=1 `, srcTableName)) require.NoError(s.t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -1232,7 +1178,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* inserting across multiple tables*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -1241,9 +1187,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { `, srcTable1Name, srcTable2Name)) require.NoError(s.t, err) s.t.Log("Executed an insert on two tables") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error require.True(s.t, env.IsWorkflowCompleted()) @@ -1306,12 +1250,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { MaxBatchSize: 100, } - wg := sync.WaitGroup{} - wg.Add(1) - - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -1329,17 +1268,12 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) require.NoError(s.t, err) + }, env, peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - wg.Done() - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") - wg.Wait() - // verify our updates and delete happened s.compareTableContentsBQ("test_softdel", "id,c1,c2,t") @@ -1396,7 +1330,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) insertTx, err := s.pool.Begin(context.Background()) @@ -1417,9 +1351,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { require.NoError(s.t, err) require.NoError(s.t, insertTx.Commit(context.Background())) - }() + }, env, peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1478,9 +1411,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -1505,9 +1436,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { require.NoError(s.t, err) require.NoError(s.t, insertTx.Commit(context.Background())) - }() + }, env, peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1565,9 +1495,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -1581,9 +1509,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) require.NoError(s.t, err) - }() + }, env, peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 53658bf6ff..ebf647e18c 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -71,9 +71,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 10 rows into the source table - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -85,9 +83,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { s.NoError(err) } s.T().Log("Inserted 10 rows into the source table") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -133,9 +129,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert and mutate schema repeatedly. - go func() { + e2e.GoWorkflow(func() { // insert first row. e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -249,9 +243,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") s.NoError(err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -299,7 +291,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -321,9 +313,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) s.NoError(err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -378,7 +368,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) s.NoError(err) @@ -401,9 +391,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { err = rowsTx.Commit(context.Background()) s.NoError(err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -459,7 +447,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table @@ -478,9 +466,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) s.NoError(err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -529,7 +515,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { MaxBatchSize: 100, } - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 1 row into the source table testKey := fmt.Sprintf("test_key_%d", 1) @@ -545,9 +531,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { `, srcTableName)) s.NoError(err) s.T().Log("Inserted and deleted a row for peerdb column check") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index f28161b97b..f00cb42e6f 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -54,7 +54,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { MaxBatchSize: 5, } - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) s.NoError(err) // insert 20 rows @@ -67,9 +67,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { s.NoError(err) } s.NoError(err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -126,7 +124,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { MaxBatchSize: 5, } - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) s.NoError(err) // insert 20 rows @@ -140,9 +138,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { } s.T().Log("Inserted 20 rows into the source table") s.NoError(err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 314c92d2b5..b180681d67 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -5,7 +5,6 @@ import ( "fmt" "log/slog" "strings" - "sync" "testing" "time" @@ -140,9 +139,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 20 rows into the source table - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 20 rows into the source table for i := 0; i < 20; i++ { @@ -154,9 +151,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { require.NoError(s.t, err) } s.t.Log("Inserted 20 rows into the source table") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -218,9 +213,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 20 rows into the source table - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 20 rows into the source table for i := 0; i < 20; i++ { @@ -232,9 +225,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { require.NoError(s.t, err) } s.t.Log("Inserted 20 rows into the source table") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -281,9 +272,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 10 rows into the source table - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 4 invalid shapes and 6 valid shapes into the source table for i := 0; i < 4; i++ { @@ -308,9 +297,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { require.NoError(s.t, err) } s.t.Log("Inserted 6 valid geography rows and 10 total rows into source") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -367,9 +354,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* Executing a transaction which @@ -387,9 +372,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { `, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) s.t.Log("Executed a transaction touching toast columns") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -435,10 +418,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { MaxBatchSize: 100, } - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating no rows */ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -452,11 +432,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { slog.Error("Error executing transaction", slog.Any("error", err)) s.FailNow() } - - wg.Done() - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -467,8 +443,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { s.compareTableContentsSF("test_toast_sf_2", `id,t1,t2,k`) env.AssertExpectations(s.t) - - wg.Wait() } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { @@ -503,9 +477,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -529,9 +501,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) s.t.Log("Executed a transaction touching toast columns") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -575,9 +545,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -595,9 +563,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) s.t.Log("Executed a transaction touching toast columns") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -642,9 +608,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating a single row @@ -661,9 +625,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { `, srcTableName, srcTableName, srcTableName, srcTableName)) require.NoError(s.t, err) s.t.Log("Executed a transaction touching toast columns") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -709,9 +671,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* test inserting various types*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -729,9 +689,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) require.NoError(s.t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -785,9 +743,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and execute a transaction touching toast columns - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* inserting across multiple tables*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -795,9 +751,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) require.NoError(s.t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -844,9 +798,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert and mutate schema repeatedly. - go func() { + e2e.GoWorkflow(func() { // insert first row. e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -960,9 +912,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -1007,9 +957,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1030,9 +978,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) require.NoError(s.t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -1081,9 +1027,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) require.NoError(s.t, err) @@ -1106,9 +1050,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { err = rowsTx.Commit(context.Background()) require.NoError(s.t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -1157,9 +1099,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table @@ -1178,9 +1118,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) require.NoError(s.t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) @@ -1238,9 +1176,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 10 rows into the source table @@ -1259,9 +1195,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) require.NoError(s.t, err) - }() + }, env, peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1321,12 +1256,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { MaxBatchSize: 100, } - wg := sync.WaitGroup{} - wg.Add(1) - - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -1344,17 +1274,12 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) require.NoError(s.t, err) + }, env, peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - wg.Done() - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") - wg.Wait() - // verify our updates and delete happened s.compareTableContentsSF("test_softdel", "id,c1,c2,t") @@ -1408,9 +1333,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) insertTx, err := s.pool.Begin(context.Background()) @@ -1431,9 +1354,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { require.NoError(s.t, err) require.NoError(s.t, insertTx.Commit(context.Background())) - }() + }, env, peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1491,9 +1413,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert, update and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -1518,9 +1438,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { require.NoError(s.t, err) require.NoError(s.t, insertTx.Commit(context.Background())) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + }, env, peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1577,9 +1495,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert and delete rows in the table. - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -1593,9 +1509,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) require.NoError(s.t, err) - }() + }, env, peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1642,9 +1557,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { MaxBatchSize: 100, } - // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup - // and then insert 20 rows into the source table - go func() { + e2e.GoWorkflow(func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) // insert 20 rows into the source table for i := 0; i < 20; i++ { @@ -1655,10 +1568,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { `, s.pgSuffix, "testMixedCase"), testKey, testValue) require.NoError(s.t, err) } - fmt.Println("Inserted 20 rows into the source table") - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + s.t.Log("Inserted 20 rows into the source table") + }, env, peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index f45a78d7c1..14e49d446a 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -43,7 +43,7 @@ func (s PeerFlowE2ETestSuiteSF) compareTableContentsWithDiffSelectorsSF(tableNam qualifiedTableName = fmt.Sprintf(`%s.%s.%s`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) } - sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) + sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) s.t.Logf("running query on snowflake: %s\n", sfSelQuery) sfRows, err := s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) require.NoError(s.t, err) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 4674667b97..feabb23d11 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -7,6 +7,7 @@ import ( "log/slog" "os" "strings" + "sync" "testing" "time" @@ -33,8 +34,8 @@ func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnv t.Fatalf("unable to create catalog connection pool: %v", err) } - // set a 300 second timeout for the workflow to execute a few runs. - env.SetTestTimeout(300 * time.Second) + env.SetTestTimeout(time.Minute) + env.SetWorkflowRunTimeout(4 * time.Minute) env.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig) env.RegisterWorkflow(peerflow.SyncFlowWorkflow) @@ -317,6 +318,18 @@ func RunXminFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos. env.ExecuteWorkflow(peerflow.XminFlowWorkflow, config, state) } +// Execute fn in parallel with ExecuteWorkflow, returns when both complete +func GoWorkflow(fn func(), env *testsuite.TestWorkflowEnvironment, workflowFn interface{}, args ...interface{}) { + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + fn() + wg.Done() + }() + env.ExecuteWorkflow(workflowFn, args...) + wg.Wait() +} + func GetOwnersSchema() *model.QRecordSchema { return &model.QRecordSchema{ Fields: []model.QField{