Skip to content

Commit

Permalink
reduce noise from waitfor
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 6, 2024
1 parent 4af0d01 commit aea6c65
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 9 deletions.
14 changes: 7 additions & 7 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
e2e.RegisterWorkflowsAndActivities(s.t, env)

tableName := "test_simple_schema_changes"
srcTableName := s.attachSchemaSuffix("test_simple_schema_changes")
srcTableName := s.attachSchemaSuffix(tableName)

_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
Expand All @@ -934,7 +934,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_simple_schema_changes"),
FlowJobName: s.attachSuffix(tableName),
TableNameMapping: map[string]string{srcTableName: tableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
Expand All @@ -955,19 +955,19 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
// insert first row.
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1)
INSERT INTO %s(c1) VALUES (1)`, srcTableName))
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted initial row in the source table")

e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", tableName, "id,c1")
e2e.EnvWaitForEqualTables(env, s, "normalize insert", tableName, "id,c1")

// alter source table, add column c2 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName))
e2e.EnvNoError(s.t, env, err)
s.t.Log("Altered source table, added column c2")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2)
INSERT INTO %s(c1,c2) VALUES (2,2)`, srcTableName))
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row with added c2 in the source table")

Expand All @@ -980,7 +980,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
e2e.EnvNoError(s.t, env, err)
s.t.Log("Altered source table, dropped column c2 and added column c3")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3)
INSERT INTO %s(c1,c3) VALUES (3,3)`, srcTableName))
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row with added c3 in the source table")

Expand All @@ -993,7 +993,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
e2e.EnvNoError(s.t, env, err)
s.t.Log("Altered source table, dropped column c3")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4)
INSERT INTO %s(c1) VALUES (4)`, srcTableName))
e2e.EnvNoError(s.t, env, err)
s.t.Log("Inserted row after dropping all columns in the source table")

Expand Down
2 changes: 0 additions & 2 deletions flow/e2eshared/e2eshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ func CheckEqualRecordBatches(t *testing.T, q *model.QRecordBatch, other *model.Q
for i, record := range q.Records {
if !CheckQRecordEquality(t, record, other.Records[i]) {
t.Logf("Record %d is not equal", i)
t.Logf("Record 1: %v", record)
t.Logf("Record 2: %v", other.Records[i])
return false
}
}
Expand Down

0 comments on commit aea6c65

Please sign in to comment.