diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index c49257fa5a..6a26e2e8c4 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -13,6 +13,7 @@ import ( "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" @@ -82,28 +83,11 @@ func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) func (s PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error { // Execute the two EXCEPT queries - for { - err := s.compareQuery(srcSchemaQualified, dstSchemaQualified, selector) - // while testing, the prepared plan might break due to schema changes - // solution is to retry, prepared statement should be evicted upon the first error - if err != nil && !strings.Contains(err.Error(), "cached plan must not change result type") { - return err - } - if err == nil { - break - } + if err := s.compareQuery(srcSchemaQualified, dstSchemaQualified, selector); err != nil { + return err } - - for { - err := s.compareQuery(dstSchemaQualified, srcSchemaQualified, selector) - // while testing, the prepared plan might break due to schema changes - // solution is to retry, prepared statement should be evicted upon the first error - if err != nil && !strings.Contains(err.Error(), "cached plan must not change result type") { - return err - } - if err == nil { - break - } + if err := s.compareQuery(dstSchemaQualified, srcSchemaQualified, selector); err != nil { + return err } // If no error is returned, then the contents of the two tables are the same @@ -131,7 +115,7 @@ func (s PeerFlowE2ETestSuitePG) checkEnums(srcSchemaQualified, dstSchemaQualifie func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified, selector, dstSchemaQualified) - rows, err := s.pool.Query(context.Background(), query) + rows, err := s.pool.Query(context.Background(), query, pgx.QueryExecModeExec) if err != nil { return err }