Skip to content

Commit

Permalink
Fix compareTableContents use for goroutines
Browse files Browse the repository at this point in the history
Split the two use cases into two functions,
e2e.RequireEqualTables & e2e.EnvEqualTables

Which uses interfaces to share code between
  • Loading branch information
serprex committed Jan 3, 2024
1 parent ca4321e commit 3070d6d
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 65 deletions.
54 changes: 37 additions & 17 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand All @@ -28,6 +29,25 @@ type PeerFlowE2ETestSuiteBQ struct {
bqHelper *BigQueryTestHelper
}

func (s PeerFlowE2ETestSuiteBQ) T() *testing.T {
return s.t
}

func (s PeerFlowE2ETestSuiteBQ) Pool() *pgxpool.Pool {
return s.pool
}

func (s PeerFlowE2ETestSuiteBQ) Suffix() string {
return s.bqSuffix
}

func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) {
qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName)
bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName)
s.t.Logf("running query on bigquery: %s", bqSelQuery)
return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery)
}

func TestPeerFlowE2ETestSuiteBQ(t *testing.T) {
e2eshared.RunSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteBQ) {
err := e2e.TearDownPostgres(s.pool, s.bqSuffix)
Expand Down Expand Up @@ -404,7 +424,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

s.compareTableContentsBQ(dstTableName, "id,t1,t2,k")
e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k")
env.AssertExpectations(s.t)
}

Expand Down Expand Up @@ -467,7 +487,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

s.compareTableContentsBQ(dstTableName, "id,t1,t2,k")
e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k")
env.AssertExpectations(s.t)
<-done
}
Expand Down Expand Up @@ -542,7 +562,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

s.compareTableContentsBQ(dstTableName, "id,t1,t2,k")
e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k")
env.AssertExpectations(s.t)
}

Expand Down Expand Up @@ -609,7 +629,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

s.compareTableContentsBQ(dstTableName, "id,t1,k")
e2e.RequireEqualTables(s, dstTableName, "id,t1,k")
env.AssertExpectations(s.t)
}

Expand Down Expand Up @@ -676,7 +696,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

s.compareTableContentsBQ(dstTableName, "id,t1,t2,k")
e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k")
env.AssertExpectations(s.t)
}

Expand Down Expand Up @@ -952,7 +972,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {

// verify we got our first row.
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
s.compareTableContentsBQ("test_simple_schema_changes", "id,c1")
e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1")

// alter source table, add column c2 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand All @@ -966,7 +986,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
s.compareTableContentsBQ("test_simple_schema_changes", "id,c1,c2")
e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c2")

// alter source table, add column c3, drop column c2 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand All @@ -980,7 +1000,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 6)
s.compareTableContentsBQ("test_simple_schema_changes", "id,c1,c3")
e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c3")

// alter source table, drop column c3 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand All @@ -994,7 +1014,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 8)
s.compareTableContentsBQ("test_simple_schema_changes", "id,c1")
e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -1059,7 +1079,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {

// verify we got our 10 rows
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
s.compareTableContentsBQ(dstTableName, "id,c1,c2,t")
e2e.EnvEqualTables(env, s, dstTableName, "id,c1,c2,t")

_, err := s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
Expand All @@ -1077,7 +1097,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

s.compareTableContentsBQ(dstTableName, "id,c1,c2,t")
e2e.RequireEqualTables(s, dstTableName, "id,c1,c2,t")

env.AssertExpectations(s.t)
}
Expand Down Expand Up @@ -1154,7 +1174,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ(dstTableName, "id,c1,c2,t,t2")
e2e.RequireEqualTables(s, dstTableName, "id,c1,c2,t,t2")

env.AssertExpectations(s.t)
}
Expand Down Expand Up @@ -1227,7 +1247,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ(dstTableName, "id,c1,c2,t,t2")
e2e.RequireEqualTables(s, dstTableName, "id,c1,c2,t,t2")

env.AssertExpectations(s.t)
}
Expand Down Expand Up @@ -1440,7 +1460,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
wg.Wait()

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel", "id,c1,c2,t")
e2e.RequireEqualTables(s, "test_softdel", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
Expand Down Expand Up @@ -1524,7 +1544,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel_iud", "id,c1,c2,t")
e2e.RequireEqualTables(s, "test_softdel_iud", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
Expand Down Expand Up @@ -1612,7 +1632,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel_ud", "id,c1,c2,t")
e2e.RequireEqualTables(s, "test_softdel_ud", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
Expand Down Expand Up @@ -1688,7 +1708,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel_iad", "id,c1,c2,t")
e2e.RequireEqualTables(s, "test_softdel_iad", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
Expand Down
16 changes: 1 addition & 15 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,6 @@ func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int)
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) {
pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, tableName, colsString)
require.NoError(s.t, err)

// read rows from destination table
qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName)
bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName)
s.t.Logf("running query on bigquery: %s", bqSelQuery)
bqRows, err := s.bqHelper.ExecuteAndProcessQuery(bqSelQuery)
require.NoError(s.t, err)

e2e.RequireEqualRecordBatches(s.t, pgRows, bqRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
Expand Down Expand Up @@ -57,7 +43,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
err = env.GetWorkflowError()
require.NoError(s.t, err)

s.compareTableContentsBQ(tblName, "*")
e2e.RequireEqualTables(s, tblName, "*")

env.AssertExpectations(s.t)
}
Expand Down
Loading

0 comments on commit 3070d6d

Please sign in to comment.