From e552b5db8ed48f21d8e43ec23f4d79bcf8ca3dd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 27 Dec 2023 21:10:12 +0000 Subject: [PATCH] postmerge fixes --- flow/connectors/external_metadata/store.go | 5 - .../connectors/postgres/postgres_repl_test.go | 95 +++++++++---------- .../postgres/postgres_schema_delta_test.go | 45 +++++---- flow/e2e/bigquery/peer_flow_bq_test.go | 37 ++++---- flow/e2e/postgres/peer_flow_pg_test.go | 8 +- flow/e2e/postgres/qrep_flow_pg_test.go | 17 ++-- flow/e2e/s3/qrep_flow_s3_test.go | 31 +++--- flow/e2e/snowflake/peer_flow_sf_test.go | 11 +-- .../snowflake/snowflake_schema_delta_test.go | 5 +- .../e2e/sqlserver/qrep_flow_sqlserver_test.go | 34 +++---- flow/e2e/test_utils.go | 3 +- flow/e2eshared/e2eshared.go | 9 +- 12 files changed, 143 insertions(+), 157 deletions(-) diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 05d6797ddf..9fe72828ad 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -104,11 +104,6 @@ func (p *PostgresMetadataStore) NeedsSetupMetadata() bool { return true } -func isUniqueError(err error) bool { - var pgerr *pgconn.PgError - return errors.As(err, &pgerr) && pgerr.Code == pgerrcode.UniqueViolation -} - func (p *PostgresMetadataStore) SetupMetadata() error { // start a transaction tx, err := p.pool.Begin(p.ctx) diff --git a/flow/connectors/postgres/postgres_repl_test.go b/flow/connectors/postgres/postgres_repl_test.go index 4fb1012d4b..b0e990ac2a 100644 --- a/flow/connectors/postgres/postgres_repl_test.go +++ b/flow/connectors/postgres/postgres_repl_test.go @@ -13,7 +13,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/stretchr/testify/require" - "github.com/ysmood/got" ) type PostgresReplicationSnapshotTestSuite struct { @@ -24,6 +23,8 @@ type PostgresReplicationSnapshotTestSuite struct { } func setupSuite(t *testing.T) PostgresReplicationSnapshotTestSuite { + t.Helper() + connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, @@ -71,52 +72,6 @@ func setupSuite(t *testing.T) PostgresReplicationSnapshotTestSuite { } } -func (suite PostgresReplicationSnapshotTestSuite) TearDownSuite() { - teardownTx, err := suite.connector.pool.Begin(context.Background()) - require.NoError(suite.t, err) - defer func() { - err := teardownTx.Rollback(context.Background()) - if err != pgx.ErrTxClosed { - require.NoError(suite.t, err) - } - }() - - _, err = teardownTx.Exec(context.Background(), - fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", suite.schema)) - require.NoError(suite.t, err) - - // Fetch all the publications - rows, err := teardownTx.Query(context.Background(), - "SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation")) - require.NoError(suite.t, err) - - // Iterate over the publications and drop them - for rows.Next() { - var pubname pgtype.Text - err := rows.Scan(&pubname) - require.NoError(suite.t, err) - - // Drop the publication in a new transaction - _, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String)) - require.NoError(suite.t, err) - } - - _, err = teardownTx.Exec(context.Background(), - "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1", - fmt.Sprintf("%%%s", "test_simple_slot_creation")) - require.NoError(suite.t, err) - - err = teardownTx.Commit(context.Background()) - require.NoError(suite.t, err) - - require.True(suite.t, suite.connector.ConnectionActive() == nil) - - err = suite.connector.Close() - require.NoError(suite.t, err) - - require.False(suite.t, suite.connector.ConnectionActive() == nil) -} - func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { tables := map[string]string{ suite.schema + ".test_1": "test_1_dst", @@ -149,5 +104,49 @@ func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { } func TestPostgresReplTestSuite(t *testing.T) { - got.Each(t, e2eshared.GotSuite(setupSuite)) + e2eshared.GotSuite(t, setupSuite, func(suite PostgresReplicationSnapshotTestSuite) { + teardownTx, err := suite.connector.pool.Begin(context.Background()) + require.NoError(suite.t, err) + defer func() { + err := teardownTx.Rollback(context.Background()) + if err != pgx.ErrTxClosed { + require.NoError(suite.t, err) + } + }() + + _, err = teardownTx.Exec(context.Background(), + fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", suite.schema)) + require.NoError(suite.t, err) + + // Fetch all the publications + rows, err := teardownTx.Query(context.Background(), + "SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation")) + require.NoError(suite.t, err) + + // Iterate over the publications and drop them + for rows.Next() { + var pubname pgtype.Text + err := rows.Scan(&pubname) + require.NoError(suite.t, err) + + // Drop the publication in a new transaction + _, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String)) + require.NoError(suite.t, err) + } + + _, err = teardownTx.Exec(context.Background(), + "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1", + fmt.Sprintf("%%%s", "test_simple_slot_creation")) + require.NoError(suite.t, err) + + err = teardownTx.Commit(context.Background()) + require.NoError(suite.t, err) + + require.True(suite.t, suite.connector.ConnectionActive() == nil) + + err = suite.connector.Close() + require.NoError(suite.t, err) + + require.False(suite.t, suite.connector.ConnectionActive() == nil) + }) } diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 13e07c3ba6..24fa43dbfc 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -10,7 +10,6 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/jackc/pgx/v5" "github.com/stretchr/testify/require" - "github.com/ysmood/got" ) type PostgresSchemaDeltaTestSuite struct { @@ -22,6 +21,8 @@ type PostgresSchemaDeltaTestSuite struct { const schemaDeltaTestSchemaName = "pgschema_delta_test" func setupSchemaDeltaSuite(t *testing.T) PostgresSchemaDeltaTestSuite { + t.Helper() + connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, @@ -52,27 +53,6 @@ func setupSchemaDeltaSuite(t *testing.T) PostgresSchemaDeltaTestSuite { } } -func (suite PostgresSchemaDeltaTestSuite) TearDownSuite() { - teardownTx, err := suite.connector.pool.Begin(context.Background()) - require.NoError(suite.t, err) - defer func() { - err := teardownTx.Rollback(context.Background()) - if err != pgx.ErrTxClosed { - require.NoError(suite.t, err) - } - }() - _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", - schemaDeltaTestSchemaName)) - require.NoError(suite.t, err) - err = teardownTx.Commit(context.Background()) - require.NoError(suite.t, err) - - require.True(suite.t, suite.connector.ConnectionActive() == nil) - err = suite.connector.Close() - require.NoError(suite.t, err) - require.False(suite.t, suite.connector.ConnectionActive() == nil) -} - func (suite PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { tableName := fmt.Sprintf("%s.simple_add_column", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), @@ -244,5 +224,24 @@ func (suite PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { } func TestPostgresSchemaDeltaTestSuite(t *testing.T) { - got.Each(t, e2eshared.GotSuite(setupSchemaDeltaSuite)) + e2eshared.GotSuite(t, setupSchemaDeltaSuite, func(suite PostgresSchemaDeltaTestSuite) { + teardownTx, err := suite.connector.pool.Begin(context.Background()) + require.NoError(suite.t, err) + defer func() { + err := teardownTx.Rollback(context.Background()) + if err != pgx.ErrTxClosed { + require.NoError(suite.t, err) + } + }() + _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", + schemaDeltaTestSchemaName)) + require.NoError(suite.t, err) + err = teardownTx.Commit(context.Background()) + require.NoError(suite.t, err) + + require.True(suite.t, suite.connector.ConnectionActive() == nil) + err = suite.connector.Close() + require.NoError(suite.t, err) + require.False(suite.t, suite.connector.ConnectionActive() == nil) + }) } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 0b4084e9c5..2bdd14c5b1 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -18,7 +18,6 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/require" - "github.com/ysmood/got" ) type PeerFlowE2ETestSuiteBQ struct { @@ -34,13 +33,13 @@ func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { err := e2e.TearDownPostgres(s.pool, s.bqSuffix) if err != nil { slog.Error("failed to tear down postgres", slog.Any("error", err)) - s.FailNow() + s.t.FailNow() } err = s.bqHelper.DropDataset(s.bqHelper.datasetName) if err != nil { slog.Error("failed to tear down bigquery", slog.Any("error", err)) - s.FailNow() + s.t.FailNow() } }) } @@ -148,7 +147,7 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. limits := peerflow.CDCFlowLimits{ @@ -1194,7 +1193,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTable1Name := s.attachSchemaSuffix("test1_bq") @@ -1252,8 +1251,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name) require.NoError(s.t, err) - s.Equal(1, count1) - s.Equal(1, count2) + require.Equal(s.t, 1, count1) + require.Equal(s.t, 1, count2) err = s.bqHelper.DropDataset(secondDataset) require.NoError(s.t, err) @@ -1262,7 +1261,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel") @@ -1332,7 +1331,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1346,11 +1345,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - s.Eq(1, numNewRows) + require.Equal(s.t, int64(1), numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") @@ -1418,7 +1417,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1430,11 +1429,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - s.Eq(1, numNewRows) + require.Equal(s.t, int64(1), numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_ud") @@ -1506,7 +1505,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1518,11 +1517,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - s.Eq(1, numNewRows) + require.Equal(s.t, int64(1), numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_softdel_iad") @@ -1582,7 +1581,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1594,5 +1593,5 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - s.Eq(0, numNewRows) + require.Equal(s.t, numNewRows, int64(0)) } diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index a1989a0245..b04ef9e913 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -158,7 +158,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { TableIdentifiers: []string{dstTableName}, }) require.NoError(s.t, err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") require.NoError(s.t, err) @@ -187,7 +187,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { TableIdentifiers: []string{dstTableName}, }) require.NoError(s.t, err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2") require.NoError(s.t, err) @@ -217,7 +217,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { TableIdentifiers: []string{dstTableName}, }) require.NoError(s.t, err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c3") require.NoError(s.t, err) @@ -247,7 +247,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { TableIdentifiers: []string{dstTableName}, }) require.NoError(s.t, err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") require.NoError(s.t, err) }() diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 706a13c7fc..ddfae1b535 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -16,7 +16,6 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/require" - "github.com/ysmood/got" ) type PeerFlowE2ETestSuitePG struct { @@ -29,10 +28,17 @@ type PeerFlowE2ETestSuitePG struct { } func TestPeerFlowE2ETestSuitePG(t *testing.T) { - got.Each(t, e2eshared.GotSuite(setupSuite)) + e2eshared.GotSuite(t, setupSuite, func(s PeerFlowE2ETestSuitePG) { + err := e2e.TearDownPostgres(s.pool, s.suffix) + if err != nil { + s.t.Fatal("failed to drop Postgres schema", err) + } + }) } func setupSuite(t *testing.T) PeerFlowE2ETestSuitePG { + t.Helper() + err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -66,13 +72,6 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuitePG { } } -func (s PeerFlowE2ETestSuitePG) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, s.suffix) - if err != nil { - s.t.Fatal("failed to drop Postgres schema", err) - } -} - func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { err := e2e.CreateTableForQRep(s.pool, s.suffix, tableName) require.NoError(s.t, err) diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index b53c85d94e..7dcb05f840 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -14,7 +14,6 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/require" - "github.com/ysmood/got" ) type PeerFlowE2ETestSuiteS3 struct { @@ -26,7 +25,19 @@ type PeerFlowE2ETestSuiteS3 struct { } func TestPeerFlowE2ETestSuiteS3(t *testing.T) { - got.Each(t, e2eshared.GotSuite(setupSuite)) + e2eshared.GotSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteS3) { + err := e2e.TearDownPostgres(s.pool, s.suffix) + if err != nil { + require.Fail(s.t, "failed to drop Postgres schema", err) + } + + if s.s3Helper != nil { + err = s.s3Helper.CleanUp() + if err != nil { + require.Fail(s.t, "failed to clean up s3", err) + } + } + }) } func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { @@ -41,6 +52,8 @@ func setupS3(mode string) (*S3TestHelper, error) { } func setupSuite(t *testing.T) PeerFlowE2ETestSuiteS3 { + t.Helper() + err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -67,20 +80,6 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteS3 { } } -func (s PeerFlowE2ETestSuiteS3) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, s.suffix) - if err != nil { - require.Fail(s.t, "failed to drop Postgres schema", err) - } - - if s.s3Helper != nil { - err = s.s3Helper.CleanUp() - if err != nil { - require.Fail(s.t, "failed to clean up s3", err) - } - } -} - func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { if s.s3Helper == nil { s.t.Skip("Skipping S3 test") diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index bf2666baeb..4cfe6dd470 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -9,8 +9,6 @@ import ( "testing" "time" - "github.com/ysmood/got" - connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" @@ -33,18 +31,18 @@ type PeerFlowE2ETestSuiteSF struct { } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - e2eshared.GotSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) { + e2eshared.GotSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteSF) { err := e2e.TearDownPostgres(s.pool, s.pgSuffix) if err != nil { slog.Error("failed to tear down Postgres", slog.Any("error", err)) - s.FailNow() + s.t.FailNow() } if s.sfHelper != nil { err = s.sfHelper.Cleanup() if err != nil { slog.Error("failed to tear down Snowflake", slog.Any("error", err)) - s.FailNow() + s.t.FailNow() } } @@ -52,7 +50,7 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) { if err != nil { slog.Error("failed to close Snowflake connector", slog.Any("error", err)) - s.FailNow() + s.t.FailNow() } }) } @@ -449,7 +447,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { if err != nil { slog.Error("Error executing transaction", slog.Any("error", err)) - s.t.FailNow() } wg.Done() diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index ba3426928f..5b46e125dd 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -11,7 +11,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/stretchr/testify/require" - "github.com/ysmood/got" ) const schemaDeltaTestSchemaName = "PUBLIC" @@ -206,7 +205,7 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) { e2eshared.GotSuite(t, setupSchemaDeltaSuite, func(suite SnowflakeSchemaDeltaTestSuite) { - suite.failTestError(suite.sfTestHelper.Cleanup()) - suite.failTestError(suite.connector.Close()) + require.NoError(suite.t, suite.sfTestHelper.Cleanup()) + require.NoError(suite.t, suite.connector.Close()) }) } diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index b7513b72a0..32ab5ce47b 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -20,7 +20,6 @@ import ( "github.com/joho/godotenv" "github.com/stretchr/testify/require" - "github.com/ysmood/got" ) type PeerFlowE2ETestSuiteSQLServer struct { @@ -32,11 +31,24 @@ type PeerFlowE2ETestSuiteSQLServer struct { } func TestCDCFlowE2ETestSuiteSQLServer(t *testing.T) { - got.Each(t, e2eshared.GotSuite(setupSuite)) + e2eshared.GotSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteSQLServer) { + err := e2e.TearDownPostgres(s.pool, s.suffix) + if err != nil { + require.Fail(s.t, "failed to drop Postgres schema", err) + } + + if s.sqlsHelper != nil { + err = s.sqlsHelper.CleanUp() + if err != nil { + require.Fail(s.t, "failed to clean up sqlserver", err) + } + } + }) } -// setup sql server connection func setupSQLServer(t *testing.T) *SQLServerHelper { + t.Helper() + env := os.Getenv("ENABLE_SQLSERVER_TESTS") if env != "true" { return nil @@ -48,6 +60,8 @@ func setupSQLServer(t *testing.T) *SQLServerHelper { } func setupSuite(t *testing.T) PeerFlowE2ETestSuiteSQLServer { + t.Helper() + err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -69,20 +83,6 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteSQLServer { } } -func (s PeerFlowE2ETestSuiteSQLServer) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, s.suffix) - if err != nil { - require.Fail(s.t, "failed to drop Postgres schema", err) - } - - if s.sqlsHelper != nil { - err = s.sqlsHelper.CleanUp() - if err != nil { - require.Fail(s.t, "failed to clean up sqlserver", err) - } - } -} - func (s PeerFlowE2ETestSuiteSQLServer) setupSQLServerTable(tableName string) { schema := getSimpleTableSchema() err := s.sqlsHelper.CreateTable(schema, tableName) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index d2506ece33..2c4e20a0e1 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -379,7 +379,7 @@ func (iw tlogWriter) Write(p []byte) (n int, err error) { } func NewTemporalTestWorkflowEnvironment(t *testing.T) *testsuite.TestWorkflowEnvironment { - testSuite := &testsuite.WorkflowTestSuite{} + t.Helper() logger := slog.New(logger.NewHandler( slog.NewJSONHandler( @@ -388,6 +388,7 @@ func NewTemporalTestWorkflowEnvironment(t *testing.T) *testsuite.TestWorkflowEnv ))) tLogger := NewTStructuredLogger(*logger) + testSuite := &testsuite.WorkflowTestSuite{} testSuite.SetLogger(tLogger) return testSuite.NewTestWorkflowEnvironment() } diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index 56b9ffc985..64454c364e 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -9,15 +9,14 @@ import ( "github.com/ysmood/got" ) -func GotSuite[T any](t *testing.T, setup func(t *testing.T, g got.G) T, teardown func(T)) { +func GotSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) { t.Helper() got.Each(t, func(t *testing.T) T { t.Helper() - g := got.New(t) - g.Parallel() - suite := setup(t, g) - g.Cleanup(func() { + t.Parallel() + suite := setup(t) + t.Cleanup(func() { teardown(suite) }) return suite