diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index ecc912f655..bc81f0497d 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -9,17 +9,18 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgtype" + "github.com/stretchr/testify/require" ) -func (s *PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", postgresSuffix, tableName) +func (s PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string { + return fmt.Sprintf("e2e_test_%s.%s", s.suffix, tableName) } -func (s *PeerFlowE2ETestSuitePG) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, postgresSuffix) +func (s PeerFlowE2ETestSuitePG) attachSuffix(input string) string { + return fmt.Sprintf("%s_%s", input, s.suffix) } -func (s *PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error { +func (s PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error { query := fmt.Sprintf(`SELECT "_PEERDB_IS_DELETED","_PEERDB_SYNCED_AT" FROM %s WHERE id = %d`, dstSchemaQualified, rowID) var isDeleted pgtype.Bool @@ -40,9 +41,9 @@ func (s *PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, r return nil } -func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) +func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_flow") dstTableName := s.attachSchemaSuffix("test_simple_flow_dst") @@ -54,7 +55,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow"), @@ -64,7 +65,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -82,9 +83,9 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } - s.T().Log("Inserted 10 rows into the source table") + s.t.Log("Inserted 10 rows into the source table") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -94,18 +95,17 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") err = s.comparePGTables(srcTableName, dstTableName, "id,key,value") - s.NoError(err) + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) +func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := s.attachSchemaSuffix("test_simple_schema_changes_dst") @@ -116,7 +116,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -126,7 +126,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -140,8 +140,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) - s.T().Log("Inserted initial row in the source table") + require.NoError(s.t, err) + s.t.Log("Inserted initial row in the source table") // verify we got our first row. e2e.NormalizeFlowCountQuery(env, connectionGen, 2) @@ -154,20 +154,20 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") - s.NoError(err) + require.NoError(s.t, err) // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - s.NoError(err) - s.T().Log("Altered source table, added column c2") + require.NoError(s.t, err) + s.t.Log("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - s.NoError(err) - s.T().Log("Inserted row with added c2 in the source table") + require.NoError(s.t, err) + s.t.Log("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 4) @@ -184,20 +184,20 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2") - s.NoError(err) + require.NoError(s.t, err) // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) - s.T().Log("Altered source table, dropped column c2 and added column c3") + require.NoError(s.t, err) + s.t.Log("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) - s.T().Log("Inserted row with added c3 in the source table") + require.NoError(s.t, err) + s.t.Log("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 6) @@ -215,20 +215,20 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c3") - s.NoError(err) + require.NoError(s.t, err) // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) - s.T().Log("Altered source table, dropped column c3") + require.NoError(s.t, err) + s.t.Log("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) - s.T().Log("Inserted row after dropping all columns in the source table") + require.NoError(s.t, err) + s.t.Log("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 8) @@ -246,10 +246,10 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -259,15 +259,14 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) +func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := s.attachSchemaSuffix("test_simple_cpkey_dst") @@ -281,7 +280,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -291,7 +290,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -308,20 +307,20 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } - s.T().Log("Inserted 10 rows into the source table") + s.t.Log("Inserted 10 rows into the source table") // verify we got our 10 rows e2e.NormalizeFlowCountQuery(env, connectionGen, 2) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") - s.NoError(err) + require.NoError(s.t, err) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -331,20 +330,20 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") - s.NoError(err) + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) +func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") + randomString := s.attachSchemaSuffix("random_string") dstTableName := s.attachSchemaSuffix("test_cpkey_toast1_dst") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -355,12 +354,12 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { t TEXT, t2 TEXT, PRIMARY KEY(id,t) - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ + );CREATE OR REPLACE FUNCTION %s( int ) RETURNS TEXT as $$ SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; - `, srcTableName)) - s.NoError(err) + `, srcTableName, randomString)) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -370,7 +369,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -382,26 +381,26 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + require.NoError(s.t, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) - `, srcTableName), i, testValue) - s.NoError(err) + INSERT INTO %s(c2,t,t2) VALUES ($1,$2,%s(9000)) + `, srcTableName, randomString), i, testValue) + require.NoError(s.t, err) } - s.T().Log("Inserted 10 rows into the source table") + s.t.Log("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) err = rowsTx.Commit(context.Background()) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -411,21 +410,21 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") - s.NoError(err) + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) +func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") + randomString := s.attachSchemaSuffix("random_string") dstTableName := s.attachSchemaSuffix("test_cpkey_toast2_dst") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -436,12 +435,12 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { t TEXT, t2 TEXT, PRIMARY KEY(id,t) - );CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$ + );CREATE OR REPLACE FUNCTION %s( int ) RETURNS TEXT as $$ SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz', round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; - `, srcTableName)) - s.NoError(err) + `, srcTableName, randomString)) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -451,7 +450,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -467,18 +466,18 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { for i := 0; i < 10; i++ { testValue := fmt.Sprintf("test_value_%d", i) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) - `, srcTableName), i, testValue) - s.NoError(err) + INSERT INTO %s(c2,t,t2) VALUES ($1,$2,%s(9000)) + `, srcTableName, randomString), i, testValue) + require.NoError(s.t, err) } - s.T().Log("Inserted 10 rows into the source table") + s.t.Log("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -488,19 +487,18 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") - s.NoError(err) + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) +func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_peerdb_cols") dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst") @@ -512,7 +510,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_peerdb_cols_mirror"), @@ -523,7 +521,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -538,14 +536,14 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) // delete that row _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1 `, srcTableName)) - s.NoError(err) - s.T().Log("Inserted and deleted a row for peerdb column check") + require.NoError(s.t, err) + s.t.Log("Inserted and deleted a row for peerdb column check") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -555,9 +553,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") checkErr := s.checkPeerdbColumns(dstTableName, 1) - s.NoError(checkErr) - env.AssertExpectations(s.T()) + require.NoError(s.t, checkErr) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index d408451939..c54980d55c 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -9,31 +9,36 @@ import ( connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/stretchr/testify/require" + "github.com/ysmood/got" ) -const postgresSuffix = "postgres" - type PeerFlowE2ETestSuitePG struct { - suite.Suite - testsuite.WorkflowTestSuite + got.G + t *testing.T pool *pgxpool.Pool peer *protos.Peer connector *connpostgres.PostgresConnector + suffix string } func TestPeerFlowE2ETestSuitePG(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuitePG)) + e2eshared.GotSuite(t, SetupSuite, func(s PeerFlowE2ETestSuitePG) { + err := e2e.TearDownPostgres(s.pool, s.suffix) + if err != nil { + require.Fail(s.t, "failed to drop Postgres schema", err) + } + }) } -// Implement SetupAllSuite interface to setup the test suite -func (s *PeerFlowE2ETestSuitePG) SetupSuite() { +func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuitePG { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -41,14 +46,14 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() { slog.Info("Unable to load .env file, using default values from env") } - pool, err := e2e.SetupPostgres(postgresSuffix) - if err != nil || pool == nil { - s.Fail("failed to setup postgres", err) + suffix := "pg_" + strings.ToLower(shared.RandomString(8)) + pool, err := e2e.SetupPostgres(suffix) + if err != nil { + require.Fail(t, "failed to setup postgres", err) } - s.pool = pool - s.peer = generatePGPeer(e2e.GetTestPostgresConf()) - s.connector, err = connpostgres.NewPostgresConnector(context.Background(), + var connector *connpostgres.PostgresConnector + connector, err = connpostgres.NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, @@ -56,25 +61,26 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() { Password: "postgres", Database: "postgres", }, false) - s.NoError(err) -} - -// Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuitePG) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, postgresSuffix) - if err != nil { - s.Fail("failed to drop Postgres schema", err) + require.NoError(t, err) + + return PeerFlowE2ETestSuitePG{ + G: g, + t: t, + pool: pool, + peer: generatePGPeer(e2e.GetTestPostgresConf()), + connector: connector, + suffix: suffix, } } -func (s *PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateTableForQRep(s.pool, postgresSuffix, tableName) - s.NoError(err) - err = e2e.PopulateSourceTable(s.pool, postgresSuffix, tableName, rowCount) - s.NoError(err) +func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { + err := e2e.CreateTableForQRep(s.pool, s.suffix, tableName) + require.NoError(s.t, err) + err = e2e.PopulateSourceTable(s.pool, s.suffix, tableName, rowCount) + require.NoError(s.t, err) } -func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error { +func (s PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error { // Execute the two EXCEPT queries for { err := s.compareQuery(srcSchemaQualified, dstSchemaQualified, selector) @@ -104,7 +110,7 @@ func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQu return nil } -func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { +func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified, selector, dstSchemaQualified) rows, err := s.pool.Query(context.Background(), query) @@ -138,7 +144,7 @@ func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQuali return nil } -func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { +func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { query := fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s`, dstSchemaQualified) rows, _ := s.pool.Query(context.Background(), query) @@ -159,9 +165,9 @@ func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error return rows.Err() } -func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) +func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -170,14 +176,14 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { dstTable := "test_qrep_flow_avro_pg_2" - err := e2e.CreateTableForQRep(s.pool, postgresSuffix, dstTable) - s.NoError(err) + err := e2e.CreateTableForQRep(s.pool, s.suffix, dstTable) + require.NoError(s.t, err) - srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable) - dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, dstTable) + srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) + dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - postgresSuffix, srcTable) + s.suffix, srcTable) postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort) @@ -191,7 +197,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { true, "", ) - s.NoError(err) + require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -199,19 +205,17 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified, "*") - if err != nil { - s.FailNow(err.Error()) - } + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) +func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -220,11 +224,11 @@ func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_ dstTable := "test_qrep_columns_pg_2" - srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable) - dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, dstTable) + srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) + dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - postgresSuffix, srcTable) + s.suffix, srcTable) postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort) @@ -238,7 +242,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_ true, "_PEERDB_SYNCED_AT", ) - s.NoError(err) + require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -246,12 +250,10 @@ func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_ s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) err = s.checkSyncedAt(dstSchemaQualified) - if err != nil { - s.FailNow(err.Error()) - } + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) }