diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 8ee321ee95..6198f605e5 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -73,7 +73,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { s.Error(err) s.Contains(err.Error(), "continue as new") - err = s.comparePGTables(srcTableName, dstTableName) + err = s.comparePGTables(srcTableName, dstTableName, "id,key,value") s.NoError(err) env.AssertExpectations(s.T()) @@ -121,7 +121,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { // verify we got our first row. e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - err = s.comparePGTables(srcTableName, dstTableName) + err = s.comparePGTables(srcTableName, dstTableName, "id,c1") s.NoError(err) // alter source table, add column c2 and insert another row. @@ -136,7 +136,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 4) - err = s.comparePGTables(srcTableName, dstTableName) + err = s.comparePGTables(srcTableName, dstTableName, "id,c1") s.NoError(err) // alter source table, add column c3, drop column c2 and insert another row. @@ -151,7 +151,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 6) - err = s.comparePGTables(srcTableName, dstTableName) + err = s.comparePGTables(srcTableName, dstTableName, "id,c1") s.NoError(err) // alter source table, drop column c3 and insert another row. @@ -166,7 +166,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 8) - err = s.comparePGTables(srcTableName, dstTableName) + err = s.comparePGTables(srcTableName, dstTableName, "id,c1") s.NoError(err) }() diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 1f084409a2..0aa56ea924 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -63,10 +63,10 @@ func (s *PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int s.NoError(err) } -func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified string) error { +func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error { // Execute the two EXCEPT queries for { - err := s.compareQuery(srcSchemaQualified, dstSchemaQualified) + err := s.compareQuery(srcSchemaQualified, dstSchemaQualified, selector) // while testing, the prepared plan might break due to schema changes // solution is to retry, prepared statement should be evicted upon the first error if err != nil && !strings.Contains(err.Error(), "cached plan must not change result type") { @@ -78,7 +78,7 @@ func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQu } for { - err := s.compareQuery(dstSchemaQualified, srcSchemaQualified) + err := s.compareQuery(dstSchemaQualified, srcSchemaQualified, selector) // while testing, the prepared plan might break due to schema changes // solution is to retry, prepared statement should be evicted upon the first error if err != nil && !strings.Contains(err.Error(), "cached plan must not change result type") { @@ -93,8 +93,9 @@ func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQu return nil } -func (s *PeerFlowE2ETestSuitePG) compareQuery(schema1, schema2 string) error { - query := fmt.Sprintf("SELECT * FROM %s EXCEPT SELECT * FROM %s", schema1, schema2) +func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { + query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified, + selector, dstSchemaQualified) rows, _ := s.pool.Query(context.Background(), query) rowsPresent := false @@ -163,7 +164,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { err = env.GetWorkflowError() s.NoError(err) - err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified) + err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified, "*") if err != nil { s.FailNow(err.Error()) }