Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deterministic verification of Postgres tests #395

Merged
merged 2 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
s.Error(err)
s.Contains(err.Error(), "continue as new")

err = s.comparePGTables(srcTableName, dstTableName)
err = s.comparePGTables(srcTableName, dstTableName, "id,key,value")
s.NoError(err)

env.AssertExpectations(s.T())
Expand Down Expand Up @@ -121,7 +121,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {

// verify we got our first row.
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
err = s.comparePGTables(srcTableName, dstTableName)
err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
s.NoError(err)

// alter source table, add column c2 and insert another row.
Expand All @@ -136,7 +136,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
err = s.comparePGTables(srcTableName, dstTableName)
err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
s.NoError(err)

// alter source table, add column c3, drop column c2 and insert another row.
Expand All @@ -151,7 +151,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 6)
err = s.comparePGTables(srcTableName, dstTableName)
err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
s.NoError(err)

// alter source table, drop column c3 and insert another row.
Expand All @@ -166,7 +166,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 8)
err = s.comparePGTables(srcTableName, dstTableName)
err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
s.NoError(err)
}()

Expand Down
13 changes: 7 additions & 6 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ func (s *PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int
s.NoError(err)
}

func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified string) error {
func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error {
// Execute the two EXCEPT queries
for {
err := s.compareQuery(srcSchemaQualified, dstSchemaQualified)
err := s.compareQuery(srcSchemaQualified, dstSchemaQualified, selector)
// while testing, the prepared plan might break due to schema changes
// solution is to retry, prepared statement should be evicted upon the first error
if err != nil && !strings.Contains(err.Error(), "cached plan must not change result type") {
Expand All @@ -78,7 +78,7 @@ func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQu
}

for {
err := s.compareQuery(dstSchemaQualified, srcSchemaQualified)
err := s.compareQuery(dstSchemaQualified, srcSchemaQualified, selector)
// while testing, the prepared plan might break due to schema changes
// solution is to retry, prepared statement should be evicted upon the first error
if err != nil && !strings.Contains(err.Error(), "cached plan must not change result type") {
Expand All @@ -93,8 +93,9 @@ func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQu
return nil
}

func (s *PeerFlowE2ETestSuitePG) compareQuery(schema1, schema2 string) error {
query := fmt.Sprintf("SELECT * FROM %s EXCEPT SELECT * FROM %s", schema1, schema2)
func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error {
query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified,
selector, dstSchemaQualified)
rows, _ := s.pool.Query(context.Background(), query)
rowsPresent := false

Expand Down Expand Up @@ -163,7 +164,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
err = env.GetWorkflowError()
s.NoError(err)

err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified)
err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified, "*")
if err != nil {
s.FailNow(err.Error())
}
Expand Down
Loading