From 7f61ec00fe83f92261addc4b6b61dad16b30fd6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 1 Jan 2024 01:17:35 +0000 Subject: [PATCH] e2e/s3: remove use of testify/suite Remove setupS3, calling it after setup leaked initial s3Helper, instead test creates helper & defers cleanup within test using --- flow/e2e/s3/cdc_s3_test.go | 90 +++++++++++------------ flow/e2e/s3/qrep_flow_s3_test.go | 120 +++++++++++++++---------------- 2 files changed, 101 insertions(+), 109 deletions(-) diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index f28161b97b..b469e9ec12 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -10,22 +10,17 @@ import ( "github.com/stretchr/testify/require" ) -func (s *PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", s3Suffix, tableName) +func (s PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string { + return fmt.Sprintf("e2e_test_%s.%s", s.suffix, tableName) } -func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, s3Suffix) +func (s PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { + return fmt.Sprintf("%s_%s", input, s.suffix) } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) - - setupErr := s.setupS3("s3") - if setupErr != nil { - s.Fail("failed to setup S3", setupErr) - } +func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_flow_s3") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3") @@ -37,7 +32,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: flowJobName, TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -46,7 +41,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -56,7 +51,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - s.NoError(err) + require.NoError(s.t, err) // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -64,9 +59,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -77,48 +72,55 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - s.T().Logf("JobName: %s", flowJobName) + s.t.Logf("JobName: %s", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) - s.T().Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files)) - require.NoError(s.T(), err) + s.t.Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files)) + require.NoError(s.t, err) - require.Equal(s.T(), 4, len(files)) + require.Equal(s.t, 4, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) - setupErr := s.setupS3("gcs") - if setupErr != nil { - s.Fail("failed to setup S3", setupErr) +func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) + + helper, err := NewS3TestHelper(true) + if err != nil { + require.Fail(s.t, "failed to setup S3", err) } + defer func() { + err = s.s3Helper.CleanUp() + if err != nil { + require.Fail(s.t, "failed to clean up s3", err) + } + }() srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop") flowJobName := s.attachSuffix("test_simple_flow_gcs_interop") - _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s ( id SERIAL PRIMARY KEY, key TEXT NOT NULL, value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: flowJobName, TableNameMapping: map[string]string{srcTableName: dstTableName}, PostgresPort: e2e.PostgresPort, - Destination: s.s3Helper.GetPeer(), + Destination: helper.GetPeer(), } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -128,7 +130,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - s.NoError(err) + require.NoError(s.t, err) // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) @@ -136,10 +138,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } - s.T().Log("Inserted 20 rows into the source table") - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -149,17 +150,16 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - s.T().Logf("JobName: %s", flowJobName) - files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) - s.T().Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files)) - require.NoError(s.T(), err) + s.t.Logf("JobName: %s", flowJobName) + files, err := helper.ListAllFiles(ctx, flowJobName) + s.t.Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files)) + require.NoError(s.t, err) - require.Equal(s.T(), 4, len(files)) + require.Equal(s.t, 4, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 4fc4f7bf78..c61a6b5a6c 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -4,53 +4,52 @@ import ( "context" "fmt" "log/slog" + "strings" "testing" "time" "github.com/PeerDB-io/peer-flow/e2e" + "github.com/PeerDB-io/peer-flow/e2eshared" + "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/ysmood/got" ) -const s3Suffix = "s3" - type PeerFlowE2ETestSuiteS3 struct { - suite.Suite - testsuite.WorkflowTestSuite + got.G + t *testing.T pool *pgxpool.Pool s3Helper *S3TestHelper + suffix string } func TestPeerFlowE2ETestSuiteS3(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteS3)) -} + e2eshared.GotSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteS3) { + err := e2e.TearDownPostgres(s.pool, s.suffix) + if err != nil { + require.Fail(s.t, "failed to drop Postgres schema", err) + } -func (s *PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateTableForQRep(s.pool, s3Suffix, tableName) - s.NoError(err) - err = e2e.PopulateSourceTable(s.pool, s3Suffix, tableName, rowCount) - s.NoError(err) + err = s.s3Helper.CleanUp() + if err != nil { + require.Fail(s.t, "failed to clean up s3", err) + } + }) } -func (s *PeerFlowE2ETestSuiteS3) setupS3(mode string) error { - switchToGCS := false - if mode == "gcs" { - switchToGCS = true - } - helper, err := NewS3TestHelper(switchToGCS) - if err != nil { - return err - } - - s.s3Helper = helper - return nil +func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { + err := e2e.CreateTableForQRep(s.pool, s.suffix, tableName) + require.NoError(s.t, err) + err = e2e.PopulateSourceTable(s.pool, s.suffix, tableName, rowCount) + require.NoError(s.t, err) } -func (s *PeerFlowE2ETestSuiteS3) SetupSuite() { +func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 { + t.Helper() + err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -58,43 +57,36 @@ func (s *PeerFlowE2ETestSuiteS3) SetupSuite() { slog.Info("Unable to load .env file, using default values from env") } - pool, err := e2e.SetupPostgres(s3Suffix) + suffix := "s3_" + strings.ToLower(shared.RandomString(8)) + pool, err := e2e.SetupPostgres(suffix) if err != nil || pool == nil { - s.Fail("failed to setup postgres", err) + require.Fail(t, "failed to setup postgres", err) } - s.pool = pool - err = s.setupS3("s3") + helper, err := NewS3TestHelper(false) if err != nil { - s.Fail("failed to setup S3", err) + require.Fail(t, "failed to setup S3", err) } -} -// Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuiteS3) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, s3Suffix) - if err != nil { - s.Fail("failed to drop Postgres schema", err) - } - - if s.s3Helper != nil { - err = s.s3Helper.CleanUp() - if err != nil { - s.Fail("failed to clean up s3", err) - } + return PeerFlowE2ETestSuiteS3{ + G: g, + t: t, + pool: pool, + s3Helper: helper, + suffix: suffix, } } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { +func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { if s.s3Helper == nil { - s.T().Skip("Skipping S3 test") + s.t.Skip("Skipping S3 test") } - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) jobName := "test_complete_flow_s3" - schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) + schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName) s.setupSourceTable(jobName, 10) query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}", @@ -109,7 +101,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { false, "", ) - s.NoError(err) + require.NoError(s.t, err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -118,7 +110,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) // Verify destination has 1 file // make context with timeout @@ -127,23 +119,23 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { files, err := s.s3Helper.ListAllFiles(ctx, jobName) - require.NoError(s.T(), err) + require.NoError(s.t, err) - require.Equal(s.T(), 1, len(files)) + require.Equal(s.t, 1, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { +func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { if s.s3Helper == nil { - s.T().Skip("Skipping S3 test") + s.t.Skip("Skipping S3 test") } - env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(s.T(), env) + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) jobName := "test_complete_flow_s3_ctid" - schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) + schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName) s.setupSourceTable(jobName, 20000) query := fmt.Sprintf("SELECT * FROM %s WHERE ctid BETWEEN {{.start}} AND {{.end}}", schemaQualifiedName) @@ -157,7 +149,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { false, "", ) - s.NoError(err) + require.NoError(s.t, err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url qrepConfig.NumRowsPerPartition = 2000 qrepConfig.InitialCopyOnly = true @@ -169,7 +161,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) // Verify destination has 1 file // make context with timeout @@ -178,9 +170,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { files, err := s.s3Helper.ListAllFiles(ctx, jobName) - require.NoError(s.T(), err) + require.NoError(s.t, err) - require.Equal(s.T(), 10, len(files)) + require.Equal(s.t, 10, len(files)) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) }