From bd1a1a3fed34cc449daafccf19310e8b636df651 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 27 Dec 2023 19:42:55 +0000 Subject: [PATCH] Remove testify/suite from connectors tests Splitting up changes from #871 --- .../connectors/postgres/postgres_repl_test.go | 85 ++++++----- .../postgres/postgres_schema_delta_test.go | 94 ++++++------ .../cdc_records/cdc_records_storage_test.go | 61 ++++---- flow/e2e/bigquery/bigquery_helper.go | 4 +- flow/e2e/bigquery/peer_flow_bq_test.go | 130 +++++++--------- flow/e2e/bigquery/qrep_flow_bq_test.go | 10 +- flow/e2e/s3/s3_helper.go | 4 +- flow/e2e/snowflake/peer_flow_sf_test.go | 144 ++++++++---------- flow/e2e/snowflake/qrep_flow_sf_test.go | 24 +-- flow/e2e/snowflake/snowflake_helper.go | 4 +- flow/e2e/test_utils.go | 36 ++--- flow/e2eshared/e2eshared.go | 39 +++++ 12 files changed, 324 insertions(+), 311 deletions(-) create mode 100644 flow/e2eshared/e2eshared.go diff --git a/flow/connectors/postgres/postgres_repl_test.go b/flow/connectors/postgres/postgres_repl_test.go index df3a7de13f..67661d5b65 100644 --- a/flow/connectors/postgres/postgres_repl_test.go +++ b/flow/connectors/postgres/postgres_repl_test.go @@ -7,106 +7,121 @@ import ( "testing" "time" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" + "github.com/ysmood/got" ) type PostgresReplicationSnapshotTestSuite struct { - suite.Suite + t *testing.T + connector *PostgresConnector + schema string } -func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() { - var err error - suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{ +func setupSuite(t *testing.T) PostgresReplicationSnapshotTestSuite { + t.Helper() + + connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, User: "postgres", Password: "postgres", Database: "postgres", }, true) - require.NoError(suite.T(), err) + require.NoError(t, err) - setupTx, err := suite.connector.pool.Begin(context.Background()) - require.NoError(suite.T(), err) + setupTx, err := connector.pool.Begin(context.Background()) + require.NoError(t, err) defer func() { err := setupTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - require.NoError(suite.T(), err) + require.NoError(t, err) } }() - _, err = setupTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_repl_test CASCADE") - require.NoError(suite.T(), err) + schema := "repltest_" + shared.RandomString(8) + + _, err = setupTx.Exec(context.Background(), + fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schema)) + require.NoError(t, err) - _, err = setupTx.Exec(context.Background(), "CREATE SCHEMA pgpeer_repl_test") - require.NoError(suite.T(), err) + _, err = setupTx.Exec(context.Background(), + fmt.Sprintf("CREATE SCHEMA %s", schema)) + require.NoError(t, err) - // setup 3 tables in pgpeer_repl_test schema - // test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5 + // setup 3 tables test_1, test_2, test_3 + // all have 5 text columns c1, c2, c3, c4, c5 tables := []string{"test_1", "test_2", "test_3"} for _, table := range tables { _, err = setupTx.Exec(context.Background(), - fmt.Sprintf("CREATE TABLE pgpeer_repl_test.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", table)) - require.NoError(suite.T(), err) + fmt.Sprintf("CREATE TABLE %s.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", schema, table)) + require.NoError(t, err) } err = setupTx.Commit(context.Background()) - require.NoError(suite.T(), err) + require.NoError(t, err) + + return PostgresReplicationSnapshotTestSuite{ + t: t, + connector: connector, + schema: schema, + } } -func (suite *PostgresReplicationSnapshotTestSuite) TearDownSuite() { +func (suite PostgresReplicationSnapshotTestSuite) TearDownSuite() { teardownTx, err := suite.connector.pool.Begin(context.Background()) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) defer func() { err := teardownTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - require.NoError(suite.T(), err) + require.NoError(suite.t, err) } }() - _, err = teardownTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_test CASCADE") - require.NoError(suite.T(), err) + _, err = teardownTx.Exec(context.Background(), + fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", suite.schema)) + require.NoError(suite.t, err) // Fetch all the publications rows, err := teardownTx.Query(context.Background(), "SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation")) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) // Iterate over the publications and drop them for rows.Next() { var pubname pgtype.Text err := rows.Scan(&pubname) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) // Drop the publication in a new transaction _, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String)) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) } _, err = teardownTx.Exec(context.Background(), "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation")) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) err = teardownTx.Commit(context.Background()) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) - suite.True(suite.connector.ConnectionActive() == nil) + require.True(suite.t, suite.connector.ConnectionActive() == nil) err = suite.connector.Close() - require.NoError(suite.T(), err) + require.NoError(suite.t, err) - suite.False(suite.connector.ConnectionActive() == nil) + require.False(suite.t, suite.connector.ConnectionActive() == nil) } -func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { +func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { tables := map[string]string{ - "pgpeer_repl_test.test_1": "test_1_dst", + suite.schema + ".test_1": "test_1_dst", } flowJobName := "test_simple_slot_creation" @@ -121,7 +136,7 @@ func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { // Moved to a go routine go func() { err := suite.connector.SetupReplication(signal, setupReplicationInput) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) }() slog.Info("waiting for slot creation to complete", flowLog) @@ -136,5 +151,5 @@ func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { } func TestPostgresReplTestSuite(t *testing.T) { - suite.Run(t, new(PostgresReplicationSnapshotTestSuite)) + got.Each(t, e2eshared.GotSuite(setupSuite)) } diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 8a919eb214..6b3f1132e1 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -5,79 +5,81 @@ import ( "fmt" "testing" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/jackc/pgx/v5" - "github.com/stretchr/testify/suite" + "github.com/stretchr/testify/require" + "github.com/ysmood/got" ) type PostgresSchemaDeltaTestSuite struct { - suite.Suite + t *testing.T + connector *PostgresConnector } const schemaDeltaTestSchemaName = "pgschema_delta_test" -func (suite *PostgresSchemaDeltaTestSuite) failTestError(err error) { - if err != nil { - suite.FailNow(err.Error()) - } -} +func setupSchemaDeltaSuite(t *testing.T) PostgresSchemaDeltaTestSuite { + t.Helper() -func (suite *PostgresSchemaDeltaTestSuite) SetupSuite() { - var err error - suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{ + connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, User: "postgres", Password: "postgres", Database: "postgres", }, false) - suite.failTestError(err) + require.NoError(t, err) - setupTx, err := suite.connector.pool.Begin(context.Background()) - suite.failTestError(err) + setupTx, err := connector.pool.Begin(context.Background()) + require.NoError(t, err) defer func() { err := setupTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - suite.failTestError(err) + require.NoError(t, err) } }() _, err = setupTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schemaDeltaTestSchemaName)) - suite.failTestError(err) + require.NoError(t, err) _, err = setupTx.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA %s", schemaDeltaTestSchemaName)) - suite.failTestError(err) + require.NoError(t, err) err = setupTx.Commit(context.Background()) - suite.failTestError(err) + require.NoError(t, err) + return PostgresSchemaDeltaTestSuite{ + t: t, + connector: connector, + } } -func (suite *PostgresSchemaDeltaTestSuite) TearDownSuite() { +func (suite PostgresSchemaDeltaTestSuite) TearDownSuite() { teardownTx, err := suite.connector.pool.Begin(context.Background()) - suite.failTestError(err) + require.NoError(suite.t, err) defer func() { err := teardownTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - suite.failTestError(err) + require.NoError(suite.t, err) } }() _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schemaDeltaTestSchemaName)) - suite.failTestError(err) + require.NoError(suite.t, err) err = teardownTx.Commit(context.Background()) - suite.failTestError(err) + require.NoError(suite.t, err) - suite.True(suite.connector.ConnectionActive() == nil) + require.True(suite.t, suite.connector.ConnectionActive() == nil) err = suite.connector.Close() - suite.failTestError(err) - suite.False(suite.connector.ConnectionActive() == nil) + require.NoError(suite.t, err) + require.False(suite.t, suite.connector.ConnectionActive() == nil) } -func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { +func (suite PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { tableName := fmt.Sprintf("%s.simple_add_column", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, @@ -87,13 +89,13 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { ColumnType: string(qvalue.QValueKindInt64), }}, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) - suite.Equal(&protos.TableSchema{ + require.NoError(suite.t, err) + require.Equal(suite.t, &protos.TableSchema{ TableIdentifier: tableName, Columns: map[string]string{ "id": string(qvalue.QValueKindInt32), @@ -103,11 +105,11 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { }, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { +func (suite PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { tableName := fmt.Sprintf("%s.add_drop_all_column_types", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -148,20 +150,20 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { DstTableName: tableName, AddedColumns: addedColumns, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) - suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) + require.NoError(suite.t, err) + require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { +func (suite PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { tableName := fmt.Sprintf("%s.add_drop_tricky_column_names", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -194,20 +196,20 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { DstTableName: tableName, AddedColumns: addedColumns, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) - suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) + require.NoError(suite.t, err) + require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { +func (suite PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { tableName := fmt.Sprintf("%s.add_drop_whitespace_column_names", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(\" \" INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -234,15 +236,15 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { DstTableName: tableName, AddedColumns: addedColumns, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) - suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) + require.NoError(suite.t, err) + require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) } func TestPostgresSchemaDeltaTestSuite(t *testing.T) { - suite.Run(t, new(PostgresSchemaDeltaTestSuite)) + got.Each(t, e2eshared.GotSuite(setupSchemaDeltaSuite)) } diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go index 7d9b8e429c..13aff7f00c 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go @@ -2,22 +2,19 @@ package cdc_records import ( "crypto/rand" + "testing" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/stretchr/testify/require" ) -type CDCRecordStorageTestSuite struct { - suite.Suite - testsuite.WorkflowTestSuite -} +func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record) { + t.Helper() -func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.Record) { - pkeyColVal := make([]byte, 0, 32) + pkeyColVal := make([]byte, 32) _, err := rand.Read(pkeyColVal) - s.NoError(err) + require.NoError(t, err) key := model.TableWithPkey{ TableName: "test_src_tbl", @@ -39,50 +36,52 @@ func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.R return key, rec } -func (s *CDCRecordStorageTestSuite) TestSingleRecord() { +func TestSingleRecord(t *testing.T) { + t.Parallel() cdcRecordsStore := NewCDCRecordsStore("test_single_record") cdcRecordsStore.numRecordsSwitchThreshold = 10 - key, rec := s.genKeyAndRec() + key, rec := genKeyAndRec(t) err := cdcRecordsStore.Set(key, rec) - s.NoError(err) + require.NoError(t, err) // should not spill into DB - s.Equal(1, len(cdcRecordsStore.inMemoryRecords)) - s.Nil(cdcRecordsStore.pebbleDB) + require.Equal(t, 1, len(cdcRecordsStore.inMemoryRecords)) + require.Nil(t, cdcRecordsStore.pebbleDB) reck, ok, err := cdcRecordsStore.Get(key) - s.NoError(err) - s.True(ok) - s.Equal(rec, reck) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, rec, reck) - s.NoError(cdcRecordsStore.Close()) + require.NoError(t, cdcRecordsStore.Close()) } -func (s *CDCRecordStorageTestSuite) TestRecordsTillSpill() { +func TestRecordsTillSpill(t *testing.T) { + t.Parallel() cdcRecordsStore := NewCDCRecordsStore("test_records_till_spill") cdcRecordsStore.numRecordsSwitchThreshold = 10 // add records upto set limit for i := 0; i < 10; i++ { - key, rec := s.genKeyAndRec() + key, rec := genKeyAndRec(t) err := cdcRecordsStore.Set(key, rec) - s.NoError(err) - s.Equal(i+1, len(cdcRecordsStore.inMemoryRecords)) - s.Nil(cdcRecordsStore.pebbleDB) + require.NoError(t, err) + require.Equal(t, i+1, len(cdcRecordsStore.inMemoryRecords)) + require.Nil(t, cdcRecordsStore.pebbleDB) } // this record should be spilled to DB - key, rec := s.genKeyAndRec() + key, rec := genKeyAndRec(t) err := cdcRecordsStore.Set(key, rec) - s.NoError(err) + require.NoError(t, err) _, ok := cdcRecordsStore.inMemoryRecords[key] - s.False(ok) - s.NotNil(cdcRecordsStore.pebbleDB) + require.False(t, ok) + require.NotNil(t, cdcRecordsStore.pebbleDB) reck, ok, err := cdcRecordsStore.Get(key) - s.NoError(err) - s.True(ok) - s.Equal(rec, reck) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, rec, reck) - s.NoError(cdcRecordsStore.Close()) + require.NoError(t, cdcRecordsStore.Close()) } diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 40a42ea64f..0cf697020b 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -12,7 +12,7 @@ import ( "cloud.google.com/go/bigquery" "cloud.google.com/go/civil" peer_bq "github.com/PeerDB-io/peer-flow/connectors/bigquery" - "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -46,7 +46,7 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) { return nil, fmt.Errorf("TEST_BQ_CREDS env var not set") } - content, err := e2e.ReadFileToBytes(jsonPath) + content, err := e2eshared.ReadFileToBytes(jsonPath) if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 4d47d2ff96..3b738d5003 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" @@ -21,7 +22,6 @@ import ( ) type PeerFlowE2ETestSuiteBQ struct { - got.G t *testing.T bqSuffix string @@ -30,20 +30,7 @@ type PeerFlowE2ETestSuiteBQ struct { } func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteBQ { - t.Helper() - - g := got.New(t) - g.Parallel() - - suite := setupSuite(t, g) - - g.Cleanup(func() { - suite.tearDownSuite() - }) - - return suite - }) + got.Each(t, e2eshared.GotSuite(setupSuite)) } func (s PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { @@ -118,7 +105,7 @@ func setupBigQuery(t *testing.T) *BigQueryTestHelper { } // Implement SetupAllSuite interface to setup the test suite -func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { +func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { t.Helper() err := godotenv.Load() @@ -140,7 +127,6 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { bq := setupBigQuery(t) return PeerFlowE2ETestSuiteBQ{ - G: g, t: t, bqSuffix: bqSuffix, pool: pool, @@ -148,23 +134,22 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { } } -// Implement TearDownAllSuite interface to tear down the test suite -func (s PeerFlowE2ETestSuiteBQ) tearDownSuite() { +func (s PeerFlowE2ETestSuiteBQ) TearDownSuite() { err := e2e.TearDownPostgres(s.pool, s.bqSuffix) if err != nil { slog.Error("failed to tear down postgres", slog.Any("error", err)) - s.FailNow() + s.t.FailNow() } err = s.bqHelper.DropDataset(s.bqHelper.datasetName) if err != nil { slog.Error("failed to tear down bigquery", slog.Any("error", err)) - s.FailNow() + s.t.FailNow() } } func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. @@ -176,7 +161,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, &limits, nil) // Verify workflow completes - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err := env.GetWorkflowError() // assert that error contains "invalid connection configs" @@ -186,7 +171,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { } func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_no_data") @@ -220,7 +205,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -230,7 +215,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { } func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_char_coltype") @@ -264,7 +249,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -277,7 +262,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { // The test inserts 10 rows into the source table and verifies that the data is // correctly synced to the destination table after sync flow completes. func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_flow_bq") @@ -327,7 +312,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -335,16 +320,15 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { count, err := s.bqHelper.countRows(dstTableName) require.NoError(s.t, err) - s.Equal(10, count) + require.Equal(s.t, 10, count) - // TODO: verify that the data is correctly synced to the destination table - // on the bigquery side + // TODO: verify data is correctly synced to destination table on bigquery side env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_1") @@ -401,7 +385,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -412,7 +396,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_2") @@ -464,7 +448,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -476,7 +460,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_3") @@ -539,7 +523,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -550,7 +534,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_4") @@ -606,7 +590,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -617,7 +601,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_5") @@ -673,7 +657,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -684,7 +668,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_types_bq") @@ -741,7 +725,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -757,13 +741,13 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { fmt.Println("error %w", err) } // Make sure that there are no nulls - s.True(noNulls) + require.True(s.t, noNulls) env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTable1Name := s.attachSchemaSuffix("test1_bq") @@ -817,15 +801,15 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { count2, err := s.bqHelper.countRows(dstTable2Name) require.NoError(s.t, err) - s.Equal(1, count1) - s.Equal(1, count2) + require.Equal(s.t, 1, count1) + require.Equal(s.t, 1, count2) env.AssertExpectations(s.t) } // TODO: not checking schema exactly, add later func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") @@ -915,7 +899,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -925,7 +909,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") @@ -986,7 +970,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -998,7 +982,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") @@ -1062,7 +1046,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1075,7 +1059,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") @@ -1135,7 +1119,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1148,7 +1132,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_peerdb_cols") @@ -1198,7 +1182,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1211,7 +1195,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTable1Name := s.attachSchemaSuffix("test1_bq") @@ -1269,8 +1253,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name) require.NoError(s.t, err) - s.Equal(1, count1) - s.Equal(1, count2) + require.Equal(s.t, 1, count1) + require.Equal(s.t, 1, count2) err = s.bqHelper.DropDataset(secondDataset) require.NoError(s.t, err) @@ -1279,7 +1263,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel") @@ -1349,7 +1333,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1363,11 +1347,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - s.Eq(1, numNewRows) + require.Equal(s.t, 1, numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") @@ -1435,7 +1419,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1447,11 +1431,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - s.Eq(1, numNewRows) + require.Equal(s.t, 1, numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_ud") @@ -1523,7 +1507,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1535,11 +1519,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - s.Eq(1, numNewRows) + require.Equal(s.t, 1, numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_softdel_iad") @@ -1599,7 +1583,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1611,5 +1595,5 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - s.Eq(0, numNewRows) + require.Equal(s.t, 0, numNewRows) } diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index e6d3ac9f5b..d26f028cba 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -43,11 +43,11 @@ func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsStr bqRows, err := s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) require.NoError(s.t, err) - s.True(pgRows.Equals(bqRows)) + require.True(s.t, pgRows.Equals(bqRows)) } func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -71,7 +71,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) @@ -82,7 +82,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { } func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -105,7 +105,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() { e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index 249745dd3c..7c2117d538 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -9,7 +9,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/connectors/utils" - "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" @@ -34,7 +34,7 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { bucketName = "peerdb_staging" } - content, err := e2e.ReadFileToBytes(credsPath) + content, err := e2eshared.ReadFileToBytes(credsPath) if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index f979198635..c524a047b0 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -13,6 +13,7 @@ import ( connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" @@ -23,7 +24,6 @@ import ( ) type PeerFlowE2ETestSuiteSF struct { - got.G t *testing.T pgSuffix string @@ -33,20 +33,7 @@ type PeerFlowE2ETestSuiteSF struct { } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteSF { - t.Helper() - - g := got.New(t) - g.Parallel() - - suite := SetupSuite(t, g) - - g.Cleanup(func() { - suite.tearDownSuite() - }) - - return suite - }) + got.Each(t, e2eshared.GotSuite(SetupSuite)) } func (s PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { @@ -57,7 +44,7 @@ func (s PeerFlowE2ETestSuiteSF) attachSuffix(input string) string { return fmt.Sprintf("%s_%s", input, s.pgSuffix) } -func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { +func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSF { t.Helper() err := godotenv.Load() @@ -74,13 +61,13 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { pool, err := e2e.SetupPostgres(pgSuffix) if err != nil || pool == nil { slog.Error("failed to setup Postgres", slog.Any("error", err)) - g.FailNow() + t.FailNow() } sfHelper, err := NewSnowflakeTestHelper() if err != nil { slog.Error("failed to setup Snowflake", slog.Any("error", err)) - g.FailNow() + t.FailNow() } connector, err := connsnowflake.NewSnowflakeConnector( @@ -90,7 +77,6 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { require.NoError(t, err) suite := PeerFlowE2ETestSuiteSF{ - G: g, t: t, pgSuffix: pgSuffix, pool: pool, @@ -101,19 +87,18 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { return suite } -// Implement TearDownAllSuite interface to tear down the test suite -func (s PeerFlowE2ETestSuiteSF) tearDownSuite() { +func (s PeerFlowE2ETestSuiteSF) TearDownSuite() { err := e2e.TearDownPostgres(s.pool, s.pgSuffix) if err != nil { slog.Error("failed to tear down Postgres", slog.Any("error", err)) - s.FailNow() + s.t.FailNow() } if s.sfHelper != nil { err = s.sfHelper.Cleanup() if err != nil { slog.Error("failed to tear down Snowflake", slog.Any("error", err)) - s.FailNow() + s.t.FailNow() } } @@ -121,12 +106,12 @@ func (s PeerFlowE2ETestSuiteSF) tearDownSuite() { if err != nil { slog.Error("failed to close Snowflake connector", slog.Any("error", err)) - s.FailNow() + s.t.FailNow() } } func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_flow_sf") @@ -175,7 +160,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -183,7 +168,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { count, err := s.sfHelper.CountRows("test_simple_flow_sf") require.NoError(s.t, err) - s.Equal(20, count) + require.Equal(s.t, 20, count) // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago // it should match the count. @@ -192,7 +177,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { `, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) - s.Equal(20, numNewRows) + require.Equal(s.t, 20, numNewRows) // TODO: verify that the data is correctly synced to the destination table // on the Snowflake side @@ -201,7 +186,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_replica_identity_no_pkey") @@ -253,7 +238,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -261,13 +246,13 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { count, err := s.sfHelper.CountRows("test_replica_identity_no_pkey") require.NoError(s.t, err) - s.Equal(20, count) + require.Equal(s.t, 20, count) env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_invalid_geo_sf_avro_cdc") @@ -329,7 +314,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -339,11 +324,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { // They should have been filtered out as null on destination lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line") require.NoError(s.t, err) - s.Equal(6, lineCount) + require.Equal(s.t, 6, lineCount) polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly") require.NoError(s.t, err) - s.Equal(6, polyCount) + require.Equal(s.t, 6, polyCount) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side @@ -352,7 +337,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_1") @@ -408,7 +393,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -419,7 +404,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_2") @@ -466,7 +451,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { if err != nil { slog.Error("Error executing transaction", slog.Any("error", err)) - s.FailNow() } wg.Done() @@ -475,7 +459,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -488,7 +472,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_3") @@ -550,7 +534,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -561,7 +545,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_4") @@ -616,7 +600,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -627,7 +611,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_5") @@ -682,7 +666,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -693,7 +677,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_types_sf") @@ -750,7 +734,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -766,13 +750,13 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { fmt.Println("error %w", err) } // Make sure that there are no nulls - s.Equal(noNulls, true) + require.True(s.t, noNulls) env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTable1Name := s.attachSchemaSuffix("test1_sf") @@ -816,7 +800,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() count1, err := s.sfHelper.CountRows("test1_sf") @@ -824,14 +808,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { count2, err := s.sfHelper.CountRows("test2_sf") require.NoError(s.t, err) - s.Equal(1, count1) - s.Equal(1, count2) + require.Equal(s.t, 1, count1) + require.Equal(s.t, 1, count2) env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") @@ -885,7 +869,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { TableIdentifiers: []string{dstTableName}, }) require.NoError(s.t, err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) // alter source table, add column c2 and insert another row. @@ -914,7 +898,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { TableIdentifiers: []string{dstTableName}, }) require.NoError(s.t, err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2", false) // alter source table, add column c3, drop column c2 and insert another row. @@ -944,7 +928,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { TableIdentifiers: []string{dstTableName}, }) require.NoError(s.t, err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3", false) // alter source table, drop column c3 and insert another row. @@ -974,14 +958,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { TableIdentifiers: []string{dstTableName}, }) require.NoError(s.t, err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -991,7 +975,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") @@ -1051,7 +1035,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1064,7 +1048,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") @@ -1127,7 +1111,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1140,7 +1124,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") @@ -1199,7 +1183,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1212,7 +1196,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_exclude_sf") @@ -1278,7 +1262,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1290,12 +1274,12 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { for _, field := range sfRows.Schema.Fields { require.NotEqual(s.t, field.Name, "c2") } - s.Equal(5, len(sfRows.Schema.Fields)) - s.Equal(10, len(sfRows.Records)) + require.Equal(s.t, 5, len(sfRows.Schema.Fields)) + require.Equal(s.t, 10, len(sfRows.Records)) } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel") @@ -1365,7 +1349,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1382,7 +1366,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") @@ -1450,7 +1434,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1461,11 +1445,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) - s.Equal(1, numNewRows) + require.Equal(s.t, 1, numNewRows) } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_ud") @@ -1537,7 +1521,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1548,11 +1532,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) - s.Equal(1, numNewRows) + require.Equal(s.t, 1, numNewRows) } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_softdel_iad") @@ -1612,7 +1596,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1623,5 +1607,5 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) - s.Equal(0, numNewRows) + require.Equal(s.t, 0, numNewRows) } diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 4499f7f437..3916795e60 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -57,7 +57,7 @@ func (s PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selecto } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -86,7 +86,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) @@ -98,7 +98,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -131,7 +131,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) @@ -143,7 +143,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -173,7 +173,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) @@ -185,7 +185,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -219,7 +219,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { e2e.RunXminFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) @@ -231,7 +231,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -265,7 +265,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) @@ -277,7 +277,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() } func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -309,7 +309,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() { e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) diff --git a/flow/e2e/snowflake/snowflake_helper.go b/flow/e2e/snowflake/snowflake_helper.go index 0401d34f58..8dd9bfa60a 100644 --- a/flow/e2e/snowflake/snowflake_helper.go +++ b/flow/e2e/snowflake/snowflake_helper.go @@ -9,7 +9,7 @@ import ( "time" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" - "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -37,7 +37,7 @@ func NewSnowflakeTestHelper() (*SnowflakeTestHelper, error) { return nil, fmt.Errorf("TEST_SF_CREDS env var not set") } - content, err := e2e.ReadFileToBytes(jsonPath) + content, err := e2eshared.ReadFileToBytes(jsonPath) if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 83f4760acd..d037654efd 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -4,9 +4,7 @@ import ( "context" "encoding/json" "fmt" - "io" "log/slog" - "os" "strings" "testing" "time" @@ -24,25 +22,6 @@ import ( "go.temporal.io/sdk/testsuite" ) -// ReadFileToBytes reads a file to a byte array. -func ReadFileToBytes(path string) ([]byte, error) { - var ret []byte - - f, err := os.Open(path) - if err != nil { - return ret, fmt.Errorf("failed to open file: %w", err) - } - - defer f.Close() - - ret, err = io.ReadAll(f) - if err != nil { - return ret, fmt.Errorf("failed to read file: %w", err) - } - - return ret, nil -} - func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnvironment) { t.Helper() @@ -390,12 +369,23 @@ func GetOwnersSelectorString() string { return strings.Join(fields, ",") } -func NewTemporalTestWorkflowEnvironment() *testsuite.TestWorkflowEnvironment { +type tlogWriter struct { + t *testing.T +} + +func (iw tlogWriter) Write(p []byte) (n int, err error) { + iw.t.Log(string(p)) + return len(p), nil +} + +func NewTemporalTestWorkflowEnvironment(t *testing.T) *testsuite.TestWorkflowEnvironment { + t.Helper() + testSuite := &testsuite.WorkflowTestSuite{} logger := slog.New(logger.NewHandler( slog.NewJSONHandler( - os.Stdout, + tlogWriter{t}, &slog.HandlerOptions{Level: slog.LevelWarn}, ))) tLogger := NewTStructuredLogger(*logger) diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go new file mode 100644 index 0000000000..f53104d33e --- /dev/null +++ b/flow/e2eshared/e2eshared.go @@ -0,0 +1,39 @@ +package e2eshared + +import ( + "fmt" + "io" + "os" + "testing" +) + +func GotSuite[T interface{ TearDownSuite() }](setup func(t *testing.T) T) func(t *testing.T) T { + return func(t *testing.T) T { + t.Helper() + t.Parallel() + suite := setup(t) + t.Cleanup(func() { + suite.TearDownSuite() + }) + return suite + } +} + +// ReadFileToBytes reads a file to a byte array. +func ReadFileToBytes(path string) ([]byte, error) { + var ret []byte + + f, err := os.Open(path) + if err != nil { + return ret, fmt.Errorf("failed to open file: %w", err) + } + + defer f.Close() + + ret, err = io.ReadAll(f) + if err != nil { + return ret, fmt.Errorf("failed to read file: %w", err) + } + + return ret, nil +}