Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix compareTableContents use for goroutines #979

Merged
merged 1 commit into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand All @@ -28,6 +29,25 @@ type PeerFlowE2ETestSuiteBQ struct {
bqHelper *BigQueryTestHelper
}

func (s PeerFlowE2ETestSuiteBQ) T() *testing.T {
return s.t
}

func (s PeerFlowE2ETestSuiteBQ) Pool() *pgxpool.Pool {
return s.pool
}

func (s PeerFlowE2ETestSuiteBQ) Suffix() string {
return s.bqSuffix
}

func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) {
qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName)
bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName)
s.t.Logf("running query on bigquery: %s", bqSelQuery)
return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery)
}

func TestPeerFlowE2ETestSuiteBQ(t *testing.T) {
e2eshared.RunSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteBQ) {
err := e2e.TearDownPostgres(s.pool, s.bqSuffix)
Expand Down Expand Up @@ -404,7 +424,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

s.compareTableContentsBQ(dstTableName, "id,t1,t2,k")
e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k")
env.AssertExpectations(s.t)
}

Expand Down Expand Up @@ -467,7 +487,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

s.compareTableContentsBQ(dstTableName, "id,t1,t2,k")
e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k")
env.AssertExpectations(s.t)
<-done
}
Expand Down Expand Up @@ -542,7 +562,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

s.compareTableContentsBQ(dstTableName, "id,t1,t2,k")
e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k")
env.AssertExpectations(s.t)
}

Expand Down Expand Up @@ -609,7 +629,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

s.compareTableContentsBQ(dstTableName, "id,t1,k")
e2e.RequireEqualTables(s, dstTableName, "id,t1,k")
env.AssertExpectations(s.t)
}

Expand Down Expand Up @@ -676,7 +696,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

s.compareTableContentsBQ(dstTableName, "id,t1,t2,k")
e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k")
env.AssertExpectations(s.t)
}

Expand Down Expand Up @@ -952,7 +972,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {

// verify we got our first row.
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
s.compareTableContentsBQ("test_simple_schema_changes", "id,c1")
e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1")

// alter source table, add column c2 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand All @@ -966,7 +986,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
s.compareTableContentsBQ("test_simple_schema_changes", "id,c1,c2")
e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c2")

// alter source table, add column c3, drop column c2 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand All @@ -980,7 +1000,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 6)
s.compareTableContentsBQ("test_simple_schema_changes", "id,c1,c3")
e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c3")

// alter source table, drop column c3 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand All @@ -994,7 +1014,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 8)
s.compareTableContentsBQ("test_simple_schema_changes", "id,c1")
e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
Expand Down Expand Up @@ -1059,7 +1079,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {

// verify we got our 10 rows
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
s.compareTableContentsBQ(dstTableName, "id,c1,c2,t")
e2e.EnvEqualTables(env, s, dstTableName, "id,c1,c2,t")

_, err := s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
Expand All @@ -1077,7 +1097,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
// allow only continue as new error
require.Contains(s.t, err.Error(), "continue as new")

s.compareTableContentsBQ(dstTableName, "id,c1,c2,t")
e2e.RequireEqualTables(s, dstTableName, "id,c1,c2,t")

env.AssertExpectations(s.t)
}
Expand Down Expand Up @@ -1154,7 +1174,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ(dstTableName, "id,c1,c2,t,t2")
e2e.RequireEqualTables(s, dstTableName, "id,c1,c2,t,t2")

env.AssertExpectations(s.t)
}
Expand Down Expand Up @@ -1227,7 +1247,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ(dstTableName, "id,c1,c2,t,t2")
e2e.RequireEqualTables(s, dstTableName, "id,c1,c2,t,t2")

env.AssertExpectations(s.t)
}
Expand Down Expand Up @@ -1440,7 +1460,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
wg.Wait()

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel", "id,c1,c2,t")
e2e.RequireEqualTables(s, "test_softdel", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
Expand Down Expand Up @@ -1524,7 +1544,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel_iud", "id,c1,c2,t")
e2e.RequireEqualTables(s, "test_softdel_iud", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
Expand Down Expand Up @@ -1612,7 +1632,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel_ud", "id,c1,c2,t")
e2e.RequireEqualTables(s, "test_softdel_ud", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
Expand Down Expand Up @@ -1688,7 +1708,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
s.compareTableContentsBQ("test_softdel_iad", "id,c1,c2,t")
e2e.RequireEqualTables(s, "test_softdel_iad", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`,
Expand Down
16 changes: 1 addition & 15 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,6 @@ func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int)
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) {
pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, tableName, colsString)
require.NoError(s.t, err)

// read rows from destination table
qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName)
bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName)
s.t.Logf("running query on bigquery: %s", bqSelQuery)
bqRows, err := s.bqHelper.ExecuteAndProcessQuery(bqSelQuery)
require.NoError(s.t, err)

e2e.RequireEqualRecordBatches(s.t, pgRows, bqRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
env := e2e.NewTemporalTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(s.t, env)
Expand Down Expand Up @@ -57,7 +43,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
err = env.GetWorkflowError()
require.NoError(s.t, err)

s.compareTableContentsBQ(tblName, "*")
e2e.RequireEqualTables(s, tblName, "*")

env.AssertExpectations(s.t)
}
Expand Down
Loading
Loading