From 19389769b8f84ec0dc3b40dae3e214d6ac485541 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 21 Dec 2023 23:19:05 +0000 Subject: [PATCH] Remove use of testify/suite Also run everything in parallel, randomizing suffixes as necessary --- .../connectors/postgres/postgres_repl_test.go | 81 +++++--- .../postgres/postgres_schema_delta_test.go | 84 ++++---- .../cdc_records/cdc_records_storage_test.go | 61 +++--- flow/e2e/bigquery/bigquery_helper.go | 4 +- flow/e2e/bigquery/peer_flow_bq_test.go | 19 +- flow/e2e/postgres/peer_flow_pg_test.go | 189 +++++++++--------- flow/e2e/postgres/qrep_flow_pg_test.go | 106 +++++----- flow/e2e/s3/cdc_s3_test.go | 70 +++---- flow/e2e/s3/qrep_flow_s3_test.go | 111 +++++----- flow/e2e/s3/s3_helper.go | 4 +- flow/e2e/snowflake/peer_flow_sf_test.go | 20 +- flow/e2e/snowflake/snowflake_helper.go | 4 +- .../snowflake/snowflake_schema_delta_test.go | 17 +- .../e2e/sqlserver/qrep_flow_sqlserver_test.go | 68 ++++--- flow/e2e/test_utils.go | 20 -- flow/e2eshared/e2eshared.go | 43 ++++ 16 files changed, 456 insertions(+), 445 deletions(-) create mode 100644 flow/e2eshared/e2eshared.go diff --git a/flow/connectors/postgres/postgres_repl_test.go b/flow/connectors/postgres/postgres_repl_test.go index b50a1f89fc..82c65d899c 100644 --- a/flow/connectors/postgres/postgres_repl_test.go +++ b/flow/connectors/postgres/postgres_repl_test.go @@ -7,106 +7,121 @@ import ( "testing" "time" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" + "github.com/ysmood/got" ) type PostgresReplicationSnapshotTestSuite struct { - suite.Suite + got.G + t *testing.T + connector *PostgresConnector + schema string } -func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() { - var err error - suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{ +func setupSuite(t *testing.T, g got.G) PostgresReplicationSnapshotTestSuite { + connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, User: "postgres", Password: "postgres", Database: "postgres", }) - require.NoError(suite.T(), err) + require.NoError(t, err) - setupTx, err := suite.connector.pool.Begin(context.Background()) - require.NoError(suite.T(), err) + setupTx, err := connector.pool.Begin(context.Background()) + require.NoError(t, err) defer func() { err := setupTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - require.NoError(suite.T(), err) + require.NoError(t, err) } }() - _, err = setupTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_repl_test CASCADE") - require.NoError(suite.T(), err) + schema := "repltest_" + shared.RandomString(8) + + _, err = setupTx.Exec(context.Background(), + fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schema)) + require.NoError(t, err) - _, err = setupTx.Exec(context.Background(), "CREATE SCHEMA pgpeer_repl_test") - require.NoError(suite.T(), err) + _, err = setupTx.Exec(context.Background(), + fmt.Sprintf("CREATE SCHEMA %s", schema)) + require.NoError(t, err) - // setup 3 tables in pgpeer_repl_test schema - // test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5 + // setup 3 tables test_1, test_2, test_3 + // all have 5 text columns c1, c2, c3, c4, c5 tables := []string{"test_1", "test_2", "test_3"} for _, table := range tables { _, 
err = setupTx.Exec(context.Background(), - fmt.Sprintf("CREATE TABLE pgpeer_repl_test.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", table)) - require.NoError(suite.T(), err) + fmt.Sprintf("CREATE TABLE %s.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", schema, table)) + require.NoError(t, err) } err = setupTx.Commit(context.Background()) - require.NoError(suite.T(), err) + require.NoError(t, err) + + return PostgresReplicationSnapshotTestSuite{ + G: g, + t: t, + connector: connector, + schema: schema, + } } -func (suite *PostgresReplicationSnapshotTestSuite) TearDownSuite() { +func (suite PostgresReplicationSnapshotTestSuite) TearDownSuite() { teardownTx, err := suite.connector.pool.Begin(context.Background()) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) defer func() { err := teardownTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - require.NoError(suite.T(), err) + require.NoError(suite.t, err) } }() - _, err = teardownTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_test CASCADE") - require.NoError(suite.T(), err) + _, err = teardownTx.Exec(context.Background(), + fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", suite.schema)) + require.NoError(suite.t, err) // Fetch all the publications rows, err := teardownTx.Query(context.Background(), "SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation")) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) // Iterate over the publications and drop them for rows.Next() { var pubname pgtype.Text err := rows.Scan(&pubname) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) // Drop the publication in a new transaction _, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String)) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) } _, err = teardownTx.Exec(context.Background(), "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation")) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) err = teardownTx.Commit(context.Background()) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) suite.True(suite.connector.ConnectionActive() == nil) err = suite.connector.Close() - require.NoError(suite.T(), err) + require.NoError(suite.t, err) suite.False(suite.connector.ConnectionActive() == nil) } -func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { +func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { tables := map[string]string{ - "pgpeer_repl_test.test_1": "test_1_dst", + suite.schema + ".test_1": "test_1_dst", } flowJobName := "test_simple_slot_creation" @@ -121,7 +136,7 @@ func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { // Moved to a go routine go func() { err := suite.connector.SetupReplication(signal, setupReplicationInput) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) }() slog.Info("waiting for slot creation to complete", flowLog) @@ -136,5 +151,5 @@ func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { } func TestPostgresReplTestSuite(t *testing.T) { - suite.Run(t, new(PostgresReplicationSnapshotTestSuite)) + got.Each(t, e2eshared.GotSuite(setupSuite)) } diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 98a6a47b99..20d855607f 100644 --- 
a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -5,79 +5,81 @@ import ( "fmt" "testing" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/jackc/pgx/v5" - "github.com/stretchr/testify/suite" + "github.com/stretchr/testify/require" + "github.com/ysmood/got" ) type PostgresSchemaDeltaTestSuite struct { - suite.Suite + got.G + t *testing.T + connector *PostgresConnector } const schemaDeltaTestSchemaName = "pgschema_delta_test" -func (suite *PostgresSchemaDeltaTestSuite) failTestError(err error) { - if err != nil { - suite.FailNow(err.Error()) - } -} - -func (suite *PostgresSchemaDeltaTestSuite) SetupSuite() { - var err error - suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{ +func setupSchemaDeltaSuite(t *testing.T, g got.G) PostgresSchemaDeltaTestSuite { + connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, User: "postgres", Password: "postgres", Database: "postgres", }) - suite.failTestError(err) + require.NoError(t, err) - setupTx, err := suite.connector.pool.Begin(context.Background()) - suite.failTestError(err) + setupTx, err := connector.pool.Begin(context.Background()) + require.NoError(t, err) defer func() { err := setupTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - suite.failTestError(err) + require.NoError(t, err) } }() _, err = setupTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schemaDeltaTestSchemaName)) - suite.failTestError(err) + require.NoError(t, err) _, err = setupTx.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA %s", schemaDeltaTestSchemaName)) - suite.failTestError(err) + require.NoError(t, err) err = setupTx.Commit(context.Background()) - suite.failTestError(err) + require.NoError(t, err) + return PostgresSchemaDeltaTestSuite{ + G: g, + t: t, + connector: connector, + } } -func (suite *PostgresSchemaDeltaTestSuite) TearDownSuite() { +func (suite PostgresSchemaDeltaTestSuite) TearDownSuite() { teardownTx, err := suite.connector.pool.Begin(context.Background()) - suite.failTestError(err) + require.NoError(suite.t, err) defer func() { err := teardownTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - suite.failTestError(err) + require.NoError(suite.t, err) } }() _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schemaDeltaTestSchemaName)) - suite.failTestError(err) + require.NoError(suite.t, err) err = teardownTx.Commit(context.Background()) - suite.failTestError(err) + require.NoError(suite.t, err) suite.True(suite.connector.ConnectionActive() == nil) err = suite.connector.Close() - suite.failTestError(err) + require.NoError(suite.t, err) suite.False(suite.connector.ConnectionActive() == nil) } -func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { +func (suite PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { tableName := fmt.Sprintf("%s.simple_add_column", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, @@ -87,12 +89,12 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { 
ColumnType: string(qvalue.QValueKindInt64), }}, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) + require.NoError(suite.t, err) suite.Equal(&protos.TableSchema{ TableIdentifier: tableName, Columns: map[string]string{ @@ -103,11 +105,11 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { }, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { +func (suite PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { tableName := fmt.Sprintf("%s.add_drop_all_column_types", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -148,20 +150,20 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { DstTableName: tableName, AddedColumns: addedColumns, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) + require.NoError(suite.t, err) suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { +func (suite PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { tableName := fmt.Sprintf("%s.add_drop_tricky_column_names", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -194,20 +196,20 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { DstTableName: tableName, AddedColumns: addedColumns, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) + require.NoError(suite.t, err) suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { +func (suite PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { tableName := fmt.Sprintf("%s.add_drop_whitespace_column_names", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(\" \" INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -234,15 +236,15 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { DstTableName: tableName, AddedColumns: addedColumns, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) + require.NoError(suite.t, err) suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } func TestPostgresSchemaDeltaTestSuite(t *testing.T) { - suite.Run(t, new(PostgresSchemaDeltaTestSuite)) + got.Each(t, e2eshared.GotSuite(setupSchemaDeltaSuite)) } 
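[Note on flow/e2eshared/e2eshared.go: the new file this patch creates is not shown in this excerpt. Judging from the call sites above (got.Each(t, e2eshared.GotSuite(setupSuite))), the tearDownSuite -> TearDownSuite renames elsewhere in the patch, and the inline closure that peer_flow_bq_test.go drops in favor of it, a minimal sketch of the GotSuite helper could look like the following — the generic constraint is inferred from usage, not a confirmed implementation:

package e2eshared

import (
	"testing"

	"github.com/ysmood/got"
)

// GotSuite adapts a suite setup function into the per-test constructor
// shape that got.Each expects, running each test method in parallel and
// tearing the suite down when its test finishes.
func GotSuite[T interface{ TearDownSuite() }](setup func(t *testing.T, g got.G) T) func(t *testing.T) T {
	return func(t *testing.T) T {
		g := got.New(t)

		// Concurrently run each test.
		g.Parallel()

		suite := setup(t, g)
		g.Cleanup(func() {
			suite.TearDownSuite()
		})

		return suite
	}
}

Factoring this closure out is what lets every converted suite below opt into parallelism and cleanup-based teardown with a single line in its Test* entry point.]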
diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go index b2dd84ea44..e767717d82 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go @@ -3,22 +3,17 @@ package cdc_records import ( "crypto/rand" + "testing" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/stretchr/testify/require" ) -type CDCRecordStorageTestSuite struct { - suite.Suite - testsuite.WorkflowTestSuite -} - -func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.Record) { - pkeyColVal := make([]byte, 0, 32) +func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) { + pkeyColVal := make([]byte, 32) _, err := rand.Read(pkeyColVal) - s.NoError(err) + require.NoError(t, err) key := model.TableWithPkey{ TableName: "test_src_tbl", @@ -40,50 +35,52 @@ func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.R return key, rec } -func (s *CDCRecordStorageTestSuite) TestSingleRecord() { +func TestSingleRecord(t *testing.T) { + t.Parallel() cdcRecordsStore := NewCDCRecordsStore("test_single_record") cdcRecordsStore.numRecordsSwitchThreshold = 10 - key, rec := s.genKeyAndRec() + key, rec := genKeyAndRec(t) err := cdcRecordsStore.Set(key, rec) - s.NoError(err) + require.NoError(t, err) // should not spill into DB - s.Equal(1, len(cdcRecordsStore.inMemoryRecords)) - s.Nil(cdcRecordsStore.pebbleDB) + require.Equal(t, 1, len(cdcRecordsStore.inMemoryRecords)) + require.Nil(t, cdcRecordsStore.pebbleDB) reck, ok, err := cdcRecordsStore.Get(key) - s.NoError(err) - s.True(ok) - s.Equal(rec, reck) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, rec, reck) - s.NoError(cdcRecordsStore.Close()) + require.NoError(t, cdcRecordsStore.Close()) } -func (s *CDCRecordStorageTestSuite) TestRecordsTillSpill() { +func TestRecordsTillSpill(t *testing.T) { + t.Parallel() cdcRecordsStore := NewCDCRecordsStore("test_records_till_spill") cdcRecordsStore.numRecordsSwitchThreshold = 10 // add records upto set limit for i := 0; i < 10; i++ { - key, rec := s.genKeyAndRec() + key, rec := genKeyAndRec(t) err := cdcRecordsStore.Set(key, rec) - s.NoError(err) - s.Equal(i+1, len(cdcRecordsStore.inMemoryRecords)) - s.Nil(cdcRecordsStore.pebbleDB) + require.NoError(t, err) + require.Equal(t, i+1, len(cdcRecordsStore.inMemoryRecords)) + require.Nil(t, cdcRecordsStore.pebbleDB) } // this record should be spilled to DB - key, rec := s.genKeyAndRec() + key, rec := genKeyAndRec(t) err := cdcRecordsStore.Set(key, rec) - s.NoError(err) + require.NoError(t, err) _, ok := cdcRecordsStore.inMemoryRecords[key] - s.False(ok) - s.NotNil(cdcRecordsStore.pebbleDB) + require.False(t, ok) + require.NotNil(t, cdcRecordsStore.pebbleDB) reck, ok, err := cdcRecordsStore.Get(key) - s.NoError(err) - s.True(ok) - s.Equal(rec, reck) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, rec, reck) - s.NoError(cdcRecordsStore.Close()) + require.NoError(t, cdcRecordsStore.Close()) } diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index fb9dadb9ba..bc1fac64f0 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -12,7 +12,7 @@ import ( "cloud.google.com/go/bigquery" "cloud.google.com/go/civil" peer_bq 
"github.com/PeerDB-io/peer-flow/connectors/bigquery" - "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -46,7 +46,7 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) { return nil, fmt.Errorf("TEST_BQ_CREDS env var not set") } - content, err := e2e.ReadFileToBytes(jsonPath) + content, err := e2eshared.ReadFileToBytes(jsonPath) if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index b28577f4d3..0988195bee 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -28,20 +29,7 @@ type PeerFlowE2ETestSuiteBQ struct { } func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteBQ { - g := got.New(t) - - // Concurrently run each test - g.Parallel() - - suite := setupSuite(t, g) - - g.Cleanup(func() { - suite.tearDownSuite() - }) - - return suite - }) + got.Each(t, e2eshared.GotSuite(setupSuite)) } func (s PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { @@ -142,8 +130,7 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { } } -// Implement TearDownAllSuite interface to tear down the test suite -func (s PeerFlowE2ETestSuiteBQ) tearDownSuite() { +func (s PeerFlowE2ETestSuiteBQ) TearDownSuite() { err := e2e.TearDownPostgres(s.pool, s.bqSuffix) if err != nil { slog.Error("failed to tear down postgres", slog.Any("error", err)) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index da050ccf64..30be647222 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -9,17 +9,18 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgtype" + "github.com/stretchr/testify/require" ) -func (s *PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", postgresSuffix, tableName) +func (s PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string { + return fmt.Sprintf("e2e_test_%s.%s", s.suffix, tableName) } -func (s *PeerFlowE2ETestSuitePG) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, postgresSuffix) +func (s PeerFlowE2ETestSuitePG) attachSuffix(input string) string { + return fmt.Sprintf("%s_%s", input, s.suffix) } -func (s *PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error { +func (s PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error { query := fmt.Sprintf(`SELECT "_PEERDB_IS_DELETED","_PEERDB_SYNCED_AT" FROM %s WHERE id = %d`, dstSchemaQualified, rowID) var isDeleted pgtype.Bool @@ -40,9 +41,9 @@ func (s *PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, r return nil } -func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { + env := 
e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_flow") dstTableName := s.attachSchemaSuffix("test_simple_flow_dst") @@ -54,7 +55,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow"), @@ -64,7 +65,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -82,7 +83,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -90,22 +91,22 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") err = s.comparePGTables(srcTableName, dstTableName, "id,key,value") - s.NoError(err) + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := s.attachSchemaSuffix("test_simple_schema_changes_dst") @@ -116,7 +117,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -126,7 +127,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -140,7 +141,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. 
@@ -156,19 +157,19 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") - s.NoError(err) + require.NoError(s.t, err) // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -185,19 +186,19 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2") - s.NoError(err) + require.NoError(s.t, err) // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -215,19 +216,19 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c3") - s.NoError(err) + require.NoError(s.t, err) // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. 
@@ -245,28 +246,28 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := s.attachSchemaSuffix("test_simple_cpkey_dst") @@ -280,7 +281,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -290,7 +291,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -307,41 +308,41 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") // verify we got our 10 rows e2e.NormalizeFlowCountQuery(env, connectionGen, 2) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") - s.NoError(err) + require.NoError(s.t, err) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") - s.NoError(err) + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { + env := 
e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := s.attachSchemaSuffix("test_cpkey_toast1_dst") @@ -359,7 +360,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -369,7 +370,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -381,7 +382,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + require.NoError(s.t, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -389,40 +390,40 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) err = rowsTx.Commit(context.Background()) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") - s.NoError(err) + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") dstTableName := s.attachSchemaSuffix("test_cpkey_toast2_dst") @@ -440,7 +441,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -450,7 +451,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, 
@@ -468,38 +469,38 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") - s.NoError(err) + require.NoError(s.t, err) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) srcTableName := s.attachSchemaSuffix("test_peerdb_cols") dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst") @@ -511,7 +512,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_peerdb_cols_mirror"), @@ -522,7 +523,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -537,26 +538,26 @@ func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) // delete that row _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1 `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted and deleted a row for peerdb column check") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") checkErr := s.checkPeerdbColumns(dstTableName, 1) - s.NoError(checkErr) - env.AssertExpectations(s.T()) + require.NoError(s.t, checkErr) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 1c86c973b9..815c086529 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ 
b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -9,31 +9,31 @@ import ( connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/stretchr/testify/require" + "github.com/ysmood/got" ) -const postgresSuffix = "postgres" - type PeerFlowE2ETestSuitePG struct { - suite.Suite - testsuite.WorkflowTestSuite + got.G + t *testing.T pool *pgxpool.Pool peer *protos.Peer connector *connpostgres.PostgresConnector + suffix string } func TestPeerFlowE2ETestSuitePG(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuitePG)) + got.Each(t, e2eshared.GotSuite(setupSuite)) } -// Implement SetupAllSuite interface to setup the test suite -func (s *PeerFlowE2ETestSuitePG) SetupSuite() { +func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuitePG { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -41,14 +41,15 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() { slog.Info("Unable to load .env file, using default values from env") } - pool, err := e2e.SetupPostgres(postgresSuffix) + suffix := "pgtest_" + shared.RandomString(8) + + pool, err := e2e.SetupPostgres(suffix) if err != nil || pool == nil { - s.Fail("failed to setup postgres", err) + t.Fatal("failed to setup postgres", err) } - s.pool = pool - s.peer = generatePGPeer(e2e.GetTestPostgresConf()) + peer := generatePGPeer(e2e.GetTestPostgresConf()) - s.connector, err = connpostgres.NewPostgresConnector(context.Background(), + connector, err := connpostgres.NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, @@ -56,25 +57,32 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() { Password: "postgres", Database: "postgres", }) - s.NoError(err) + require.NoError(t, err) + return PeerFlowE2ETestSuitePG{ + G: g, + t: t, + pool: pool, + peer: peer, + connector: connector, + suffix: suffix, + } } -// Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuitePG) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, postgresSuffix) +func (s PeerFlowE2ETestSuitePG) TearDownSuite() { + err := e2e.TearDownPostgres(s.pool, s.suffix) if err != nil { - s.Fail("failed to drop Postgres schema", err) + s.t.Fatal("failed to drop Postgres schema", err) } } -func (s *PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateTableForQRep(s.pool, postgresSuffix, tableName) - s.NoError(err) - err = e2e.PopulateSourceTable(s.pool, postgresSuffix, tableName, rowCount) - s.NoError(err) +func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { + err := e2e.CreateTableForQRep(s.pool, s.suffix, tableName) + require.NoError(s.t, err) + err = e2e.PopulateSourceTable(s.pool, s.suffix, tableName, rowCount) + require.NoError(s.t, err) } -func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error { +func (s PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error { // Execute the two EXCEPT queries for { err := s.compareQuery(srcSchemaQualified, dstSchemaQualified, selector) @@ -104,7 +112,7 @@ func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, 
dstSchemaQu return nil } -func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { +func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified, selector, dstSchemaQualified) rows, _ := s.pool.Query(context.Background(), query) @@ -135,7 +143,7 @@ func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQuali return nil } -func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { +func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { query := fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s`, dstSchemaQualified) rows, _ := s.pool.Query(context.Background(), query) @@ -156,9 +164,9 @@ func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error return rows.Err() } -func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -169,14 +177,14 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { //nolint:gosec dstTable := "test_qrep_flow_avro_pg_2" - err := e2e.CreateTableForQRep(s.pool, postgresSuffix, dstTable) - s.NoError(err) + err := e2e.CreateTableForQRep(s.pool, s.suffix, dstTable) + require.NoError(s.t, err) - srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable) - dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, dstTable) + srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) + dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - postgresSuffix, srcTable) + s.suffix, srcTable) postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort) @@ -190,7 +198,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { true, "", ) - s.NoError(err) + require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -198,19 +206,19 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified, "*") if err != nil { - s.FailNow(err.Error()) + require.FailNow(s.t, err.Error()) } - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 @@ -221,11 +229,11 @@ func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_ //nolint:gosec dstTable := "test_qrep_columns_pg_2" - srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable) - dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, dstTable) + srcSchemaQualified := fmt.Sprintf("%s_%s.%s", 
"e2e_test", s.suffix, srcTable) + dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - postgresSuffix, srcTable) + s.suffix, srcTable) postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort) @@ -239,7 +247,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_ true, "_PEERDB_SYNCED_AT", ) - s.NoError(err) + require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -247,12 +255,12 @@ func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_ s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) err = s.checkSyncedAt(dstSchemaQualified) if err != nil { - s.FailNow(err.Error()) + require.FailNow(s.t, err.Error()) } - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index a938f673b3..a827f00fc8 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -10,22 +10,23 @@ import ( "github.com/stretchr/testify/require" ) -func (s *PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", s3Suffix, tableName) +func (s PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string { + return fmt.Sprintf("e2e_test_%s.%s", s.suffix, tableName) } -func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, s3Suffix) +func (s PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { + return fmt.Sprintf("%s_%s", input, s.suffix) } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) +func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) - setupErr := s.setupS3("s3") + helper, setupErr := setupS3("s3") if setupErr != nil { - s.Fail("failed to setup S3", setupErr) + require.Fail(s.t, "failed to setup S3", setupErr) } + s.s3Helper = helper srcTableName := s.attachSchemaSuffix("test_simple_flow_s3") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3") @@ -37,7 +38,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: flowJobName, TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -46,7 +47,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -56,7 +57,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - s.NoError(err) + require.NoError(s.t, err) // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -64,9 +65,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } - s.NoError(err) + require.NoError(s.t, err) }() 
env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -76,28 +77,29 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() fmt.Println("JobName: ", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) fmt.Println("Files in Test_Complete_Simple_Flow_S3: ", len(files)) - require.NoError(s.T(), err) + require.NoError(s.t, err) - require.Equal(s.T(), 4, len(files)) + require.Equal(s.t, 4, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) - setupErr := s.setupS3("gcs") +func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) + helper, setupErr := setupS3("gcs") if setupErr != nil { - s.Fail("failed to setup S3", setupErr) + require.Fail(s.t, "failed to setup S3", setupErr) } + s.s3Helper = helper srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop") @@ -109,7 +111,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: flowJobName, TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -118,7 +120,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -128,7 +130,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - s.NoError(err) + require.NoError(s.t, err) // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -136,10 +138,10 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 20 rows into the source table") - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -149,17 +151,17 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Error(s.t, err) + require.Contains(s.t, err.Error(), "continue as new") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() fmt.Println("JobName: ", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) fmt.Println("Files in Test_Complete_Simple_Flow_GCS: ", len(files)) - require.NoError(s.T(), err) + require.NoError(s.t, err) - require.Equal(s.T(), 4, len(files)) + require.Equal(s.t, 4, 
len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index fda57ced09..9a40b68631 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -8,49 +8,39 @@ import ( "time" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" + "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/ysmood/got" ) -const s3Suffix = "s3" - type PeerFlowE2ETestSuiteS3 struct { - suite.Suite - testsuite.WorkflowTestSuite + got.G + t *testing.T pool *pgxpool.Pool s3Helper *S3TestHelper + suffix string } func TestPeerFlowE2ETestSuiteS3(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteS3)) + got.Each(t, e2eshared.GotSuite(setupSuite)) } -func (s *PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateTableForQRep(s.pool, s3Suffix, tableName) - s.NoError(err) - err = e2e.PopulateSourceTable(s.pool, s3Suffix, tableName, rowCount) - s.NoError(err) +func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { + err := e2e.CreateTableForQRep(s.pool, s.suffix, tableName) + require.NoError(s.t, err) + err = e2e.PopulateSourceTable(s.pool, s.suffix, tableName, rowCount) + require.NoError(s.t, err) } -func (s *PeerFlowE2ETestSuiteS3) setupS3(mode string) error { - switchToGCS := false - if mode == "gcs" { - switchToGCS = true - } - helper, err := NewS3TestHelper(switchToGCS) - if err != nil { - return err - } - - s.s3Helper = helper - return nil +func setupS3(mode string) (*S3TestHelper, error) { + return NewS3TestHelper(mode == "gcs") } -func (s *PeerFlowE2ETestSuiteS3) SetupSuite() { +func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -58,43 +48,50 @@ func (s *PeerFlowE2ETestSuiteS3) SetupSuite() { slog.Info("Unable to load .env file, using default values from env") } - pool, err := e2e.SetupPostgres(s3Suffix) + suffix := "s3" + shared.RandomString(8) + pool, err := e2e.SetupPostgres(suffix) if err != nil || pool == nil { - s.Fail("failed to setup postgres", err) + require.Fail(t, "failed to setup postgres", err) } - s.pool = pool - err = s.setupS3("s3") + helper, err := setupS3("s3") if err != nil { - s.Fail("failed to setup S3", err) + require.Fail(t, "failed to setup S3", err) + } + + return PeerFlowE2ETestSuiteS3{ + G: g, + t: t, + pool: pool, + s3Helper: helper, + suffix: suffix, } } -// Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuiteS3) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, s3Suffix) +func (s PeerFlowE2ETestSuiteS3) TearDownSuite() { + err := e2e.TearDownPostgres(s.pool, s.suffix) if err != nil { - s.Fail("failed to drop Postgres schema", err) + require.Fail(s.t, "failed to drop Postgres schema", err) } if s.s3Helper != nil { err = s.s3Helper.CleanUp() if err != nil { - s.Fail("failed to clean up s3", err) + require.Fail(s.t, "failed to clean up s3", err) } } } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { +func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { if s.s3Helper == nil { - s.T().Skip("Skipping S3 test") + s.t.Skip("Skipping S3 test") } - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) 
+ env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) jobName := "test_complete_flow_s3" - schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) + schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName) s.setupSourceTable(jobName, 10) query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}", @@ -109,16 +106,16 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { false, "", ) - s.NoError(err) + require.NoError(s.t, err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) // Verify destination has 1 file // make context with timeout @@ -127,23 +124,23 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { files, err := s.s3Helper.ListAllFiles(ctx, jobName) - require.NoError(s.T(), err) + require.NoError(s.t, err) - require.Equal(s.T(), 1, len(files)) + require.Equal(s.t, 1, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { +func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { if s.s3Helper == nil { - s.T().Skip("Skipping S3 test") + s.t.Skip("Skipping S3 test") } - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) jobName := "test_complete_flow_s3_ctid" - schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) + schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName) s.setupSourceTable(jobName, 20000) query := fmt.Sprintf("SELECT * FROM %s WHERE ctid BETWEEN {{.start}} AND {{.end}}", schemaQualifiedName) @@ -157,7 +154,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { false, "", ) - s.NoError(err) + require.NoError(s.t, err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url qrepConfig.NumRowsPerPartition = 2000 qrepConfig.InitialCopyOnly = true @@ -166,10 +163,10 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) // Verify destination has 1 file // make context with timeout @@ -178,9 +175,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { files, err := s.s3Helper.ListAllFiles(ctx, jobName) - require.NoError(s.T(), err) + require.NoError(s.t, err) - require.Equal(s.T(), 10, len(files)) + require.Equal(s.t, 10, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index 249745dd3c..7c2117d538 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -9,7 +9,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" @@ -34,7 +34,7 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { bucketName = "peerdb_staging" } - 
content, err := e2e.ReadFileToBytes(credsPath) + content, err := e2eshared.ReadFileToBytes(credsPath) if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 8d521dbb72..bcc0789cad 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -13,6 +13,7 @@ import ( connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" @@ -33,20 +34,7 @@ type PeerFlowE2ETestSuiteSF struct { } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteSF { - g := got.New(t) - - // Concurrently run each test - g.Parallel() - - suite := SetupSuite(t, g) - - g.Cleanup(func() { - suite.tearDownSuite() - }) - - return suite - }) + got.Each(t, e2eshared.GotSuite(setupSuite)) } func (s PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { @@ -57,7 +45,7 @@ func (s PeerFlowE2ETestSuiteSF) attachSuffix(input string) string { return fmt.Sprintf("%s_%s", input, s.pgSuffix) } -func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { +func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -100,7 +88,7 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { } // Implement TearDownAllSuite interface to tear down the test suite -func (s PeerFlowE2ETestSuiteSF) tearDownSuite() { +func (s PeerFlowE2ETestSuiteSF) TearDownSuite() { err := e2e.TearDownPostgres(s.pool, s.pgSuffix) if err != nil { slog.Error("failed to tear down Postgres", slog.Any("error", err)) diff --git a/flow/e2e/snowflake/snowflake_helper.go b/flow/e2e/snowflake/snowflake_helper.go index 0401d34f58..8dd9bfa60a 100644 --- a/flow/e2e/snowflake/snowflake_helper.go +++ b/flow/e2e/snowflake/snowflake_helper.go @@ -9,7 +9,7 @@ import ( "time" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" - "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -37,7 +37,7 @@ func NewSnowflakeTestHelper() (*SnowflakeTestHelper, error) { return nil, fmt.Errorf("TEST_SF_CREDS env var not set") } - content, err := e2e.ReadFileToBytes(jsonPath) + content, err := e2eshared.ReadFileToBytes(jsonPath) if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) } diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index f8a06733b1..2c38f63cf5 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -7,6 +7,7 @@ import ( "testing" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/ysmood/got" @@ -56,7 +57,7 @@ func setupSchemaDeltaSuite( } } -func (suite SnowflakeSchemaDeltaTestSuite) tearDownSuite() { +func (suite SnowflakeSchemaDeltaTestSuite) TearDownSuite() { err := suite.sfTestHelper.Cleanup() suite.failTestError(err) err = suite.connector.Close() 
@@ -220,17 +221,5 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { } func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) { - got.Each(t, func(t *testing.T) SnowflakeSchemaDeltaTestSuite { - g := got.New(t) - - g.Parallel() - - suite := setupSchemaDeltaSuite(t, g) - - g.Cleanup(func() { - suite.tearDownSuite() - }) - - return suite - }) + got.Each(t, e2eshared.GotSuite(setupSchemaDeltaSuite)) } diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 2d327458af..632c550a09 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -17,38 +18,36 @@ import ( "github.com/joho/godotenv" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/ysmood/got" ) const sqlserverSuffix = "sqlserver" type PeerFlowE2ETestSuiteSQLServer struct { - suite.Suite - testsuite.WorkflowTestSuite + got.G + t *testing.T pool *pgxpool.Pool sqlsHelper *SQLServerHelper } func TestCDCFlowE2ETestSuiteSQLServer(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteSQLServer)) + got.Each(t, e2eshared.GotSuite(setupSuite)) } // setup sql server connection -func (s *PeerFlowE2ETestSuiteSQLServer) setupSQLServer() { +func setupSQLServer(t *testing.T) *SQLServerHelper { env := os.Getenv("ENABLE_SQLSERVER_TESTS") if env != "true" { - s.sqlsHelper = nil - return + return nil } sqlsHelper, err := NewSQLServerHelper("test_sqlserver_peer") - require.NoError(s.T(), err) - s.sqlsHelper = sqlsHelper + require.NoError(t, err) + return sqlsHelper } -func (s *PeerFlowE2ETestSuiteSQLServer) SetupSuite() { +func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSQLServer { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -58,35 +57,38 @@ func (s *PeerFlowE2ETestSuiteSQLServer) SetupSuite() { pool, err := e2e.SetupPostgres(sqlserverSuffix) if err != nil || pool == nil { - s.Fail("failed to setup postgres", err) + require.Fail(t, "failed to setup postgres", err) } - s.pool = pool - s.setupSQLServer() + return PeerFlowE2ETestSuiteSQLServer{ + G: g, + t: t, + pool: pool, + sqlsHelper: setupSQLServer(t), + } } -// Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuiteSQLServer) TearDownSuite() { +func (s PeerFlowE2ETestSuiteSQLServer) TearDownSuite() { err := e2e.TearDownPostgres(s.pool, sqlserverSuffix) if err != nil { - s.Fail("failed to drop Postgres schema", err) + require.Fail(s.t, "failed to drop Postgres schema", err) } if s.sqlsHelper != nil { err = s.sqlsHelper.CleanUp() if err != nil { - s.Fail("failed to clean up sqlserver", err) + require.Fail(s.t, "failed to clean up sqlserver", err) } } } -func (s *PeerFlowE2ETestSuiteSQLServer) setupSQLServerTable(tableName string) { +func (s PeerFlowE2ETestSuiteSQLServer) setupSQLServerTable(tableName string) { schema := getSimpleTableSchema() err := s.sqlsHelper.CreateTable(schema, tableName) - require.NoError(s.T(), err) + require.NoError(s.t, err) } -func (s *PeerFlowE2ETestSuiteSQLServer) insertRowsIntoSQLServerTable(tableName string, numRows int) { +func (s PeerFlowE2ETestSuiteSQLServer) insertRowsIntoSQLServerTable(tableName string, numRows int) { 
schemaQualified := fmt.Sprintf("%s.%s", s.sqlsHelper.SchemaName, tableName) for i := 0; i < numRows; i++ { params := make(map[string]interface{}) @@ -102,20 +104,20 @@ func (s *PeerFlowE2ETestSuiteSQLServer) insertRowsIntoSQLServerTable(tableName s params, ) - require.NoError(s.T(), err) + require.NoError(s.t, err) } } -func (s *PeerFlowE2ETestSuiteSQLServer) setupPGDestinationTable(tableName string) { +func (s PeerFlowE2ETestSuiteSQLServer) setupPGDestinationTable(tableName string) { ctx := context.Background() _, err := s.pool.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS e2e_test_%s.%s", sqlserverSuffix, tableName)) - require.NoError(s.T(), err) + require.NoError(s.t, err) _, err = s.pool.Exec(ctx, fmt.Sprintf("CREATE TABLE e2e_test_%s.%s (id TEXT, card_id TEXT, v_from TIMESTAMP, price NUMERIC, status INT)", sqlserverSuffix, tableName)) - require.NoError(s.T(), err) + require.NoError(s.t, err) } func getSimpleTableSchema() *model.QRecordSchema { @@ -130,13 +132,13 @@ func getSimpleTableSchema() *model.QRecordSchema { } } -func (s *PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append() { +func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append() { if s.sqlsHelper == nil { - s.T().Skip("Skipping SQL Server test") + s.t.Skip("Skipping SQL Server test") } - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) numRows := 10 tblName := "test_qrep_flow_avro_ss_append" @@ -170,16 +172,16 @@ func (s *PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err := env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) // Verify that the destination table has the same number of rows as the source table var numRowsInDest pgtype.Int8 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s", dstTableName) err = s.pool.QueryRow(context.Background(), countQuery).Scan(&numRowsInDest) - s.NoError(err) + require.NoError(s.t, err) - s.Equal(numRows, int(numRowsInDest.Int64)) + require.Equal(s.t, numRows, int(numRowsInDest.Int64)) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 13ca8044e5..47f9b84bad 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "io" "log/slog" "os" "strings" @@ -24,25 +23,6 @@ import ( "go.temporal.io/sdk/testsuite" ) -// ReadFileToBytes reads a file to a byte array. 
-func ReadFileToBytes(path string) ([]byte, error) {
-	var ret []byte
-
-	f, err := os.Open(path)
-	if err != nil {
-		return ret, fmt.Errorf("failed to open file: %w", err)
-	}
-
-	defer f.Close()
-
-	ret, err = io.ReadAll(f)
-	if err != nil {
-		return ret, fmt.Errorf("failed to read file: %w", err)
-	}
-
-	return ret, nil
-}
-
 func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment, t *testing.T) {
 	conn, err := utils.GetCatalogConnectionPoolFromEnv()
 	if err != nil {
diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go
new file mode 100644
index 0000000000..e68b6164a6
--- /dev/null
+++ b/flow/e2eshared/e2eshared.go
@@ -0,0 +1,45 @@
+package e2eshared
+
+import (
+	"fmt"
+	"io"
+	"os"
+	"testing"
+
+	"github.com/ysmood/got"
+)
+
+// GotSuite adapts a suite setup function into a got.Each-compatible factory,
+// marking each test parallel and registering TearDownSuite as its cleanup.
+func GotSuite[T interface{ TearDownSuite() }](setup func(t *testing.T, g got.G) T) func(t *testing.T) T {
+	return func(t *testing.T) T {
+		g := got.New(t)
+		g.Parallel()
+
+		suite := setup(t, g)
+		g.Cleanup(func() {
+			suite.TearDownSuite()
+		})
+
+		return suite
+	}
+}
+
+// ReadFileToBytes reads a file to a byte array.
+func ReadFileToBytes(path string) ([]byte, error) {
+	var ret []byte
+
+	f, err := os.Open(path)
+	if err != nil {
+		return ret, fmt.Errorf("failed to open file: %w", err)
+	}
+
+	defer f.Close()
+
+	ret, err = io.ReadAll(f)
+	if err != nil {
+		return ret, fmt.Errorf("failed to read file: %w", err)
+	}
+
+	return ret, nil
+}
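
Note for reviewers: below is a minimal sketch of how a test file plugs into the new
e2eshared.GotSuite helper. The suite type, setup function, and test method are
illustrative stand-ins, not code from this patch; only the got.Each wiring and the
g.Parallel/g.Cleanup behavior described in the comments come from the helper itself.

    package example

    import (
        "testing"

        "github.com/PeerDB-io/peer-flow/e2eshared"
        "github.com/ysmood/got"
    )

    // exampleSuite is a hypothetical suite; the real suites in this patch also
    // carry a pgx pool, destination helpers, and a randomized schema suffix.
    type exampleSuite struct {
        got.G
        t *testing.T
    }

    // TearDownSuite satisfies GotSuite's type constraint. GotSuite registers it
    // via g.Cleanup, so it runs after each test method against that method's
    // own suite value, rather than once per suite as under testify.
    func (s exampleSuite) TearDownSuite() {}

    // Suite methods run as subtests via got.Each; the g.Parallel() call inside
    // GotSuite makes each of them run concurrently.
    func (s exampleSuite) TestSomething() {
        s.t.Log("assertions now go through s.t with testify's require package")
    }

    func setupExampleSuite(t *testing.T, g got.G) exampleSuite {
        return exampleSuite{G: g, t: t}
    }

    func TestExampleSuite(t *testing.T) {
        got.Each(t, e2eshared.GotSuite(setupExampleSuite))
    }

The behavioral shift to note is that setup and teardown now run once per test method
rather than once per suite, so every parallel subtest works against state it built
itself.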
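
That per-test setup is also why this patch randomizes schema suffixes where suites
previously shared a constant. Condensed from the S3 suite's setupSuite, with a
hypothetical function name:

    package example

    import (
        "github.com/PeerDB-io/peer-flow/e2e"
        "github.com/PeerDB-io/peer-flow/shared"
        "github.com/jackc/pgx/v5/pgxpool"
    )

    // setupSuffixedPostgres condenses the suffix pattern used in setupSuite:
    // each suite instance gets its own e2e_test_<suffix> schema, so concurrent
    // tests can create, populate, and drop tables without colliding.
    func setupSuffixedPostgres() (*pgxpool.Pool, string, error) {
        suffix := "s3" + shared.RandomString(8)
        pool, err := e2e.SetupPostgres(suffix)
        if err != nil {
            return nil, "", err
        }
        // TearDownSuite later calls e2e.TearDownPostgres(pool, suffix) to drop
        // the schema created here.
        return pool, suffix, nil
    }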