From 73ecf16a14ec2b0368ac1f0f2450382a77cd7548 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 1 Jan 2024 01:17:35 +0000 Subject: [PATCH 1/4] e2e/s3: remove use of testify/suite Remove setupS3, calling it after setup leaked initial s3Helper, instead test creates helper & defers cleanup within test using --- flow/e2e/s3/cdc_s3_test.go | 91 ++++++++++++----------- flow/e2e/s3/qrep_flow_s3_test.go | 120 +++++++++++++++---------------- 2 files changed, 101 insertions(+), 110 deletions(-) diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index f28161b97b..e3ced5d469 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -10,22 +10,17 @@ import ( "github.com/stretchr/testify/require" ) -func (s *PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", s3Suffix, tableName) +func (s PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string { + return fmt.Sprintf("e2e_test_%s.%s", s.suffix, tableName) } -func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, s3Suffix) +func (s PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { + return fmt.Sprintf("%s_%s", input, s.suffix) } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) - - setupErr := s.setupS3("s3") - if setupErr != nil { - s.Fail("failed to setup S3", setupErr) - } +func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_flow_s3") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3") @@ -37,7 +32,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: flowJobName, TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -46,7 +41,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -56,7 +51,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - s.NoError(err) + require.NoError(s.t, err) // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -64,9 +59,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -76,49 +71,55 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - s.T().Logf("JobName: %s", flowJobName) + s.t.Logf("JobName: %s", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) - s.T().Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files)) - require.NoError(s.T(), err) + s.t.Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files)) + require.NoError(s.t, err) - require.Equal(s.T(), 4, len(files)) + require.Equal(s.t, 4, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) - setupErr := s.setupS3("gcs") - if setupErr != nil { - s.Fail("failed to setup S3", setupErr) +func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) + + helper, err := NewS3TestHelper(true) + if err != nil { + require.Fail(s.t, "failed to setup S3", err) } + defer func() { + err = s.s3Helper.CleanUp() + if err != nil { + require.Fail(s.t, "failed to clean up s3", err) + } + }() srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop") flowJobName := s.attachSuffix("test_simple_flow_gcs_interop") - _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: flowJobName, TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, - Destination: s.s3Helper.GetPeer(), + Destination: helper.GetPeer(), } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -128,7 +129,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - s.NoError(err) + require.NoError(s.t, err) // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -136,10 +137,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } - s.T().Log("Inserted 20 rows into the source table") - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -149,17 +149,16 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - s.T().Logf("JobName: %s", flowJobName) - files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) - s.T().Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files)) - require.NoError(s.T(), err) + s.t.Logf("JobName: %s", flowJobName) + files, err := helper.ListAllFiles(ctx, flowJobName) + s.t.Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files)) + require.NoError(s.t, err) - require.Equal(s.T(), 4, len(files)) + require.Equal(s.t, 4, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 4fc4f7bf78..c61a6b5a6c 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -4,53 +4,52 @@ import ( "context" "fmt" "log/slog" + "strings" "testing" "time" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" + "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/ysmood/got" ) -const s3Suffix = "s3" - type PeerFlowE2ETestSuiteS3 struct { - suite.Suite - testsuite.WorkflowTestSuite + got.G + t *testing.T pool *pgxpool.Pool s3Helper *S3TestHelper + suffix string } func TestPeerFlowE2ETestSuiteS3(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteS3)) -} + e2eshared.GotSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteS3) { + err := e2e.TearDownPostgres(s.pool, s.suffix) + if err != nil { + require.Fail(s.t, "failed to drop Postgres schema", err) + } -func (s *PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateTableForQRep(s.pool, s3Suffix, tableName) - s.NoError(err) - err = e2e.PopulateSourceTable(s.pool, s3Suffix, tableName, rowCount) - s.NoError(err) + err = s.s3Helper.CleanUp() + if err != nil { + require.Fail(s.t, "failed to clean up s3", err) + } + }) } -func (s *PeerFlowE2ETestSuiteS3) setupS3(mode string) error { - switchToGCS := false - if mode == "gcs" { - switchToGCS = true - } - helper, err := NewS3TestHelper(switchToGCS) - if err != nil { - return err - } - - s.s3Helper = helper - return nil +func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { + err := e2e.CreateTableForQRep(s.pool, s.suffix, tableName) + require.NoError(s.t, err) + err = e2e.PopulateSourceTable(s.pool, s.suffix, tableName, rowCount) + require.NoError(s.t, err) } -func (s *PeerFlowE2ETestSuiteS3) SetupSuite() { +func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 { + t.Helper() + err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -58,43 +57,36 @@ func (s *PeerFlowE2ETestSuiteS3) SetupSuite() { slog.Info("Unable to load .env file, using default values from env") } - pool, err := e2e.SetupPostgres(s3Suffix) + suffix := "s3_" + strings.ToLower(shared.RandomString(8)) + pool, err := e2e.SetupPostgres(suffix) if err != nil || pool == nil { - s.Fail("failed to setup postgres", err) + require.Fail(t, "failed to setup postgres", err) } - s.pool = pool - err = s.setupS3("s3") + helper, err := NewS3TestHelper(false) if err != nil { - s.Fail("failed to setup S3", err) + require.Fail(t, "failed to setup S3", err) } -} -// Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuiteS3) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, s3Suffix) - if err != nil { - s.Fail("failed to drop Postgres schema", err) - } - - if s.s3Helper != nil { - err = s.s3Helper.CleanUp() - if err != nil { - s.Fail("failed to clean up s3", err) - } + return PeerFlowE2ETestSuiteS3{ + G: g, + t: t, + pool: pool, + s3Helper: helper, + suffix: suffix, } } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { +func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { if s.s3Helper == nil { - s.T().Skip("Skipping S3 test") + s.t.Skip("Skipping S3 test") } - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) jobName := "test_complete_flow_s3" - schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) + schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName) s.setupSourceTable(jobName, 10) query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}", @@ -109,7 +101,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { false, "", ) - s.NoError(err) + require.NoError(s.t, err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -118,7 +110,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) // Verify destination has 1 file // make context with timeout @@ -127,23 +119,23 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { files, err := s.s3Helper.ListAllFiles(ctx, jobName) - require.NoError(s.T(), err) + require.NoError(s.t, err) - require.Equal(s.T(), 1, len(files)) + require.Equal(s.t, 1, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { +func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { if s.s3Helper == nil { - s.T().Skip("Skipping S3 test") + s.t.Skip("Skipping S3 test") } - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) jobName := "test_complete_flow_s3_ctid" - schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) + schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName) s.setupSourceTable(jobName, 20000) query := fmt.Sprintf("SELECT * FROM %s WHERE ctid BETWEEN {{.start}} AND {{.end}}", schemaQualifiedName) @@ -157,7 +149,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { false, "", ) - s.NoError(err) + require.NoError(s.t, err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url qrepConfig.NumRowsPerPartition = 2000 qrepConfig.InitialCopyOnly = true @@ -169,7 +161,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) // Verify destination has 1 file // make context with timeout @@ -178,9 +170,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { files, err := s.s3Helper.ListAllFiles(ctx, jobName) - require.NoError(s.T(), err) + require.NoError(s.t, err) - require.Equal(s.T(), 10, len(files)) + require.Equal(s.t, 10, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } From 1fcc2a5ce333f2e5f63ed99f129ae84e07564ee9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 1 Jan 2024 01:50:19 +0000 Subject: [PATCH 2/4] randomize prefix, don't trust system clock to've increased --- flow/e2e/s3/s3_helper.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index 7c2117d538..ef9020e278 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -11,6 +11,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/shared" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" ) @@ -52,7 +53,7 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { if err != nil { return nil, err } - prefix := fmt.Sprintf("peerdb_test/%d", time.Now().UnixNano()) + prefix := fmt.Sprintf("peerdb_test/%d_%s", time.Now().Unix(), shared.RandomString(6)) return &S3TestHelper{ client, &protos.S3Config{ From 07d7bb9b0579807fd23da1c004389862a5227042 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 2 Jan 2024 16:18:47 +0000 Subject: [PATCH 3/4] raise go test parallelism to 24 --- .github/workflows/flow.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 156105ed91..777b23039c 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -101,7 +101,7 @@ jobs: - name: run tests run: | - gotestsum --format testname -- -p 16 ./... -timeout 1200s + gotestsum --format testname -- -p 24 ./... -timeout 1200s working-directory: ./flow env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} From dbb036eca2245c598c4fe867985a5a1ee552828d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 2 Jan 2024 16:27:27 +0000 Subject: [PATCH 4/4] s3: run suite twice, once with s3, once with gcs --- flow/e2e/s3/cdc_s3_test.go | 78 -------------------------------- flow/e2e/s3/qrep_flow_s3_test.go | 42 +++++++++++------ flow/e2eshared/e2eshared.go | 1 + 3 files changed, 30 insertions(+), 91 deletions(-) diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index e3ced5d469..adf84c7b9b 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -84,81 +84,3 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { env.AssertExpectations(s.t) } - -func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { - env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.t, env) - - helper, err := NewS3TestHelper(true) - if err != nil { - require.Fail(s.t, "failed to setup S3", err) - } - defer func() { - err = s.s3Helper.CleanUp() - if err != nil { - require.Fail(s.t, "failed to clean up s3", err) - } - }() - - srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop") - dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop") - flowJobName := s.attachSuffix("test_simple_flow_gcs_interop") - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - CREATE TABLE %s ( - id SERIAL PRIMARY KEY, - key TEXT NOT NULL, - value TEXT NOT NULL - ); - `, srcTableName)) - require.NoError(s.t, err) - connectionGen := e2e.FlowConnectionGenerationConfig{ - FlowJobName: flowJobName, - TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, - Destination: helper.GetPeer(), - } - - flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) - - limits := peerflow.CDCFlowLimits{ - TotalSyncFlows: 4, - ExitAfterRecords: 20, - MaxBatchSize: 5, - } - - go func() { - e2e.SetupCDCFlowStatusQuery(env, connectionGen) - require.NoError(s.t, err) - // insert 20 rows - for i := 1; i <= 20; i++ { - testKey := fmt.Sprintf("test_key_%d", i) - testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` - INSERT INTO %s (key, value) VALUES ($1, $2) - `, srcTableName), testKey, testValue) - require.NoError(s.t, err) - } - require.NoError(s.t, err) - }() - - env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) - - // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) - err = env.GetWorkflowError() - - // allow only continue as new error - require.Contains(s.t, err.Error(), "continue as new") - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - s.t.Logf("JobName: %s", flowJobName) - files, err := helper.ListAllFiles(ctx, flowJobName) - s.t.Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files)) - require.NoError(s.t, err) - - require.Equal(s.t, 4, len(files)) - - env.AssertExpectations(s.t) -} diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index c61a6b5a6c..56f5962232 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -26,18 +26,24 @@ type PeerFlowE2ETestSuiteS3 struct { suffix string } +func tearDownSuite(s PeerFlowE2ETestSuiteS3) { + err := e2e.TearDownPostgres(s.pool, s.suffix) + if err != nil { + require.Fail(s.t, "failed to drop Postgres schema", err) + } + + err = s.s3Helper.CleanUp() + if err != nil { + require.Fail(s.t, "failed to clean up s3", err) + } +} + func TestPeerFlowE2ETestSuiteS3(t *testing.T) { - e2eshared.GotSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteS3) { - err := e2e.TearDownPostgres(s.pool, s.suffix) - if err != nil { - require.Fail(s.t, "failed to drop Postgres schema", err) - } - - err = s.s3Helper.CleanUp() - if err != nil { - require.Fail(s.t, "failed to clean up s3", err) - } - }) + e2eshared.GotSuite(t, SetupSuiteS3, tearDownSuite) +} + +func TestPeerFlowE2ETestSuiteGCS(t *testing.T) { + e2eshared.GotSuite(t, SetupSuiteGCS, tearDownSuite) } func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { @@ -47,7 +53,7 @@ func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) require.NoError(s.t, err) } -func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 { +func setupSuite(t *testing.T, g got.G, gcs bool) PeerFlowE2ETestSuiteS3 { t.Helper() err := godotenv.Load() @@ -63,7 +69,7 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 { require.Fail(t, "failed to setup postgres", err) } - helper, err := NewS3TestHelper(false) + helper, err := NewS3TestHelper(gcs) if err != nil { require.Fail(t, "failed to setup S3", err) } @@ -77,6 +83,16 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 { } } +func SetupSuiteS3(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 { + t.Helper() + return setupSuite(t, g, false) +} + +func SetupSuiteGCS(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 { + t.Helper() + return setupSuite(t, g, true) +} + func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { if s.s3Helper == nil { s.t.Skip("Skipping S3 test") diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index e235536ade..150ef30459 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -13,6 +13,7 @@ import ( func GotSuite[T any](t *testing.T, setup func(t *testing.T, g got.G) T, teardown func(T)) { t.Helper() + t.Parallel() got.Each(t, func(t *testing.T) T { t.Helper()