tests: avoid require in non-initial go routines
Go testing docs:
> FailNow must be called from the goroutine running the test or benchmark function,
> not from other goroutines created during the test

This would cause the workflow to hang if it was waiting on the goroutine,
so introduce helper functions (sketched below) which:
1. Check assertion
2. Log failure & mark test failed
3. Cancel workflow
4. Abort goroutine

A follow-up PR will also fix calls to compareTableContents once #967 merges.
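
For illustration, here is a minimal sketch of what such a helper can look like, assuming the Temporal Go SDK's testsuite.TestWorkflowEnvironment that these e2e tests run against; the actual EnvNoError in flow/e2e may differ in name, signature, and logging details:

```go
package e2e

import (
	"runtime"
	"testing"

	"go.temporal.io/sdk/testsuite"
)

// EnvNoError is safe to call from goroutines spawned by a test: it never
// calls t.FailNow (which must run on the test goroutine). On failure it
// 1. checks the assertion,
// 2. logs the error and marks the test failed via t.Error,
// 3. cancels the workflow so the test environment does not hang, and
// 4. aborts the calling goroutine with runtime.Goexit.
func EnvNoError(t *testing.T, env *testsuite.TestWorkflowEnvironment, err error) {
	t.Helper()
	if err != nil {
		t.Error("unexpected error:", err)
		env.CancelWorkflow()
		runtime.Goexit()
	}
}
```

Call sites inside goroutines then replace require.NoError(s.t, err) with e2e.EnvNoError(s.t, env, err), which is what the hunks below do.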
serprex committed Jan 3, 2024
1 parent 4bc7879 commit b4fabdb
Showing 7 changed files with 201 additions and 186 deletions.
6 changes: 2 additions & 4 deletions flow/activities/flowable.go
@@ -593,7 +593,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
stream = model.NewQRecordStream(bufferSize)
wg.Add(1)

pullPgRecords := func() {
go func() {
pgConn := srcConn.(*connpostgres.PostgresConnector)
tmp, err := pgConn.PullQRepRecordStream(config, partition, stream)
numRecords := int64(tmp)
@@ -610,9 +610,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
}
wg.Done()
}

go pullPgRecords()
}()
} else {
recordBatch, err := srcConn.PullQRepRecords(config, partition)
if err != nil {
105 changes: 52 additions & 53 deletions flow/e2e/bigquery/peer_flow_bq_test.go
@@ -316,7 +316,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 10 rows into the source table")
}()
@@ -391,7 +391,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
UPDATE %s SET t1='dummy' WHERE id=2;
END;
`, srcTableName, srcTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed a transaction touching toast columns")
}()

@@ -453,7 +453,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
UPDATE %s SET t1='dummy' WHERE id=2;
END;
`, srcTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed a transaction touching toast columns")
done <- struct{}{}
}()
@@ -529,7 +529,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
END;
`, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName,
srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed a transaction touching toast columns")
}()

@@ -596,7 +596,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
UPDATE %s SET k=4 WHERE id=1;
END;
`, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed a transaction touching toast columns")
}()

@@ -663,7 +663,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
UPDATE %s SET t2='dummy' WHERE id=1;
END;
`, srcTableName, srcTableName, srcTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed a transaction touching toast columns")
}()

@@ -732,7 +732,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
ARRAY[0.0003, 8902.0092],
ARRAY['hello','bye'];
`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
@@ -809,7 +809,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
"5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+
"c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf",
)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 4 invalid geography rows into the source table")
for i := 4; i < 10; i++ {
@@ -819,7 +819,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
"010300000001000000050000000000000000000000000000000000000000000000"+
"00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+
"00f03f000000000000000000000000000000000000000000000000")
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 6 valid geography rows and 10 total rows into source")
}()
@@ -887,7 +887,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
INSERT INTO %s (c1,c2) VALUES (1,'dummy_1');
INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1');
`, srcTable1Name, srcTable2Name))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed an insert on two tables")
}()

@@ -947,7 +947,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted initial row in the source table")

// verify we got our first row.
@@ -957,11 +957,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
// alter source table, add column c2 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Altered source table, added column c2")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row with added c2 in the source table")

// verify we got our two rows, if schema did not match up it will error.
@@ -971,11 +971,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
// alter source table, add column c3, drop column c2 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Altered source table, dropped column c2 and added column c3")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row with added c3 in the source table")

// verify we got our two rows, if schema did not match up it will error.
@@ -985,11 +985,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
// alter source table, drop column c3 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s DROP COLUMN c3`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Altered source table, dropped column c3")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row after dropping all columns in the source table")

// verify we got our two rows, if schema did not match up it will error.
@@ -1053,7 +1053,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t) VALUES ($1,$2)
`, srcTableName), i, testValue)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 10 rows into the source table")

@@ -1063,9 +1063,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {

_, err := s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
@@ -1122,26 +1122,26 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
rowsTx, err := s.pool.Begin(context.Background())
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
`, srcTableName), i, testValue)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 10 rows into the source table")

_, err = rowsTx.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)

err = rowsTx.Commit(context.Background())
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
@@ -1205,16 +1205,16 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
`, srcTableName), i, testValue)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 10 rows into the source table")

e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
_, err = s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
@@ -1271,13 +1271,13 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)

// delete that row
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1
`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
@@ -1339,7 +1339,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
INSERT INTO %s (c1,c2) VALUES (1,'dummy_1');
INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1');
`, srcTable1Name, srcTable2Name))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed an insert on two tables")
}()

@@ -1412,25 +1412,24 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
defer wg.Done()
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
// since we delete stuff, create another table to compare with
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)

wg.Done()
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
@@ -1500,23 +1499,23 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

insertTx, err := s.pool.Begin(context.Background())
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)

_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
// since we delete stuff, create another table to compare with
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)

require.NoError(s.t, insertTx.Commit(context.Background()))
e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background()))
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
@@ -1585,26 +1584,26 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)

insertTx, err := s.pool.Begin(context.Background())
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
// since we delete stuff, create another table to compare with
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)

require.NoError(s.t, insertTx.Commit(context.Background()))
e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background()))
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
@@ -1672,15 +1671,15 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)