diff --git a/flow/connectors/postgres/postgres_repl_test.go b/flow/connectors/postgres/postgres_repl_test.go index b50a1f89fc..01217c1cdf 100644 --- a/flow/connectors/postgres/postgres_repl_test.go +++ b/flow/connectors/postgres/postgres_repl_test.go @@ -7,44 +7,46 @@ import ( "testing" "time" + "github.com/PeerDB-io/peer-flow/e2e/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" + "github.com/ysmood/got" ) type PostgresReplicationSnapshotTestSuite struct { - suite.Suite + got.G + t *testing.T + connector *PostgresConnector } -func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() { - var err error - suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{ +func setupSuite(t *testing.T, g got.G) PostgresReplicationSnapshotTestSuite { + connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, User: "postgres", Password: "postgres", Database: "postgres", }) - require.NoError(suite.T(), err) + require.NoError(t, err) - setupTx, err := suite.connector.pool.Begin(context.Background()) - require.NoError(suite.T(), err) + setupTx, err := connector.pool.Begin(context.Background()) + require.NoError(t, err) defer func() { err := setupTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - require.NoError(suite.T(), err) + require.NoError(t, err) } }() _, err = setupTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_repl_test CASCADE") - require.NoError(suite.T(), err) + require.NoError(t, err) _, err = setupTx.Exec(context.Background(), "CREATE SCHEMA pgpeer_repl_test") - require.NoError(suite.T(), err) + require.NoError(t, err) // setup 3 tables in pgpeer_repl_test schema // test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5 @@ -52,59 +54,65 @@ func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() { for _, table := range tables { _, err = setupTx.Exec(context.Background(), fmt.Sprintf("CREATE TABLE pgpeer_repl_test.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", table)) - require.NoError(suite.T(), err) + require.NoError(t, err) } err = setupTx.Commit(context.Background()) - require.NoError(suite.T(), err) + require.NoError(t, err) + + return PostgresReplicationSnapshotTestSuite{ + G: g, + t: t, + connector: connector, + } } -func (suite *PostgresReplicationSnapshotTestSuite) TearDownSuite() { +func (suite PostgresReplicationSnapshotTestSuite) TearDownSuite() { teardownTx, err := suite.connector.pool.Begin(context.Background()) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) defer func() { err := teardownTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - require.NoError(suite.T(), err) + require.NoError(suite.t, err) } }() _, err = teardownTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_test CASCADE") - require.NoError(suite.T(), err) + require.NoError(suite.t, err) // Fetch all the publications rows, err := teardownTx.Query(context.Background(), "SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation")) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) // Iterate over the publications and drop them for rows.Next() { var pubname pgtype.Text err := rows.Scan(&pubname) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) // Drop the publication in a new transaction _, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String)) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) } _, err = teardownTx.Exec(context.Background(), "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation")) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) err = teardownTx.Commit(context.Background()) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) suite.True(suite.connector.ConnectionActive() == nil) err = suite.connector.Close() - require.NoError(suite.T(), err) + require.NoError(suite.t, err) suite.False(suite.connector.ConnectionActive() == nil) } -func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { +func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { tables := map[string]string{ "pgpeer_repl_test.test_1": "test_1_dst", } @@ -121,7 +129,7 @@ func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { // Moved to a go routine go func() { err := suite.connector.SetupReplication(signal, setupReplicationInput) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) }() slog.Info("waiting for slot creation to complete", flowLog) @@ -136,5 +144,5 @@ func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { } func TestPostgresReplTestSuite(t *testing.T) { - suite.Run(t, new(PostgresReplicationSnapshotTestSuite)) + got.Each(t, e2eshared.GotSuite(setupSuite)) } diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 98a6a47b99..4ef2e47d5c 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -5,79 +5,81 @@ import ( "fmt" "testing" + "github.com/PeerDB-io/peer-flow/e2e/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/jackc/pgx/v5" - "github.com/stretchr/testify/suite" + "github.com/stretchr/testify/require" + "github.com/ysmood/got" ) type PostgresSchemaDeltaTestSuite struct { - suite.Suite + got.G + t *testing.T + connector *PostgresConnector } const schemaDeltaTestSchemaName = "pgschema_delta_test" -func (suite *PostgresSchemaDeltaTestSuite) failTestError(err error) { - if err != nil { - suite.FailNow(err.Error()) - } -} - -func (suite *PostgresSchemaDeltaTestSuite) SetupSuite() { - var err error - suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{ +func setupSchemaDeltaSuite(t *testing.T, g got.G) PostgresSchemaDeltaTestSuite { + connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, User: "postgres", Password: "postgres", Database: "postgres", }) - suite.failTestError(err) + require.NoError(t, err) - setupTx, err := suite.connector.pool.Begin(context.Background()) - suite.failTestError(err) + setupTx, err := connector.pool.Begin(context.Background()) + require.NoError(t, err) defer func() { err := setupTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - suite.failTestError(err) + require.NoError(t, err) } }() _, err = setupTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schemaDeltaTestSchemaName)) - suite.failTestError(err) + require.NoError(t, err) _, err = setupTx.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA %s", schemaDeltaTestSchemaName)) - suite.failTestError(err) + require.NoError(t, err) err = setupTx.Commit(context.Background()) - suite.failTestError(err) + require.NoError(t, err) + return PostgresSchemaDeltaTestSuite{ + G: g, + t: t, + connector: connector, + } } -func (suite *PostgresSchemaDeltaTestSuite) TearDownSuite() { +func (suite PostgresSchemaDeltaTestSuite) TearDownSuite() { teardownTx, err := suite.connector.pool.Begin(context.Background()) - suite.failTestError(err) + require.NoError(suite.t, err) defer func() { err := teardownTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - suite.failTestError(err) + require.NoError(suite.t, err) } }() _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schemaDeltaTestSchemaName)) - suite.failTestError(err) + require.NoError(suite.t, err) err = teardownTx.Commit(context.Background()) - suite.failTestError(err) + require.NoError(suite.t, err) suite.True(suite.connector.ConnectionActive() == nil) err = suite.connector.Close() - suite.failTestError(err) + require.NoError(suite.t, err) suite.False(suite.connector.ConnectionActive() == nil) } -func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { +func (suite PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { tableName := fmt.Sprintf("%s.simple_add_column", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, @@ -87,12 +89,12 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { ColumnType: string(qvalue.QValueKindInt64), }}, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) + require.NoError(suite.t, err) suite.Equal(&protos.TableSchema{ TableIdentifier: tableName, Columns: map[string]string{ @@ -103,11 +105,11 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { }, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { +func (suite PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { tableName := fmt.Sprintf("%s.add_drop_all_column_types", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -148,20 +150,20 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { DstTableName: tableName, AddedColumns: addedColumns, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) + require.NoError(suite.t, err) suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { +func (suite PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { tableName := fmt.Sprintf("%s.add_drop_tricky_column_names", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -194,20 +196,20 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { DstTableName: tableName, AddedColumns: addedColumns, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) + require.NoError(suite.t, err) suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { +func (suite PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { tableName := fmt.Sprintf("%s.add_drop_whitespace_column_names", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(\" \" INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -234,15 +236,15 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { DstTableName: tableName, AddedColumns: addedColumns, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) + require.NoError(suite.t, err) suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } func TestPostgresSchemaDeltaTestSuite(t *testing.T) { - suite.Run(t, new(PostgresSchemaDeltaTestSuite)) + got.Each(t, e2eshared.GotSuite(setupSchemaDeltaSuite)) } diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go index b2dd84ea44..e767717d82 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go @@ -3,22 +3,17 @@ package cdc_records import ( "crypto/rand" + "testing" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/stretchr/testify/require" ) -type CDCRecordStorageTestSuite struct { - suite.Suite - testsuite.WorkflowTestSuite -} - -func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.Record) { - pkeyColVal := make([]byte, 0, 32) +func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) { + pkeyColVal := make([]byte, 32) _, err := rand.Read(pkeyColVal) - s.NoError(err) + require.NoError(t, err) key := model.TableWithPkey{ TableName: "test_src_tbl", @@ -40,50 +35,52 @@ func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.R return key, rec } -func (s *CDCRecordStorageTestSuite) TestSingleRecord() { +func TestSingleRecord(t *testing.T) { + t.Parallel() cdcRecordsStore := NewCDCRecordsStore("test_single_record") cdcRecordsStore.numRecordsSwitchThreshold = 10 - key, rec := s.genKeyAndRec() + key, rec := genKeyAndRec(t) err := cdcRecordsStore.Set(key, rec) - s.NoError(err) + require.NoError(t, err) // should not spill into DB - s.Equal(1, len(cdcRecordsStore.inMemoryRecords)) - s.Nil(cdcRecordsStore.pebbleDB) + require.Equal(t, 1, len(cdcRecordsStore.inMemoryRecords)) + require.Nil(t, cdcRecordsStore.pebbleDB) reck, ok, err := cdcRecordsStore.Get(key) - s.NoError(err) - s.True(ok) - s.Equal(rec, reck) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, rec, reck) - s.NoError(cdcRecordsStore.Close()) + require.NoError(t, cdcRecordsStore.Close()) } -func (s *CDCRecordStorageTestSuite) TestRecordsTillSpill() { +func TestRecordsTillSpill(t *testing.T) { + t.Parallel() cdcRecordsStore := NewCDCRecordsStore("test_records_till_spill") cdcRecordsStore.numRecordsSwitchThreshold = 10 // add records upto set limit for i := 0; i < 10; i++ { - key, rec := s.genKeyAndRec() + key, rec := genKeyAndRec(t) err := cdcRecordsStore.Set(key, rec) - s.NoError(err) - s.Equal(i+1, len(cdcRecordsStore.inMemoryRecords)) - s.Nil(cdcRecordsStore.pebbleDB) + require.NoError(t, err) + require.Equal(t, i+1, len(cdcRecordsStore.inMemoryRecords)) + require.Nil(t, cdcRecordsStore.pebbleDB) } // this record should be spilled to DB - key, rec := s.genKeyAndRec() + key, rec := genKeyAndRec(t) err := cdcRecordsStore.Set(key, rec) - s.NoError(err) + require.NoError(t, err) _, ok := cdcRecordsStore.inMemoryRecords[key] - s.False(ok) - s.NotNil(cdcRecordsStore.pebbleDB) + require.False(t, ok) + require.NotNil(t, cdcRecordsStore.pebbleDB) reck, ok, err := cdcRecordsStore.Get(key) - s.NoError(err) - s.True(ok) - s.Equal(rec, reck) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, rec, reck) - s.NoError(cdcRecordsStore.Close()) + require.NoError(t, cdcRecordsStore.Close()) } diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index fb9dadb9ba..9d4ca9c6ce 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -12,7 +12,7 @@ import ( "cloud.google.com/go/bigquery" "cloud.google.com/go/civil" peer_bq "github.com/PeerDB-io/peer-flow/connectors/bigquery" - "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2e/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -46,7 +46,7 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) { return nil, fmt.Errorf("TEST_BQ_CREDS env var not set") } - content, err := e2e.ReadFileToBytes(jsonPath) + content, err := e2eshared.ReadFileToBytes(jsonPath) if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index b28577f4d3..4f3f3026c6 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2e/e2eshared" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -28,20 +29,7 @@ type PeerFlowE2ETestSuiteBQ struct { } func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteBQ { - g := got.New(t) - - // Concurrently run each test - g.Parallel() - - suite := setupSuite(t, g) - - g.Cleanup(func() { - suite.tearDownSuite() - }) - - return suite - }) + got.Each(t, e2eshared.GotSuite(setupSuite)) } func (s PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { @@ -142,8 +130,7 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { } } -// Implement TearDownAllSuite interface to tear down the test suite -func (s PeerFlowE2ETestSuiteBQ) tearDownSuite() { +func (s PeerFlowE2ETestSuiteBQ) TearDownSuite() { err := e2e.TearDownPostgres(s.pool, s.bqSuffix) if err != nil { slog.Error("failed to tear down postgres", slog.Any("error", err)) diff --git a/flow/e2e/e2eshared/utils.go b/flow/e2e/e2eshared/utils.go new file mode 100644 index 0000000000..e68b6164a6 --- /dev/null +++ b/flow/e2e/e2eshared/utils.go @@ -0,0 +1,43 @@ +package e2eshared + +import ( + "fmt" + "io" + "os" + "testing" + + "github.com/ysmood/got" +) + +func GotSuite[T interface{ TearDownSuite() }](setup func(t *testing.T, g got.G) T) func(t *testing.T) T { + return func(t *testing.T) T { + g := got.New(t) + g.Parallel() + + suite := setup(t, g) + g.Cleanup(func() { + suite.TearDownSuite() + }) + + return suite + } +} + +// ReadFileToBytes reads a file to a byte array. +func ReadFileToBytes(path string) ([]byte, error) { + var ret []byte + + f, err := os.Open(path) + if err != nil { + return ret, fmt.Errorf("failed to open file: %w", err) + } + + defer f.Close() + + ret, err = io.ReadAll(f) + if err != nil { + return ret, fmt.Errorf("failed to read file: %w", err) + } + + return ret, nil +} diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index da050ccf64..899d48a0aa 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -9,17 +9,18 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgtype" + "github.com/stretchr/testify/require" ) -func (s *PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string { +func (s PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string { return fmt.Sprintf("e2e_test_%s.%s", postgresSuffix, tableName) } -func (s *PeerFlowE2ETestSuitePG) attachSuffix(input string) string { +func (s PeerFlowE2ETestSuitePG) attachSuffix(input string) string { return fmt.Sprintf("%s_%s", input, postgresSuffix) } -func (s *PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error { +func (s PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error { query := fmt.Sprintf(`SELECT "_PEERDB_IS_DELETED","_PEERDB_SYNCED_AT" FROM %s WHERE id = %d`, dstSchemaQualified, rowID) var isDeleted pgtype.Bool @@ -40,9 +41,9 @@ func (s *PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, r return nil } -func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_flow") dstTableName := s.attachSchemaSuffix("test_simple_flow_dst") @@ -54,7 +55,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow"), @@ -64,7 +65,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -82,7 +83,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -90,22 +91,22 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") err = s.comparePGTables(srcTableName, dstTableName, "id,key,value") - s.NoError(err) + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := s.attachSchemaSuffix("test_simple_schema_changes_dst") @@ -116,7 +117,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -126,7 +127,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -140,7 +141,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -156,19 +157,19 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") - s.NoError(err) + require.NoError(s.t, err) // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -185,19 +186,19 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2") - s.NoError(err) + require.NoError(s.t, err) // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -215,19 +216,19 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c3") - s.NoError(err) + require.NoError(s.t, err) // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -245,28 +246,28 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := s.attachSchemaSuffix("test_simple_cpkey_dst") @@ -280,7 +281,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -290,7 +291,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -307,41 +308,41 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") // verify we got our 10 rows e2e.NormalizeFlowCountQuery(env, connectionGen, 2) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") - s.NoError(err) + require.NoError(s.t, err) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") - s.NoError(err) + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := s.attachSchemaSuffix("test_cpkey_toast1_dst") @@ -359,7 +360,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -369,7 +370,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -381,7 +382,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + require.NoError(s.t, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -389,40 +390,40 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) err = rowsTx.Commit(context.Background()) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") - s.NoError(err) + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") dstTableName := s.attachSchemaSuffix("test_cpkey_toast2_dst") @@ -440,7 +441,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -450,7 +451,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -468,38 +469,38 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") - s.NoError(err) + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_peerdb_cols") dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst") @@ -511,7 +512,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_peerdb_cols_mirror"), @@ -522,7 +523,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -537,26 +538,26 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) // delete that row _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1 `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted and deleted a row for peerdb column check") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") checkErr := s.checkPeerdbColumns(dstTableName, 1) - s.NoError(checkErr) - env.AssertExpectations(s.T()) + require.NoError(s.t, checkErr) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 1c86c973b9..dc9a098fba 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -9,19 +9,20 @@ import ( connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2e/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/stretchr/testify/require" + "github.com/ysmood/got" ) const postgresSuffix = "postgres" type PeerFlowE2ETestSuitePG struct { - suite.Suite - testsuite.WorkflowTestSuite + got.G + t *testing.T pool *pgxpool.Pool peer *protos.Peer @@ -29,11 +30,10 @@ type PeerFlowE2ETestSuitePG struct { } func TestPeerFlowE2ETestSuitePG(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuitePG)) + got.Each(t, e2eshared.GotSuite(setupSuite)) } -// Implement SetupAllSuite interface to setup the test suite -func (s *PeerFlowE2ETestSuitePG) SetupSuite() { +func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuitePG { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -43,12 +43,11 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() { pool, err := e2e.SetupPostgres(postgresSuffix) if err != nil || pool == nil { - s.Fail("failed to setup postgres", err) + t.Fatal("failed to setup postgres", err) } - s.pool = pool - s.peer = generatePGPeer(e2e.GetTestPostgresConf()) + peer := generatePGPeer(e2e.GetTestPostgresConf()) - s.connector, err = connpostgres.NewPostgresConnector(context.Background(), + connector, err := connpostgres.NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, @@ -56,25 +55,31 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() { Password: "postgres", Database: "postgres", }) - s.NoError(err) + require.NoError(t, err) + return PeerFlowE2ETestSuitePG{ + G: g, + t: t, + pool: pool, + peer: peer, + connector: connector, + } } -// Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuitePG) TearDownSuite() { +func (s PeerFlowE2ETestSuitePG) TearDownSuite() { err := e2e.TearDownPostgres(s.pool, postgresSuffix) if err != nil { - s.Fail("failed to drop Postgres schema", err) + s.t.Fatal("failed to drop Postgres schema", err) } } -func (s *PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { +func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { err := e2e.CreateTableForQRep(s.pool, postgresSuffix, tableName) - s.NoError(err) + require.NoError(s.t, err) err = e2e.PopulateSourceTable(s.pool, postgresSuffix, tableName, rowCount) - s.NoError(err) + require.NoError(s.t, err) } -func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error { +func (s PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error { // Execute the two EXCEPT queries for { err := s.compareQuery(srcSchemaQualified, dstSchemaQualified, selector) @@ -104,7 +109,7 @@ func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQu return nil } -func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { +func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified, selector, dstSchemaQualified) rows, _ := s.pool.Query(context.Background(), query) @@ -135,7 +140,7 @@ func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQuali return nil } -func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { +func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { query := fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s`, dstSchemaQualified) rows, _ := s.pool.Query(context.Background(), query) @@ -156,9 +161,9 @@ func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error return rows.Err() } -func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -170,7 +175,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { dstTable := "test_qrep_flow_avro_pg_2" err := e2e.CreateTableForQRep(s.pool, postgresSuffix, dstTable) - s.NoError(err) + require.NoError(s.t, err) srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable) dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, dstTable) @@ -190,7 +195,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { true, "", ) - s.NoError(err) + require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -198,19 +203,19 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified, "*") if err != nil { - s.FailNow(err.Error()) + require.FailNow(s.t, err.Error()) } - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -239,7 +244,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_ true, "_PEERDB_SYNCED_AT", ) - s.NoError(err) + require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -247,12 +252,12 @@ func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_ s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) err = s.checkSyncedAt(dstSchemaQualified) if err != nil { - s.FailNow(err.Error()) + require.FailNow(s.t, err.Error()) } - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index a938f673b3..408d44d776 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -10,22 +10,23 @@ import ( "github.com/stretchr/testify/require" ) -func (s *PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string { +func (s PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string { return fmt.Sprintf("e2e_test_%s.%s", s3Suffix, tableName) } -func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { +func (s PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { return fmt.Sprintf("%s_%s", input, s3Suffix) } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) - setupErr := s.setupS3("s3") + helper, setupErr := setupS3("s3") if setupErr != nil { - s.Fail("failed to setup S3", setupErr) + require.Fail(s.t, "failed to setup S3", setupErr) } + s.s3Helper = helper srcTableName := s.attachSchemaSuffix("test_simple_flow_s3") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3") @@ -37,7 +38,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: flowJobName, TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -46,7 +47,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -56,7 +57,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - s.NoError(err) + require.NoError(s.t, err) // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -64,9 +65,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -76,28 +77,29 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() fmt.Println("JobName: ", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) fmt.Println("Files in Test_Complete_Simple_Flow_S3: ", len(files)) - require.NoError(s.T(), err) + require.NoError(s.t, err) - require.Equal(s.T(), 4, len(files)) + require.Equal(s.t, 4, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) - setupErr := s.setupS3("gcs") +func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) + helper, setupErr := setupS3("gcs") if setupErr != nil { - s.Fail("failed to setup S3", setupErr) + require.Fail(s.t, "failed to setup S3", setupErr) } + s.s3Helper = helper srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop") @@ -109,7 +111,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: flowJobName, TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -118,7 +120,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -128,7 +130,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - s.NoError(err) + require.NoError(s.t, err) // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -136,10 +138,10 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 20 rows into the source table") - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -149,17 +151,17 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() fmt.Println("JobName: ", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) fmt.Println("Files in Test_Complete_Simple_Flow_GCS: ", len(files)) - require.NoError(s.T(), err) + require.NoError(s.t, err) - require.Equal(s.T(), 4, len(files)) + require.Equal(s.t, 4, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index fda57ced09..f9a399f049 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -8,49 +8,39 @@ import ( "time" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2e/e2eshared" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/ysmood/got" ) const s3Suffix = "s3" type PeerFlowE2ETestSuiteS3 struct { - suite.Suite - testsuite.WorkflowTestSuite + got.G + t *testing.T pool *pgxpool.Pool s3Helper *S3TestHelper } func TestPeerFlowE2ETestSuiteS3(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteS3)) + got.Each(t, e2eshared.GotSuite(setupSuite)) } -func (s *PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { +func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { err := e2e.CreateTableForQRep(s.pool, s3Suffix, tableName) - s.NoError(err) + require.NoError(s.t, err) err = e2e.PopulateSourceTable(s.pool, s3Suffix, tableName, rowCount) - s.NoError(err) + require.NoError(s.t, err) } -func (s *PeerFlowE2ETestSuiteS3) setupS3(mode string) error { - switchToGCS := false - if mode == "gcs" { - switchToGCS = true - } - helper, err := NewS3TestHelper(switchToGCS) - if err != nil { - return err - } - - s.s3Helper = helper - return nil +func setupS3(mode string) (*S3TestHelper, error) { + return NewS3TestHelper(mode == "gcs") } -func (s *PeerFlowE2ETestSuiteS3) SetupSuite() { +func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -60,38 +50,43 @@ func (s *PeerFlowE2ETestSuiteS3) SetupSuite() { pool, err := e2e.SetupPostgres(s3Suffix) if err != nil || pool == nil { - s.Fail("failed to setup postgres", err) + require.Fail(t, "failed to setup postgres", err) } - s.pool = pool - err = s.setupS3("s3") + helper, err := setupS3("s3") if err != nil { - s.Fail("failed to setup S3", err) + require.Fail(t, "failed to setup S3", err) + } + + return PeerFlowE2ETestSuiteS3{ + G: g, + t: t, + pool: pool, + s3Helper: helper, } } -// Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuiteS3) TearDownSuite() { +func (s PeerFlowE2ETestSuiteS3) TearDownSuite() { err := e2e.TearDownPostgres(s.pool, s3Suffix) if err != nil { - s.Fail("failed to drop Postgres schema", err) + require.Fail(s.t, "failed to drop Postgres schema", err) } if s.s3Helper != nil { err = s.s3Helper.CleanUp() if err != nil { - s.Fail("failed to clean up s3", err) + require.Fail(s.t, "failed to clean up s3", err) } } } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { +func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { if s.s3Helper == nil { - s.T().Skip("Skipping S3 test") + s.t.Skip("Skipping S3 test") } - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) jobName := "test_complete_flow_s3" schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) @@ -109,16 +104,16 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { false, "", ) - s.NoError(err) + require.NoError(s.t, err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) // Verify destination has 1 file // make context with timeout @@ -127,20 +122,20 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { files, err := s.s3Helper.ListAllFiles(ctx, jobName) - require.NoError(s.T(), err) + require.NoError(s.t, err) - require.Equal(s.T(), 1, len(files)) + require.Equal(s.t, 1, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { +func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { if s.s3Helper == nil { - s.T().Skip("Skipping S3 test") + s.t.Skip("Skipping S3 test") } - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) jobName := "test_complete_flow_s3_ctid" schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) @@ -157,7 +152,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { false, "", ) - s.NoError(err) + require.NoError(s.t, err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url qrepConfig.NumRowsPerPartition = 2000 qrepConfig.InitialCopyOnly = true @@ -166,10 +161,10 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) // Verify destination has 1 file // make context with timeout @@ -178,9 +173,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { files, err := s.s3Helper.ListAllFiles(ctx, jobName) - require.NoError(s.T(), err) + require.NoError(s.t, err) - require.Equal(s.T(), 10, len(files)) + require.Equal(s.t, 10, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index 249745dd3c..08c56d4790 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -9,7 +9,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2e/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" @@ -34,7 +34,7 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { bucketName = "peerdb_staging" } - content, err := e2e.ReadFileToBytes(credsPath) + content, err := e2eshared.ReadFileToBytes(credsPath) if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 8d521dbb72..09532fa528 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -13,6 +13,7 @@ import ( connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2e/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" @@ -33,20 +34,7 @@ type PeerFlowE2ETestSuiteSF struct { } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteSF { - g := got.New(t) - - // Concurrently run each test - g.Parallel() - - suite := SetupSuite(t, g) - - g.Cleanup(func() { - suite.tearDownSuite() - }) - - return suite - }) + got.Each(t, e2eshared.GotSuite(setupSuite)) } func (s PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { @@ -57,7 +45,7 @@ func (s PeerFlowE2ETestSuiteSF) attachSuffix(input string) string { return fmt.Sprintf("%s_%s", input, s.pgSuffix) } -func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { +func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -100,7 +88,7 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { } // Implement TearDownAllSuite interface to tear down the test suite -func (s PeerFlowE2ETestSuiteSF) tearDownSuite() { +func (s PeerFlowE2ETestSuiteSF) TearDownSuite() { err := e2e.TearDownPostgres(s.pool, s.pgSuffix) if err != nil { slog.Error("failed to tear down Postgres", slog.Any("error", err)) diff --git a/flow/e2e/snowflake/snowflake_helper.go b/flow/e2e/snowflake/snowflake_helper.go index 0401d34f58..c714d2a5bd 100644 --- a/flow/e2e/snowflake/snowflake_helper.go +++ b/flow/e2e/snowflake/snowflake_helper.go @@ -9,7 +9,7 @@ import ( "time" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" - "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2e/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -37,7 +37,7 @@ func NewSnowflakeTestHelper() (*SnowflakeTestHelper, error) { return nil, fmt.Errorf("TEST_SF_CREDS env var not set") } - content, err := e2e.ReadFileToBytes(jsonPath) + content, err := e2eshared.ReadFileToBytes(jsonPath) if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) } diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index f8a06733b1..1a448dd97e 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -7,6 +7,7 @@ import ( "testing" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" + "github.com/PeerDB-io/peer-flow/e2e/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/ysmood/got" @@ -56,7 +57,7 @@ func setupSchemaDeltaSuite( } } -func (suite SnowflakeSchemaDeltaTestSuite) tearDownSuite() { +func (suite SnowflakeSchemaDeltaTestSuite) TearDownSuite() { err := suite.sfTestHelper.Cleanup() suite.failTestError(err) err = suite.connector.Close() @@ -220,17 +221,5 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { } func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) { - got.Each(t, func(t *testing.T) SnowflakeSchemaDeltaTestSuite { - g := got.New(t) - - g.Parallel() - - suite := setupSchemaDeltaSuite(t, g) - - g.Cleanup(func() { - suite.tearDownSuite() - }) - - return suite - }) + got.Each(t, e2eshared.GotSuite(setupSchemaDeltaSuite)) } diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 2d327458af..00807706fa 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2e/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -17,38 +18,36 @@ import ( "github.com/joho/godotenv" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/ysmood/got" ) const sqlserverSuffix = "sqlserver" type PeerFlowE2ETestSuiteSQLServer struct { - suite.Suite - testsuite.WorkflowTestSuite + got.G + t *testing.T pool *pgxpool.Pool sqlsHelper *SQLServerHelper } func TestCDCFlowE2ETestSuiteSQLServer(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteSQLServer)) + got.Each(t, e2eshared.GotSuite(setupSuite)) } // setup sql server connection -func (s *PeerFlowE2ETestSuiteSQLServer) setupSQLServer() { +func setupSQLServer(t *testing.T) *SQLServerHelper { env := os.Getenv("ENABLE_SQLSERVER_TESTS") if env != "true" { - s.sqlsHelper = nil - return + return nil } sqlsHelper, err := NewSQLServerHelper("test_sqlserver_peer") - require.NoError(s.T(), err) - s.sqlsHelper = sqlsHelper + require.NoError(t, err) + return sqlsHelper } -func (s *PeerFlowE2ETestSuiteSQLServer) SetupSuite() { +func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSQLServer { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -58,35 +57,38 @@ func (s *PeerFlowE2ETestSuiteSQLServer) SetupSuite() { pool, err := e2e.SetupPostgres(sqlserverSuffix) if err != nil || pool == nil { - s.Fail("failed to setup postgres", err) + require.Fail(t, "failed to setup postgres", err) } - s.pool = pool - s.setupSQLServer() + return PeerFlowE2ETestSuiteSQLServer{ + G: g, + t: t, + pool: pool, + sqlsHelper: setupSQLServer(t), + } } -// Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuiteSQLServer) TearDownSuite() { +func (s PeerFlowE2ETestSuiteSQLServer) TearDownSuite() { err := e2e.TearDownPostgres(s.pool, sqlserverSuffix) if err != nil { - s.Fail("failed to drop Postgres schema", err) + require.Fail(s.t, "failed to drop Postgres schema", err) } if s.sqlsHelper != nil { err = s.sqlsHelper.CleanUp() if err != nil { - s.Fail("failed to clean up sqlserver", err) + require.Fail(s.t, "failed to clean up sqlserver", err) } } } -func (s *PeerFlowE2ETestSuiteSQLServer) setupSQLServerTable(tableName string) { +func (s PeerFlowE2ETestSuiteSQLServer) setupSQLServerTable(tableName string) { schema := getSimpleTableSchema() err := s.sqlsHelper.CreateTable(schema, tableName) - require.NoError(s.T(), err) + require.NoError(s.t, err) } -func (s *PeerFlowE2ETestSuiteSQLServer) insertRowsIntoSQLServerTable(tableName string, numRows int) { +func (s PeerFlowE2ETestSuiteSQLServer) insertRowsIntoSQLServerTable(tableName string, numRows int) { schemaQualified := fmt.Sprintf("%s.%s", s.sqlsHelper.SchemaName, tableName) for i := 0; i < numRows; i++ { params := make(map[string]interface{}) @@ -102,20 +104,20 @@ func (s *PeerFlowE2ETestSuiteSQLServer) insertRowsIntoSQLServerTable(tableName s params, ) - require.NoError(s.T(), err) + require.NoError(s.t, err) } } -func (s *PeerFlowE2ETestSuiteSQLServer) setupPGDestinationTable(tableName string) { +func (s PeerFlowE2ETestSuiteSQLServer) setupPGDestinationTable(tableName string) { ctx := context.Background() _, err := s.pool.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS e2e_test_%s.%s", sqlserverSuffix, tableName)) - require.NoError(s.T(), err) + require.NoError(s.t, err) _, err = s.pool.Exec(ctx, fmt.Sprintf("CREATE TABLE e2e_test_%s.%s (id TEXT, card_id TEXT, v_from TIMESTAMP, price NUMERIC, status INT)", sqlserverSuffix, tableName)) - require.NoError(s.T(), err) + require.NoError(s.t, err) } func getSimpleTableSchema() *model.QRecordSchema { @@ -130,13 +132,13 @@ func getSimpleTableSchema() *model.QRecordSchema { } } -func (s *PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append() { +func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append() { if s.sqlsHelper == nil { - s.T().Skip("Skipping SQL Server test") + s.t.Skip("Skipping SQL Server test") } - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 tblName := "test_qrep_flow_avro_ss_append" @@ -170,16 +172,16 @@ func (s *PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err := env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) // Verify that the destination table has the same number of rows as the source table var numRowsInDest pgtype.Int8 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s", dstTableName) err = s.pool.QueryRow(context.Background(), countQuery).Scan(&numRowsInDest) - s.NoError(err) + require.NoError(s.t, err) - s.Equal(numRows, int(numRowsInDest.Int64)) + require.Equal(s.t, numRows, int(numRowsInDest.Int64)) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 13ca8044e5..47f9b84bad 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "log/slog" "os" "strings" @@ -24,25 +23,6 @@ import ( "go.temporal.io/sdk/testsuite" ) -// ReadFileToBytes reads a file to a byte array. -func ReadFileToBytes(path string) ([]byte, error) { - var ret []byte - - f, err := os.Open(path) - if err != nil { - return ret, fmt.Errorf("failed to open file: %w", err) - } - - defer f.Close() - - ret, err = io.ReadAll(f) - if err != nil { - return ret, fmt.Errorf("failed to read file: %w", err) - } - - return ret, nil -} - func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment, t *testing.T) { conn, err := utils.GetCatalogConnectionPoolFromEnv() if err != nil {