diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index bfbec22431..c18c14d518 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -923,7 +923,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.RegisterWorkflowsAndActivities(s.t, env) tableName := "test_simple_schema_changes" - srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") + srcTableName := s.attachSchemaSuffix(tableName) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s ( @@ -934,7 +934,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: s.attachSuffix("test_simple_schema_changes"), + FlowJobName: s.attachSuffix(tableName), TableNameMapping: map[string]string{srcTableName: tableName}, PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, @@ -955,11 +955,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // insert first row. e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) + INSERT INTO %s(c1) VALUES (1)`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted initial row in the source table") - e2e.EnvWaitForEqualTables(env, s, "normalize reinsert", tableName, "id,c1") + e2e.EnvWaitForEqualTables(env, s, "normalize insert", tableName, "id,c1") // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -967,7 +967,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) + INSERT INTO %s(c1,c2) VALUES (2,2)`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c2 in the source table") @@ -980,7 +980,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) + INSERT INTO %s(c1,c3) VALUES (3,3)`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row with added c3 in the source table") @@ -993,7 +993,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.EnvNoError(s.t, env, err) s.t.Log("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) + INSERT INTO %s(c1) VALUES (4)`, srcTableName)) e2e.EnvNoError(s.t, env, err) s.t.Log("Inserted row after dropping all columns in the source table") diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index c9a5eca325..84ab661e1b 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -111,8 +111,6 @@ func CheckEqualRecordBatches(t *testing.T, q *model.QRecordBatch, other *model.Q for i, record := range q.Records { if !CheckQRecordEquality(t, record, other.Records[i]) { t.Logf("Record %d is not equal", i) - t.Logf("Record 1: %v", record) - t.Logf("Record 2: %v", other.Records[i]) return false } }