diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml
index 156105ed9..777b23039 100644
--- a/.github/workflows/flow.yml
+++ b/.github/workflows/flow.yml
@@ -101,7 +101,7 @@ jobs:
       - name: run tests
         run: |
-          gotestsum --format testname -- -p 16 ./... -timeout 1200s
+          gotestsum --format testname -- -p 24 ./... -timeout 1200s
         working-directory: ./flow
         env:
           AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go
index f28161b97..adf84c7b9 100644
--- a/flow/e2e/s3/cdc_s3_test.go
+++ b/flow/e2e/s3/cdc_s3_test.go
@@ -10,22 +10,17 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func (s *PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string {
-	return fmt.Sprintf("e2e_test_%s.%s", s3Suffix, tableName)
+func (s PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string {
+	return fmt.Sprintf("e2e_test_%s.%s", s.suffix, tableName)
 }
 
-func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string {
-	return fmt.Sprintf("%s_%s", input, s3Suffix)
+func (s PeerFlowE2ETestSuiteS3) attachSuffix(input string) string {
+	return fmt.Sprintf("%s_%s", input, s.suffix)
 }
 
-func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
-	env := s.NewTestWorkflowEnvironment()
-	e2e.RegisterWorkflowsAndActivities(s.T(), env)
-
-	setupErr := s.setupS3("s3")
-	if setupErr != nil {
-		s.Fail("failed to setup S3", setupErr)
-	}
+func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
+	env := e2e.NewTemporalTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(s.t, env)
 
 	srcTableName := s.attachSchemaSuffix("test_simple_flow_s3")
 	dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3")
@@ -37,79 +32,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
 			value TEXT NOT NULL
 		);
 	`, srcTableName))
-	s.NoError(err)
-	connectionGen := e2e.FlowConnectionGenerationConfig{
-		FlowJobName:      flowJobName,
-		TableNameMapping: map[string]string{srcTableName: dstTableName},
-		PostgresPort:     e2e.PostgresPort,
-		Destination:      s.s3Helper.GetPeer(),
-	}
-
-	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-	s.NoError(err)
-
-	limits := peerflow.CDCFlowLimits{
-		TotalSyncFlows:   4,
-		ExitAfterRecords: 20,
-		MaxBatchSize:     5,
-	}
-
-	go func() {
-		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
-		s.NoError(err)
-		// insert 20 rows
-		for i := 1; i <= 20; i++ {
-			testKey := fmt.Sprintf("test_key_%d", i)
-			testValue := fmt.Sprintf("test_value_%d", i)
-			_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
-			INSERT INTO %s (key, value) VALUES ($1, $2)
-		`, srcTableName), testKey, testValue)
-			s.NoError(err)
-		}
-		s.NoError(err)
-	}()
-
-	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
-
-	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
-	err = env.GetWorkflowError()
-
-	// allow only continue as new error
-	s.Error(err)
-	s.Contains(err.Error(), "continue as new")
-
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-	s.T().Logf("JobName: %s", flowJobName)
-	files, err := s.s3Helper.ListAllFiles(ctx, flowJobName)
-	s.T().Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files))
-	require.NoError(s.T(), err)
-
-	require.Equal(s.T(), 4, len(files))
-
-	env.AssertExpectations(s.T())
-}
-
-func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
-	env := s.NewTestWorkflowEnvironment()
-	e2e.RegisterWorkflowsAndActivities(s.T(), env)
-
-	setupErr := s.setupS3("gcs")
-	if setupErr != nil {
-		s.Fail("failed to setup S3", setupErr)
-	}
-
-	srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop")
-	dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop")
-	flowJobName := s.attachSuffix("test_simple_flow_gcs_interop")
-	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
-		CREATE TABLE %s (
-			id SERIAL PRIMARY KEY,
-			key TEXT NOT NULL,
-			value TEXT NOT NULL
-		);
-	`, srcTableName))
-	s.NoError(err)
+	require.NoError(s.t, err)
 	connectionGen := e2e.FlowConnectionGenerationConfig{
 		FlowJobName:      flowJobName,
 		TableNameMapping: map[string]string{srcTableName: dstTableName},
@@ -118,7 +41,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
 	}
 
 	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-	s.NoError(err)
+	require.NoError(s.t, err)
 
 	limits := peerflow.CDCFlowLimits{
 		TotalSyncFlows:   4,
@@ -128,7 +51,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
 
 	go func() {
 		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
-		s.NoError(err)
+		require.NoError(s.t, err)
 		// insert 20 rows
 		for i := 1; i <= 20; i++ {
 			testKey := fmt.Sprintf("test_key_%d", i)
@@ -136,10 +59,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
 			_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
 			INSERT INTO %s (key, value) VALUES ($1, $2)
 		`, srcTableName), testKey, testValue)
-			s.NoError(err)
+			require.NoError(s.t, err)
 		}
-		s.T().Log("Inserted 20 rows into the source table")
-		s.NoError(err)
+		require.NoError(s.t, err)
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
@@ -149,17 +71,16 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
-	s.Error(err)
-	s.Contains(err.Error(), "continue as new")
+	require.Contains(s.t, err.Error(), "continue as new")
 
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
-	s.T().Logf("JobName: %s", flowJobName)
+	s.t.Logf("JobName: %s", flowJobName)
 	files, err := s.s3Helper.ListAllFiles(ctx, flowJobName)
-	s.T().Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files))
-	require.NoError(s.T(), err)
+	s.t.Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files))
+	require.NoError(s.t, err)
 
-	require.Equal(s.T(), 4, len(files))
+	require.Equal(s.t, 4, len(files))
 
-	env.AssertExpectations(s.T())
+	env.AssertExpectations(s.t)
 }
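Review note: the switch from the shared `s3Suffix` constant to a per-suite random `suffix` is what makes the `-p 16` to `-p 24` bump in flow.yml safe, since concurrently running suites no longer collide on the `e2e_test_s3` schema or on flow job names. A minimal sketch of the resulting naming, using the `attachSchemaSuffix`/`attachSuffix` formats from this diff (the suffix value shown is illustrative):

```go
suffix := "s3_" + strings.ToLower(shared.RandomString(8)) // e.g. "s3_x7k2m9qa"

// attachSchemaSuffix: each suite instance writes to its own schema.
srcTableName := fmt.Sprintf("e2e_test_%s.%s", suffix, "test_simple_flow_s3")
// => "e2e_test_s3_x7k2m9qa.test_simple_flow_s3"

// attachSuffix: flow job names are disambiguated the same way.
flowJobName := fmt.Sprintf("%s_%s", "test_simple_flow_s3", suffix)
// => "test_simple_flow_s3_s3_x7k2m9qa"
```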
diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go
index 4fc4f7bf7..56f596223 100644
--- a/flow/e2e/s3/qrep_flow_s3_test.go
+++ b/flow/e2e/s3/qrep_flow_s3_test.go
@@ -4,53 +4,58 @@ import (
 	"context"
 	"fmt"
 	"log/slog"
+	"strings"
 	"testing"
 	"time"
 
 	"github.com/PeerDB-io/peer-flow/e2e"
+	"github.com/PeerDB-io/peer-flow/e2eshared"
+	"github.com/PeerDB-io/peer-flow/shared"
 	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/joho/godotenv"
 	"github.com/stretchr/testify/require"
-	"github.com/stretchr/testify/suite"
-	"go.temporal.io/sdk/testsuite"
+	"github.com/ysmood/got"
 )
 
-const s3Suffix = "s3"
-
 type PeerFlowE2ETestSuiteS3 struct {
-	suite.Suite
-	testsuite.WorkflowTestSuite
+	got.G
+	t *testing.T
 
 	pool     *pgxpool.Pool
 	s3Helper *S3TestHelper
+	suffix   string
 }
 
-func TestPeerFlowE2ETestSuiteS3(t *testing.T) {
-	suite.Run(t, new(PeerFlowE2ETestSuiteS3))
+func tearDownSuite(s PeerFlowE2ETestSuiteS3) {
+	err := e2e.TearDownPostgres(s.pool, s.suffix)
+	if err != nil {
+		require.Fail(s.t, "failed to drop Postgres schema", err)
+	}
+
+	err = s.s3Helper.CleanUp()
+	if err != nil {
+		require.Fail(s.t, "failed to clean up s3", err)
+	}
 }
 
-func (s *PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) {
-	err := e2e.CreateTableForQRep(s.pool, s3Suffix, tableName)
-	s.NoError(err)
-	err = e2e.PopulateSourceTable(s.pool, s3Suffix, tableName, rowCount)
-	s.NoError(err)
+func TestPeerFlowE2ETestSuiteS3(t *testing.T) {
+	e2eshared.GotSuite(t, SetupSuiteS3, tearDownSuite)
 }
 
-func (s *PeerFlowE2ETestSuiteS3) setupS3(mode string) error {
-	switchToGCS := false
-	if mode == "gcs" {
-		switchToGCS = true
-	}
-	helper, err := NewS3TestHelper(switchToGCS)
-	if err != nil {
-		return err
-	}
+func TestPeerFlowE2ETestSuiteGCS(t *testing.T) {
+	e2eshared.GotSuite(t, SetupSuiteGCS, tearDownSuite)
+}
 
-	s.s3Helper = helper
-	return nil
+func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) {
+	err := e2e.CreateTableForQRep(s.pool, s.suffix, tableName)
+	require.NoError(s.t, err)
+	err = e2e.PopulateSourceTable(s.pool, s.suffix, tableName, rowCount)
+	require.NoError(s.t, err)
 }
 
-func (s *PeerFlowE2ETestSuiteS3) SetupSuite() {
+func setupSuite(t *testing.T, g got.G, gcs bool) PeerFlowE2ETestSuiteS3 {
+	t.Helper()
+
 	err := godotenv.Load()
 	if err != nil {
 		// it's okay if the .env file is not present
@@ -58,43 +63,46 @@ func (s *PeerFlowE2ETestSuiteS3) SetupSuite() {
 		slog.Info("Unable to load .env file, using default values from env")
 	}
 
-	pool, err := e2e.SetupPostgres(s3Suffix)
+	suffix := "s3_" + strings.ToLower(shared.RandomString(8))
+	pool, err := e2e.SetupPostgres(suffix)
 	if err != nil || pool == nil {
-		s.Fail("failed to setup postgres", err)
+		require.Fail(t, "failed to setup postgres", err)
 	}
-	s.pool = pool
 
-	err = s.setupS3("s3")
+	helper, err := NewS3TestHelper(gcs)
 	if err != nil {
-		s.Fail("failed to setup S3", err)
+		require.Fail(t, "failed to setup S3", err)
 	}
-}
 
-// Implement TearDownAllSuite interface to tear down the test suite
-func (s *PeerFlowE2ETestSuiteS3) TearDownSuite() {
-	err := e2e.TearDownPostgres(s.pool, s3Suffix)
-	if err != nil {
-		s.Fail("failed to drop Postgres schema", err)
+	return PeerFlowE2ETestSuiteS3{
+		G:        g,
+		t:        t,
+		pool:     pool,
+		s3Helper: helper,
+		suffix:   suffix,
 	}
+}
 
-	if s.s3Helper != nil {
-		err = s.s3Helper.CleanUp()
-		if err != nil {
-			s.Fail("failed to clean up s3", err)
-		}
-	}
+func SetupSuiteS3(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 {
+	t.Helper()
+	return setupSuite(t, g, false)
+}
+
+func SetupSuiteGCS(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 {
+	t.Helper()
+	return setupSuite(t, g, true)
 }
 
-func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
+func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
 	if s.s3Helper == nil {
-		s.T().Skip("Skipping S3 test")
+		s.t.Skip("Skipping S3 test")
 	}
 
-	env := s.NewTestWorkflowEnvironment()
-	e2e.RegisterWorkflowsAndActivities(s.T(), env)
+	env := e2e.NewTemporalTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(s.t, env)
 
 	jobName := "test_complete_flow_s3"
-	schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName)
+	schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName)
 	s.setupSourceTable(jobName, 10)
 	query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}",
@@ -109,7 +117,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
 		false,
 		"",
 	)
-	s.NoError(err)
+	require.NoError(s.t, err)
 	qrepConfig.StagingPath = s.s3Helper.s3Config.Url
 
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
@@ -118,7 +126,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
 	// Verify workflow completes without error
 	s.True(env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
-	s.NoError(err)
+	require.NoError(s.t, err)
 
 	// Verify destination has 1 file
 	// make context with timeout
@@ -127,23 +135,23 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
 
 	files, err := s.s3Helper.ListAllFiles(ctx, jobName)
 
-	require.NoError(s.T(), err)
+	require.NoError(s.t, err)
 
-	require.Equal(s.T(), 1, len(files))
+	require.Equal(s.t, 1, len(files))
 
-	env.AssertExpectations(s.T())
+	env.AssertExpectations(s.t)
 }
 
-func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
+func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
 	if s.s3Helper == nil {
-		s.T().Skip("Skipping S3 test")
+		s.t.Skip("Skipping S3 test")
 	}
 
-	env := s.NewTestWorkflowEnvironment()
-	e2e.RegisterWorkflowsAndActivities(s.T(), env)
+	env := e2e.NewTemporalTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(s.t, env)
 
 	jobName := "test_complete_flow_s3_ctid"
-	schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName)
+	schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName)
 	s.setupSourceTable(jobName, 20000)
 	query := fmt.Sprintf("SELECT * FROM %s WHERE ctid BETWEEN {{.start}} AND {{.end}}", schemaQualifiedName)
@@ -157,7 +165,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
 		false,
 		"",
 	)
-	s.NoError(err)
+	require.NoError(s.t, err)
 	qrepConfig.StagingPath = s.s3Helper.s3Config.Url
 	qrepConfig.NumRowsPerPartition = 2000
 	qrepConfig.InitialCopyOnly = true
@@ -169,7 +177,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
 	// Verify workflow completes without error
 	s.True(env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
-	s.NoError(err)
+	require.NoError(s.t, err)
 
 	// Verify destination has 1 file
 	// make context with timeout
@@ -178,9 +186,9 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
 
 	files, err := s.s3Helper.ListAllFiles(ctx, jobName)
 
-	require.NoError(s.T(), err)
+	require.NoError(s.t, err)
 
-	require.Equal(s.T(), 10, len(files))
+	require.Equal(s.t, 10, len(files))
 
-	env.AssertExpectations(s.T())
+	env.AssertExpectations(s.t)
 }
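Review note: `TestPeerFlowE2ETestSuiteS3` and `TestPeerFlowE2ETestSuiteGCS` now share one suite type, one teardown, and every test body; only the boolean handed to `NewS3TestHelper` differs. For anyone adding a test after this change, a hypothetical method under the new conventions would look like the sketch below (value receiver, `require` against the stored `s.t`, `s.suffix` for namespacing; the method name, table name, and assertion are made up for illustration):

```go
// Hypothetical example of a test following the post-migration conventions.
func (s PeerFlowE2ETestSuiteS3) Test_Example_QRep_Flow() {
	if s.s3Helper == nil {
		s.t.Skip("Skipping S3 test")
	}

	jobName := "test_example_qrep_flow" // illustrative name
	s.setupSourceTable(jobName, 10)

	// Assertions go through require with the suite's own *testing.T,
	// not through testify/suite methods, which no longer exist here.
	require.NotEmpty(s.t, s.suffix)
}
```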
diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go
index 7c2117d53..ef9020e27 100644
--- a/flow/e2e/s3/s3_helper.go
+++ b/flow/e2e/s3/s3_helper.go
@@ -11,6 +11,7 @@ import (
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/e2eshared"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
+	"github.com/PeerDB-io/peer-flow/shared"
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/service/s3"
 )
@@ -52,7 +53,7 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) {
 	if err != nil {
 		return nil, err
 	}
-	prefix := fmt.Sprintf("peerdb_test/%d", time.Now().UnixNano())
+	prefix := fmt.Sprintf("peerdb_test/%d_%s", time.Now().Unix(), shared.RandomString(6))
 	return &S3TestHelper{
 		client,
 		&protos.S3Config{
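Review note: the prefix change trades nanosecond resolution for a second-granularity timestamp plus a random tail, so with helpers now constructed concurrently, uniqueness comes from the random component rather than from timing. An illustrative value (not from a real run):

```go
prefix := fmt.Sprintf("peerdb_test/%d_%s", time.Now().Unix(), shared.RandomString(6))
// e.g. "peerdb_test/1704067200_kX92bQ" — distinct even for helpers
// constructed within the same second.
```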
diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go
index e235536ad..150ef3045 100644
--- a/flow/e2eshared/e2eshared.go
+++ b/flow/e2eshared/e2eshared.go
@@ -13,6 +13,7 @@ import (
 func GotSuite[T any](t *testing.T, setup func(t *testing.T, g got.G) T, teardown func(T)) {
 	t.Helper()
+	t.Parallel()
 
 	got.Each(t, func(t *testing.T) T {
 		t.Helper()
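Review note: the `t.Parallel()` added here is the top-level switch that lets independent suites run concurrently, which the CI `-p 24` change leans on. The diff only shows the head of `GotSuite`; the sketch below fills in a plausible rest of the body from the setup/teardown signatures, and the `t.Cleanup` wiring in particular is an assumption, not the confirmed implementation:

```go
// Sketch of the harness shape. Only t.Helper, t.Parallel, and the
// got.Each call are confirmed by this diff; the rest is assumed.
func GotSuite[T any](t *testing.T, setup func(t *testing.T, g got.G) T, teardown func(T)) {
	t.Helper()
	t.Parallel() // suites no longer serialize against each other

	got.Each(t, func(t *testing.T) T {
		t.Helper()
		suite := setup(t, got.New(t))         // fresh suite value per subtest
		t.Cleanup(func() { teardown(suite) }) // assumed per-subtest teardown
		return suite
	})
}
```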