diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index d84c060f29..a2e57f6d0f 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -593,7 +593,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, stream = model.NewQRecordStream(bufferSize) wg.Add(1) - pullPgRecords := func() { + go func() { pgConn := srcConn.(*connpostgres.PostgresConnector) tmp, err := pgConn.PullQRepRecordStream(config, partition, stream) numRecords := int64(tmp) @@ -610,9 +610,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } } wg.Done() - } - - go pullPgRecords() + }() } else { recordBatch, err := srcConn.PullQRepRecords(config, partition) if err != nil { diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index dfc1d2791d..2bbc24c1b4 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -316,7 +316,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 10 rows into the source table") }() @@ -391,7 +391,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Executed a transaction touching toast columns") }() @@ -453,7 +453,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Executed a transaction touching toast columns") done <- struct{}{} }() @@ -529,7 +529,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Executed a transaction touching toast columns") }() @@ -596,7 +596,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Executed a transaction touching toast columns") }() @@ -663,7 +663,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Executed a transaction touching toast columns") }() @@ -732,7 +732,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { ARRAY[0.0003, 8902.0092], ARRAY['hello','bye']; `, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -809,7 +809,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { "5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+ "c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf", ) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 4 invalid geography rows into the source table") for i := 4; i < 10; i++ { @@ -819,7 +819,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { "010300000001000000050000000000000000000000000000000000000000000000"+ "00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+ "00f03f000000000000000000000000000000000000000000000000") - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 6 valid geography rows and 10 total rows into source") }() @@ -887,7 +887,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Executed an insert on two tables") }() @@ -947,7 +947,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted initial row in the source table") // verify we got our first row. @@ -957,11 +957,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -971,11 +971,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -985,11 +985,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1053,7 +1053,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 10 rows into the source table") @@ -1063,9 +1063,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1122,7 +1122,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1130,18 +1130,18 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) err = rowsTx.Commit(context.Background()) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1205,16 +1205,16 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1271,13 +1271,13 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // delete that row _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1 `, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1339,7 +1339,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Executed an insert on two tables") }() @@ -1412,25 +1412,24 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { + defer wg.Done() e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 1) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 2) // since we delete stuff, create another table to compare with _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - require.NoError(s.t, err) - - wg.Done() + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) @@ -1500,23 +1499,23 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) insertTx, err := s.pool.Begin(context.Background()) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // since we delete stuff, create another table to compare with _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) - require.NoError(s.t, insertTx.Commit(context.Background())) + e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) @@ -1585,26 +1584,26 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 1) insertTx, err := s.pool.Begin(context.Background()) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // since we delete stuff, create another table to compare with _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) - require.NoError(s.t, insertTx.Commit(context.Background())) + e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) @@ -1672,15 +1671,15 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 1) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 43f3a0a6c4..5ebf1e457b 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -83,8 +83,8 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { testValue := fmt.Sprintf("test_value_%d", i) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) - `, srcTableName), testKey, testValue) - require.NoError(s.t, err) + `, srcTableName), testKey, testValue) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 10 rows into the source table") }() @@ -141,7 +141,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted initial row in the source table") // verify we got our first row. @@ -155,19 +155,19 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -185,19 +185,19 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2") - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -216,19 +216,19 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c3") - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -247,10 +247,10 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -308,20 +308,20 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 10 rows into the source table") // verify we got our 10 rows e2e.NormalizeFlowCountQuery(env, connectionGen, 2) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -382,7 +382,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -390,18 +390,18 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,%s(9000)) `, srcTableName, randomString), i, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) err = rowsTx.Commit(context.Background()) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -469,16 +469,16 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,%s(9000)) `, srcTableName, randomString), i, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -537,13 +537,13 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // delete that row _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1 `, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted and deleted a row for peerdb column check") }() @@ -609,25 +609,24 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { + defer wg.Done() e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 1) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 2) // since we delete stuff, create another table to compare with _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - require.NoError(s.t, err) - - wg.Done() + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) @@ -698,23 +697,23 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) insertTx, err := s.pool.Begin(context.Background()) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // since we delete stuff, create another table to compare with _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) - require.NoError(s.t, insertTx.Commit(context.Background())) + e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) @@ -784,26 +783,26 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 1) insertTx, err := s.pool.Begin(context.Background()) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // since we delete stuff, create another table to compare with _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) - require.NoError(s.t, insertTx.Commit(context.Background())) + e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) @@ -872,15 +871,15 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 1) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 064c28d16c..8c27c52c8b 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -197,10 +197,9 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { signal := connpostgres.NewSlotSignal() - // Moved to a go routine + setupError := make(chan error) go func() { - err := s.connector.SetupReplication(signal, setupReplicationInput) - require.NoError(s.t, err) + setupError <- s.connector.SetupReplication(signal, setupReplicationInput) }() slog.Info("waiting for slot creation to complete", flowLog) @@ -211,6 +210,7 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { time.Sleep(2 * time.Second) signal.CloneComplete <- struct{}{} + require.NoError(s.t, <-setupError) slog.Info("successfully setup replication", flowLog) } diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 4a870a06aa..69e8f2cb40 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -51,7 +51,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -59,9 +59,9 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 8c904a38dc..c47c8964dc 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -147,7 +147,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 20 rows into the source table") }() @@ -225,7 +225,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (id, key, value) VALUES ($1, $2, $3) `, srcTableName), i, testKey, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 20 rows into the source table") }() @@ -291,7 +291,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { "5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+ "c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf", ) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 4 invalid geography rows into the source table") for i := 4; i < 10; i++ { @@ -301,7 +301,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { "010300000001000000050000000000000000000000000000000000000000000000"+ "00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+ "00f03f000000000000000000000000000000000000000000000000") - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 6 valid geography rows and 10 total rows into source") }() @@ -378,7 +378,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Executed a transaction touching toast columns") }() @@ -432,6 +432,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { wg.Add(1) go func() { + defer wg.Done() e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating no rows */ _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -440,13 +441,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - - if err != nil { - slog.Error("Error executing transaction", slog.Any("error", err)) - s.t.FailNow() - } - - wg.Done() + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -460,7 +455,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { s.compareTableContentsSF("test_toast_sf_2", `id,t1,t2,k`) env.AssertExpectations(s.t) - wg.Wait() } @@ -520,7 +514,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Executed a transaction touching toast columns") }() @@ -586,7 +580,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Executed a transaction touching toast columns") }() @@ -652,7 +646,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Executed a transaction touching toast columns") }() @@ -721,7 +715,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -792,7 +786,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -849,7 +843,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted initial row in the source table") // verify we got our first row. @@ -872,18 +866,18 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(s.t, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + e2e.EnvNoError(s.t, env, err) + e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1") // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -908,18 +902,18 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(s.t, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + e2e.EnvNoError(s.t, env, err) + e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2") // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -946,18 +940,18 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(s.t, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + e2e.EnvNoError(s.t, env, err) + e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3") // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -984,8 +978,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(s.t, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + e2e.EnvNoError(s.t, env, err) + e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1") }() @@ -1044,7 +1038,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 10 rows into the source table") @@ -1054,9 +1048,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1113,7 +1107,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1121,18 +1115,18 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) err = rowsTx.Commit(context.Background()) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1195,16 +1189,16 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1276,16 +1270,16 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(100)) `, srcTableName), i, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) @@ -1354,25 +1348,24 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { + defer wg.Done() e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 1) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 2) // since we delete stuff, create another table to compare with _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - require.NoError(s.t, err) - - wg.Done() + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) @@ -1441,23 +1434,23 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) insertTx, err := s.pool.Begin(context.Background()) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // since we delete stuff, create another table to compare with _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) - require.NoError(s.t, insertTx.Commit(context.Background())) + e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) @@ -1525,26 +1518,26 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 1) insertTx, err := s.pool.Begin(context.Background()) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) // since we delete stuff, create another table to compare with _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) - require.NoError(s.t, insertTx.Commit(context.Background())) + e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) @@ -1611,15 +1604,15 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 1) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) @@ -1680,7 +1673,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO e2e_test_%s."%s"("highGold","eVe") VALUES ($1, $2) `, s.pgSuffix, "testMixedCase"), testKey, testValue) - require.NoError(s.t, err) + e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 20 rows into the source table") }() diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 4f011a8e1b..58f95c5e40 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "os" + "runtime" "strings" "testing" "time" @@ -60,6 +61,31 @@ func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnv env.RegisterActivity(&activities.SnapshotActivity{}) } +// Helper function to assert errors in go routines running concurrent to workflows +// This achieves two goals: +// 1. cancel workflow to avoid waiting on goroutine which has failed +// 2. get around t.FailNow being incorrect when called from non initial goroutine +func EnvNoError(t *testing.T, env *testsuite.TestWorkflowEnvironment, err error) { + t.Helper() + + if err != nil { + t.Error(err.Error()) + env.CancelWorkflow() + runtime.Goexit() + } +} + +// See EnvNoError +func EnvEqual[T comparable](t *testing.T, env *testsuite.TestWorkflowEnvironment, x T, y T) { + t.Helper() + + if x != y { + t.Error("not equal", x, y) + env.CancelWorkflow() + runtime.Goexit() + } +} + func GetPgRows(pool *pgxpool.Pool, suffix string, tableName string, cols string) (*model.QRecordBatch, error) { pgQueryExecutor := connpostgres.NewQRepQueryExecutor(pool, context.Background(), "testflow", "testpart") pgQueryExecutor.SetTestEnv(true)