diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 71230b05f8..8884508449 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -232,8 +232,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -274,8 +273,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -319,8 +317,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -330,7 +327,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -385,8 +382,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -396,7 +392,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* Executing a transaction which 1. changes both toast column @@ -452,8 +448,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -464,7 +459,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { // and execute a transaction touching toast columns done := make(chan struct{}) go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* transaction updating no rows */ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -515,8 +510,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 11, @@ -526,7 +520,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -587,8 +581,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 6, @@ -598,7 +591,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -654,8 +647,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -665,7 +657,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* transaction updating a single row multiple times with changed/unchanged toast columns @@ -720,8 +712,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -731,7 +722,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* test inserting various types*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2,2,b'1',b'101', @@ -800,8 +791,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -811,7 +801,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 4 invalid shapes and 6 valid shapes into the source table for i := 0; i < 4; i++ { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -881,8 +871,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -892,7 +881,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* inserting across multiple tables*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); @@ -941,8 +930,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: -1, @@ -953,7 +941,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // and then insert and mutate schema repeatedly. go func() { // insert first row. - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES (1)`, srcTableName)) e2e.EnvNoError(s.t, env, err) @@ -1032,8 +1020,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: -1, @@ -1043,7 +1030,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) @@ -1098,8 +1085,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -1109,7 +1095,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -1173,8 +1159,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: -1, @@ -1184,7 +1169,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1233,8 +1218,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { SoftDelete: true, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -1242,7 +1226,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { } go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 1 row into the source table testKey := fmt.Sprintf("test_key_%d", 1) testValue := fmt.Sprintf("test_value_%d", 1) @@ -1298,8 +1282,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { CdcStagingPath: "", } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -1309,7 +1292,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* inserting across multiple tables*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); @@ -1383,7 +1366,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) @@ -1469,7 +1452,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) insertTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -1553,7 +1536,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) @@ -1646,7 +1629,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 73da88436f..b6358fcae5 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -65,8 +65,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { Destination: s.peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -76,7 +75,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -147,8 +146,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { Destination: s.peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: -1, @@ -159,7 +157,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { // and then insert and mutate schema repeatedly. go func() { // insert first row. - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) @@ -283,8 +281,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { Destination: s.peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: -1, @@ -294,7 +291,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) @@ -353,8 +350,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { Destination: s.peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -364,7 +360,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -432,8 +428,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { Destination: s.peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: -1, @@ -443,7 +438,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -498,8 +493,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { SoftDelete: true, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -507,7 +501,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { } go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 1 row into the source table testKey := fmt.Sprintf("test_key_%d", 1) testValue := fmt.Sprintf("test_value_%d", 1) @@ -582,7 +576,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) @@ -672,7 +666,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) insertTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -757,7 +751,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) @@ -844,7 +838,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 4c5d8feb94..b26e44ad25 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -40,8 +40,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { Destination: s.s3Helper.GetPeer(), } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -50,8 +49,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { } go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) - e2e.EnvNoError(s.t, env, err) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 0319154dc7..e70beddb0f 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -150,8 +150,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: -1, @@ -161,7 +160,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 20 rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 20 rows into the source table for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -216,8 +215,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -227,7 +225,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 20 rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 20 rows into the source table for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -278,8 +276,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -289,7 +286,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 10 rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 4 invalid shapes and 6 valid shapes into the source table for i := 0; i < 4; i++ { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -359,8 +356,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -370,7 +366,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* Executing a transaction which 1. changes both toast column @@ -426,8 +422,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -439,7 +434,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { go func() { defer wg.Done() - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* transaction updating no rows */ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -487,8 +482,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 11, @@ -498,7 +492,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -558,8 +552,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 6, @@ -569,7 +562,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // complex transaction with random DMLs on a table with toast columns _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` BEGIN; @@ -624,8 +617,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -635,7 +627,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* transaction updating a single row multiple times with changed/unchanged toast columns @@ -690,8 +682,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -701,7 +692,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* test inserting various types*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s SELECT 2,2,b'1',b'101', @@ -769,8 +760,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -780,7 +770,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and execute a transaction touching toast columns go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) /* inserting across multiple tables*/ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); @@ -826,8 +816,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: -1, @@ -837,7 +826,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and mutate schema repeatedly. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) e2e.EnvNoError(s.t, env, err) @@ -1003,8 +992,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: -1, @@ -1014,7 +1002,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) @@ -1066,8 +1054,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -1077,7 +1064,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -1141,8 +1128,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: -1, @@ -1152,7 +1138,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1225,7 +1211,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1306,7 +1292,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) @@ -1386,7 +1372,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) insertTx, err := s.pool.Begin(context.Background()) e2e.EnvNoError(s.t, env, err) @@ -1470,7 +1456,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) @@ -1557,7 +1543,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert and delete rows in the table. go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) @@ -1616,8 +1602,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { Destination: s.sfHelper.Peer, } - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs() limits := peerflow.CDCFlowLimits{ ExitAfterRecords: -1, @@ -1627,7 +1612,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert 20 rows into the source table go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) + e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 20 rows into the source table for i := 0; i < 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 4aa058c06f..dbc7b6f5cb 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -167,9 +167,10 @@ func EnvWaitForEqualTablesWithNames( }) } -func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, +func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironment, connectionGen FlowConnectionGenerationConfig, -) error { +) { + t.Helper() // errors expected while PeerFlowStatusQuery is setup counter := 0 for { @@ -187,10 +188,12 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, } if *state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING { - return nil + return } } else if counter > 15 { - return err + t.Error("UNEXPECTED SETUP CDC TIMEOUT", err.Error()) + env.CancelWorkflow() + runtime.Goexit() } else if counter > 5 { // log the error for informational purposes slog.Error(err.Error())