From 3425e3fe757e59c556943de4714b94548a5ed121 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 3 Jan 2024 21:19:46 +0000 Subject: [PATCH] Fix compareTableContents use for goroutines (#979) Split the two use cases into two functions, e2e.RequireEqualTables & e2e.EnvEqualTables Which uses interfaces to share code between Followup for #977 --- flow/e2e/bigquery/peer_flow_bq_test.go | 54 +++++++++++++++++-------- flow/e2e/bigquery/qrep_flow_bq_test.go | 16 +------- flow/e2e/snowflake/peer_flow_sf_test.go | 54 +++++++++++++++++-------- flow/e2e/snowflake/qrep_flow_sf_test.go | 15 +------ flow/e2e/test_utils.go | 44 +++++++++++++++++++- flow/e2eshared/e2eshared.go | 8 ++++ 6 files changed, 127 insertions(+), 64 deletions(-) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 4fb20929b3..601172faa4 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -12,6 +12,7 @@ import ( "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -28,6 +29,25 @@ type PeerFlowE2ETestSuiteBQ struct { bqHelper *BigQueryTestHelper } +func (s PeerFlowE2ETestSuiteBQ) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuiteBQ) Pool() *pgxpool.Pool { + return s.pool +} + +func (s PeerFlowE2ETestSuiteBQ) Suffix() string { + return s.bqSuffix +} + +func (s PeerFlowE2ETestSuiteBQ) GetRows(tableName string, colsString string) (*model.QRecordBatch, error) { + qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) + bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName) + s.t.Logf("running query on bigquery: %s", bqSelQuery) + return s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) +} + func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { e2eshared.RunSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteBQ) { err := e2e.TearDownPostgres(s.pool, s.bqSuffix) @@ -404,7 +424,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") + e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k") env.AssertExpectations(s.t) } @@ -467,7 +487,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") + e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k") env.AssertExpectations(s.t) <-done } @@ -542,7 +562,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") + e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k") env.AssertExpectations(s.t) } @@ -609,7 +629,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsBQ(dstTableName, "id,t1,k") + e2e.RequireEqualTables(s, dstTableName, "id,t1,k") env.AssertExpectations(s.t) } @@ -676,7 +696,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") + e2e.RequireEqualTables(s, dstTableName, "id,t1,t2,k") env.AssertExpectations(s.t) } @@ -952,7 +972,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // verify we got our first row. e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - s.compareTableContentsBQ("test_simple_schema_changes", "id,c1") + e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1") // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -966,7 +986,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 4) - s.compareTableContentsBQ("test_simple_schema_changes", "id,c1,c2") + e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c2") // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -980,7 +1000,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 6) - s.compareTableContentsBQ("test_simple_schema_changes", "id,c1,c3") + e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c3") // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -994,7 +1014,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 8) - s.compareTableContentsBQ("test_simple_schema_changes", "id,c1") + e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1059,7 +1079,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { // verify we got our 10 rows e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - s.compareTableContentsBQ(dstTableName, "id,c1,c2,t") + e2e.EnvEqualTables(env, s, dstTableName, "id,c1,c2,t") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) @@ -1077,7 +1097,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsBQ(dstTableName, "id,c1,c2,t") + e2e.RequireEqualTables(s, dstTableName, "id,c1,c2,t") env.AssertExpectations(s.t) } @@ -1154,7 +1174,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsBQ(dstTableName, "id,c1,c2,t,t2") + e2e.RequireEqualTables(s, dstTableName, "id,c1,c2,t,t2") env.AssertExpectations(s.t) } @@ -1227,7 +1247,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsBQ(dstTableName, "id,c1,c2,t,t2") + e2e.RequireEqualTables(s, dstTableName, "id,c1,c2,t,t2") env.AssertExpectations(s.t) } @@ -1440,7 +1460,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { wg.Wait() // verify our updates and delete happened - s.compareTableContentsBQ("test_softdel", "id,c1,c2,t") + e2e.RequireEqualTables(s, "test_softdel", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, @@ -1524,7 +1544,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsBQ("test_softdel_iud", "id,c1,c2,t") + e2e.RequireEqualTables(s, "test_softdel_iud", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, @@ -1612,7 +1632,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsBQ("test_softdel_ud", "id,c1,c2,t") + e2e.RequireEqualTables(s, "test_softdel_ud", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, @@ -1688,7 +1708,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsBQ("test_softdel_iad", "id,c1,c2,t") + e2e.RequireEqualTables(s, "test_softdel_iad", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED = TRUE`, diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index ddfd8b6373..24d845c500 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -14,20 +14,6 @@ func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) require.NoError(s.t, err) } -func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) { - pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, tableName, colsString) - require.NoError(s.t, err) - - // read rows from destination table - qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) - bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName) - s.t.Logf("running query on bigquery: %s", bqSelQuery) - bqRows, err := s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) - require.NoError(s.t, err) - - e2e.RequireEqualRecordBatches(s.t, pgRows, bqRows) -} - func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) @@ -57,7 +43,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { err = env.GetWorkflowError() require.NoError(s.t, err) - s.compareTableContentsBQ(tblName, "*") + e2e.RequireEqualTables(s, tblName, "*") env.AssertExpectations(s.t) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 818bf56d21..8314032bea 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -13,6 +13,7 @@ import ( "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -30,6 +31,25 @@ type PeerFlowE2ETestSuiteSF struct { connector *connsnowflake.SnowflakeConnector } +func (s PeerFlowE2ETestSuiteSF) T() *testing.T { + return s.t +} + +func (s PeerFlowE2ETestSuiteSF) Pool() *pgxpool.Pool { + return s.pool +} + +func (s PeerFlowE2ETestSuiteSF) Suffix() string { + return s.pgSuffix +} + +func (s PeerFlowE2ETestSuiteSF) GetRows(tableName string, sfSelector string) (*model.QRecordBatch, error) { + qualifiedTableName := fmt.Sprintf(`%s.%s.%s`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) + sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) + s.t.Logf("running query on snowflake: %s", sfSelQuery) + return s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) +} + func TestPeerFlowE2ETestSuiteSF(t *testing.T) { e2eshared.RunSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) { err := e2e.TearDownPostgres(s.pool, s.pgSuffix) @@ -391,7 +411,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_1", `id,t1,t2,k`) + e2e.RequireEqualTables(s, "test_toast_sf_1", `id,t1,t2,k`) env.AssertExpectations(s.t) } @@ -453,7 +473,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_2", `id,t1,t2,k`) + e2e.RequireEqualTables(s, "test_toast_sf_2", `id,t1,t2,k`) env.AssertExpectations(s.t) wg.Wait() } @@ -527,7 +547,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_3", `id,t1,t2,k`) + e2e.RequireEqualTables(s, "test_toast_sf_3", `id,t1,t2,k`) env.AssertExpectations(s.t) } @@ -593,7 +613,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_4", `id,t1,k`) + e2e.RequireEqualTables(s, "test_toast_sf_4", `id,t1,k`) env.AssertExpectations(s.t) } @@ -659,7 +679,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_5", `id,t1,t2,k`) + e2e.RequireEqualTables(s, "test_toast_sf_5", `id,t1,t2,k`) env.AssertExpectations(s.t) } @@ -868,7 +888,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { }) e2e.EnvNoError(s.t, env, err) e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - s.compareTableContentsSF("test_simple_schema_changes", "id,c1") + e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1") // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -904,7 +924,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { }) e2e.EnvNoError(s.t, env, err) e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2") + e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c2") // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -942,7 +962,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { }) e2e.EnvNoError(s.t, env, err) e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3") + e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1,c3") // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -980,7 +1000,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { }) e2e.EnvNoError(s.t, env, err) e2e.EnvEqual(s.t, env, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - s.compareTableContentsSF("test_simple_schema_changes", "id,c1") + e2e.EnvEqualTables(env, s, "test_simple_schema_changes", "id,c1") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1044,7 +1064,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { // verify we got our 10 rows e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t") + e2e.EnvEqualTables(env, s, "test_simple_cpkey", "id,c1,c2,t") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) @@ -1063,7 +1083,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t") + e2e.RequireEqualTables(s, "test_simple_cpkey", "id,c1,c2,t") env.AssertExpectations(s.t) } @@ -1139,7 +1159,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_cpkey_toast1", "id,c1,c2,t,t2") + e2e.RequireEqualTables(s, "test_cpkey_toast1", "id,c1,c2,t,t2") env.AssertExpectations(s.t) } @@ -1211,7 +1231,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_cpkey_toast2", "id,c1,c2,t,t2") + e2e.RequireEqualTables(s, "test_cpkey_toast2", "id,c1,c2,t,t2") env.AssertExpectations(s.t) } @@ -1376,7 +1396,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { wg.Wait() // verify our updates and delete happened - s.compareTableContentsSF("test_softdel", "id,c1,c2,t") + e2e.RequireEqualTables(s, "test_softdel", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) @@ -1459,7 +1479,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_softdel_iud", "id,c1,c2,t") + e2e.RequireEqualTables(s, "test_softdel_iud", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) @@ -1546,7 +1566,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_softdel_ud", "id,c1,c2,t") + e2e.RequireEqualTables(s, "test_softdel_ud", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) @@ -1621,7 +1641,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_softdel_iad", "id,c1,c2,t") + e2e.RequireEqualTables(s, "test_softdel_iad", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index e48d4d62a6..3c032e35f3 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -32,27 +32,16 @@ func (s PeerFlowE2ETestSuiteSF) checkJSONValue(tableName, colName, fieldName, va return nil } -func (s PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName, selector string) { - s.compareTableContentsWithDiffSelectorsSF(tableName, selector, selector, false) -} - func (s PeerFlowE2ETestSuiteSF) compareTableContentsWithDiffSelectorsSF(tableName, pgSelector, sfSelector string, tableCaseSensitive bool, ) { pgRows, err := e2e.GetPgRows(s.pool, s.pgSuffix, tableName, pgSelector) require.NoError(s.t, err) - // read rows from destination table - var qualifiedTableName string if tableCaseSensitive { - qualifiedTableName = fmt.Sprintf(`%s.%s."%s"`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) - } else { - qualifiedTableName = fmt.Sprintf(`%s.%s.%s`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) + tableName = fmt.Sprintf("\"%s\"", tableName) } - - sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) - s.t.Logf("running query on snowflake: %s", sfSelQuery) - sfRows, err := s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) + sfRows, err := s.GetRows(tableName, sfSelector) require.NoError(s.t, err) e2e.RequireEqualRecordBatches(s.t, pgRows, sfRows) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 58f95c5e40..99903fbdfa 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -86,15 +86,45 @@ func EnvEqual[T comparable](t *testing.T, env *testsuite.TestWorkflowEnvironment } } -func GetPgRows(pool *pgxpool.Pool, suffix string, tableName string, cols string) (*model.QRecordBatch, error) { +func GetPgRows(pool *pgxpool.Pool, suffix string, table string, cols string) (*model.QRecordBatch, error) { pgQueryExecutor := connpostgres.NewQRepQueryExecutor(pool, context.Background(), "testflow", "testpart") pgQueryExecutor.SetTestEnv(true) return pgQueryExecutor.ExecuteAndProcessQuery( - fmt.Sprintf(`SELECT %s FROM e2e_test_%s."%s" ORDER BY id`, cols, suffix, tableName), + fmt.Sprintf(`SELECT %s FROM e2e_test_%s."%s" ORDER BY id`, cols, suffix, table), ) } +func RequireEqualTables(suite e2eshared.RowSource, table string, cols string) { + t := suite.T() + t.Helper() + + suffix := suite.Suffix() + pool := suite.Pool() + pgRows, err := GetPgRows(pool, suffix, table, cols) + require.NoError(t, err) + + rows, err := suite.GetRows(table, cols) + require.NoError(t, err) + + require.True(t, e2eshared.CheckEqualRecordBatches(t, pgRows, rows)) +} + +func EnvEqualTables(env *testsuite.TestWorkflowEnvironment, suite e2eshared.RowSource, table string, cols string) { + t := suite.T() + t.Helper() + + suffix := suite.Suffix() + pool := suite.Pool() + pgRows, err := GetPgRows(pool, suffix, table, cols) + EnvNoError(t, env, err) + + rows, err := suite.GetRows(table, cols) + EnvNoError(t, env, err) + + EnvEqualRecordBatches(t, env, pgRows, rows) +} + func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment, connectionGen FlowConnectionGenerationConfig, ) { @@ -475,3 +505,13 @@ func RequireEqualRecordBatches(t *testing.T, q *model.QRecordBatch, other *model t.Helper() require.True(t, e2eshared.CheckEqualRecordBatches(t, q, other)) } + +// See EnvNoError +func EnvEqualRecordBatches(t *testing.T, env *testsuite.TestWorkflowEnvironment, q *model.QRecordBatch, other *model.QRecordBatch) { + t.Helper() + + if !e2eshared.CheckEqualRecordBatches(t, q, other) { + env.CancelWorkflow() + runtime.Goexit() + } +} diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index 59b2087de4..c9a5eca325 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -9,8 +9,16 @@ import ( "testing" "github.com/PeerDB-io/peer-flow/model" + "github.com/jackc/pgx/v5/pgxpool" ) +type RowSource interface { + T() *testing.T + Pool() *pgxpool.Pool + Suffix() string + GetRows(table, cols string) (*model.QRecordBatch, error) +} + func RunSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) { t.Helper() t.Parallel()