Skip to content

Commit

Permalink
deterministic verification of Postgres tests (#395)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Sep 18, 2023
1 parent d2bb6ae commit 54f8948
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
10 changes: 5 additions & 5 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
s.Error(err)
s.Contains(err.Error(), "continue as new")

err = s.comparePGTables(srcTableName, dstTableName)
err = s.comparePGTables(srcTableName, dstTableName, "id,key,value")
s.NoError(err)

env.AssertExpectations(s.T())
Expand Down Expand Up @@ -121,7 +121,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {

// verify we got our first row.
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
err = s.comparePGTables(srcTableName, dstTableName)
err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
s.NoError(err)

// alter source table, add column c2 and insert another row.
Expand All @@ -136,7 +136,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
err = s.comparePGTables(srcTableName, dstTableName)
err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
s.NoError(err)

// alter source table, add column c3, drop column c2 and insert another row.
Expand All @@ -151,7 +151,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 6)
err = s.comparePGTables(srcTableName, dstTableName)
err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
s.NoError(err)

// alter source table, drop column c3 and insert another row.
Expand All @@ -166,7 +166,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 8)
err = s.comparePGTables(srcTableName, dstTableName)
err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
s.NoError(err)
}()

Expand Down
13 changes: 7 additions & 6 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ func (s *PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int
s.NoError(err)
}

func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified string) error {
func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error {
// Execute the two EXCEPT queries
for {
err := s.compareQuery(srcSchemaQualified, dstSchemaQualified)
err := s.compareQuery(srcSchemaQualified, dstSchemaQualified, selector)
// while testing, the prepared plan might break due to schema changes
// solution is to retry, prepared statement should be evicted upon the first error
if err != nil && !strings.Contains(err.Error(), "cached plan must not change result type") {
Expand All @@ -78,7 +78,7 @@ func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQu
}

for {
err := s.compareQuery(dstSchemaQualified, srcSchemaQualified)
err := s.compareQuery(dstSchemaQualified, srcSchemaQualified, selector)
// while testing, the prepared plan might break due to schema changes
// solution is to retry, prepared statement should be evicted upon the first error
if err != nil && !strings.Contains(err.Error(), "cached plan must not change result type") {
Expand All @@ -93,8 +93,9 @@ func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQu
return nil
}

func (s *PeerFlowE2ETestSuitePG) compareQuery(schema1, schema2 string) error {
query := fmt.Sprintf("SELECT * FROM %s EXCEPT SELECT * FROM %s", schema1, schema2)
func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error {
query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified,
selector, dstSchemaQualified)
rows, _ := s.pool.Query(context.Background(), query)
rowsPresent := false

Expand Down Expand Up @@ -163,7 +164,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
err = env.GetWorkflowError()
s.NoError(err)

err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified)
err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified, "*")
if err != nil {
s.FailNow(err.Error())
}
Expand Down

0 comments on commit 54f8948

Please sign in to comment.