Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: avoid require in non-initial go routines #977

Merged
merged 1 commit into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
stream = model.NewQRecordStream(bufferSize)
wg.Add(1)

pullPgRecords := func() {
go func() {
pgConn := srcConn.(*connpostgres.PostgresConnector)
tmp, err := pgConn.PullQRepRecordStream(config, partition, stream)
numRecords := int64(tmp)
Expand All @@ -610,9 +610,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
}
wg.Done()
}

go pullPgRecords()
}()
} else {
recordBatch, err := srcConn.PullQRepRecords(config, partition)
if err != nil {
Expand Down
105 changes: 52 additions & 53 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 10 rows into the source table")
}()
Expand Down Expand Up @@ -391,7 +391,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
UPDATE %s SET t1='dummy' WHERE id=2;
END;
`, srcTableName, srcTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed a transaction touching toast columns")
}()

Expand Down Expand Up @@ -453,7 +453,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
UPDATE %s SET t1='dummy' WHERE id=2;
END;
`, srcTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed a transaction touching toast columns")
done <- struct{}{}
}()
Expand Down Expand Up @@ -529,7 +529,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
END;
`, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName,
srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed a transaction touching toast columns")
}()

Expand Down Expand Up @@ -596,7 +596,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
UPDATE %s SET k=4 WHERE id=1;
END;
`, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed a transaction touching toast columns")
}()

Expand Down Expand Up @@ -663,7 +663,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
UPDATE %s SET t2='dummy' WHERE id=1;
END;
`, srcTableName, srcTableName, srcTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed a transaction touching toast columns")
}()

Expand Down Expand Up @@ -732,7 +732,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
ARRAY[0.0003, 8902.0092],
ARRAY['hello','bye'];
`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -809,7 +809,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
"5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+
"c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf",
)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 4 invalid geography rows into the source table")
for i := 4; i < 10; i++ {
Expand All @@ -819,7 +819,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
"010300000001000000050000000000000000000000000000000000000000000000"+
"00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+
"00f03f000000000000000000000000000000000000000000000000")
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 6 valid geography rows and 10 total rows into source")
}()
Expand Down Expand Up @@ -887,7 +887,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
INSERT INTO %s (c1,c2) VALUES (1,'dummy_1');
INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1');
`, srcTable1Name, srcTable2Name))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed an insert on two tables")
}()

Expand Down Expand Up @@ -947,7 +947,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted initial row in the source table")

// verify we got our first row.
Expand All @@ -957,11 +957,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
// alter source table, add column c2 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Altered source table, added column c2")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row with added c2 in the source table")

// verify we got our two rows, if schema did not match up it will error.
Expand All @@ -971,11 +971,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
// alter source table, add column c3, drop column c2 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Altered source table, dropped column c2 and added column c3")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row with added c3 in the source table")

// verify we got our two rows, if schema did not match up it will error.
Expand All @@ -985,11 +985,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
// alter source table, drop column c3 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s DROP COLUMN c3`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Altered source table, dropped column c3")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row after dropping all columns in the source table")

// verify we got our two rows, if schema did not match up it will error.
Expand Down Expand Up @@ -1053,7 +1053,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t) VALUES ($1,$2)
`, srcTableName), i, testValue)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 10 rows into the source table")

Expand All @@ -1063,9 +1063,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {

_, err := s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -1122,26 +1122,26 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
rowsTx, err := s.pool.Begin(context.Background())
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
`, srcTableName), i, testValue)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 10 rows into the source table")

_, err = rowsTx.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)

err = rowsTx.Commit(context.Background())
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -1205,16 +1205,16 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
`, srcTableName), i, testValue)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 10 rows into the source table")

e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
_, err = s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -1271,13 +1271,13 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)

// delete that row
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1
`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -1339,7 +1339,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
INSERT INTO %s (c1,c2) VALUES (1,'dummy_1');
INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1');
`, srcTable1Name, srcTable2Name))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
s.t.Log("Executed an insert on two tables")
}()

Expand Down Expand Up @@ -1412,25 +1412,24 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
// and then insert, update and delete rows in the table.
go func() {
defer wg.Done()
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
// since we delete stuff, create another table to compare with
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)

wg.Done()
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
Expand Down Expand Up @@ -1500,23 +1499,23 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)

insertTx, err := s.pool.Begin(context.Background())
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)

_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
// since we delete stuff, create another table to compare with
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)

require.NoError(s.t, insertTx.Commit(context.Background()))
e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background()))
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
Expand Down Expand Up @@ -1585,26 +1584,26 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)

insertTx, err := s.pool.Begin(context.Background())
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
// since we delete stuff, create another table to compare with
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
_, err = insertTx.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)

require.NoError(s.t, insertTx.Commit(context.Background()))
e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background()))
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
Expand Down Expand Up @@ -1672,15 +1671,15 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {

_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 1)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName))
require.NoError(s.t, err)
e2e.EnvNoError(s.t, env, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
Expand Down
Loading