diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 70f65df13f..ba2847d36a 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -95,7 +95,7 @@ jobs: - name: run tests run: | - gotestsum --format testname -- -p 8 ./... -timeout 2400s + gotestsum --format testname -- -p 16 ./... -timeout 2400s working-directory: ./flow env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index a827f00fc8..c47e4bb56a 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -73,7 +73,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -147,7 +147,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 051b5eb379..b59d064334 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -49,7 +49,7 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 { slog.Info("Unable to load .env file, using default values from env") } - suffix := "s3" + strings.ToLower(shared.RandomString(8)) + suffix := "s3_" + strings.ToLower(shared.RandomString(8)) pool, err := e2e.SetupPostgres(suffix) if err != nil || pool == nil { require.Fail(t, "failed to setup postgres", err) diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 632c550a09..6f6458aa46 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "os" + "strings" "testing" "time" @@ -13,6 +14,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" @@ -21,14 +23,13 @@ import ( "github.com/ysmood/got" ) -const sqlserverSuffix = "sqlserver" - type PeerFlowE2ETestSuiteSQLServer struct { got.G t *testing.T pool *pgxpool.Pool sqlsHelper *SQLServerHelper + suffix string } func TestCDCFlowE2ETestSuiteSQLServer(t *testing.T) { @@ -55,7 +56,8 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSQLServer { slog.Info("Unable to load .env file, using default values from env") } - pool, err := e2e.SetupPostgres(sqlserverSuffix) + suffix := "sqls_" + strings.ToLower(shared.RandomString(8)) + pool, err := e2e.SetupPostgres(suffix) if err != nil || pool == nil { require.Fail(t, "failed to setup postgres", err) } @@ -65,11 +67,12 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSQLServer { t: t, pool: pool, sqlsHelper: setupSQLServer(t), + suffix: suffix, } } func (s PeerFlowE2ETestSuiteSQLServer) TearDownSuite() { - err := e2e.TearDownPostgres(s.pool, sqlserverSuffix) + err := e2e.TearDownPostgres(s.pool, s.suffix) if err != nil { require.Fail(s.t, "failed to drop Postgres schema", err) } @@ -111,12 +114,12 @@ func (s PeerFlowE2ETestSuiteSQLServer) insertRowsIntoSQLServerTable(tableName st func (s PeerFlowE2ETestSuiteSQLServer) setupPGDestinationTable(tableName string) { ctx := context.Background() - _, err := s.pool.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS e2e_test_%s.%s", sqlserverSuffix, tableName)) + _, err := s.pool.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS e2e_test_%s.%s", s.suffix, tableName)) require.NoError(s.t, err) _, err = s.pool.Exec(ctx, fmt.Sprintf("CREATE TABLE e2e_test_%s.%s (id TEXT, card_id TEXT, v_from TIMESTAMP, price NUMERIC, status INT)", - sqlserverSuffix, tableName)) + s.suffix, tableName)) require.NoError(s.t, err) } @@ -148,7 +151,7 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append( s.insertRowsIntoSQLServerTable(tblName, numRows) s.setupPGDestinationTable(tblName) - dstTableName := fmt.Sprintf("e2e_test_%s.%s", sqlserverSuffix, tblName) + dstTableName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, tblName) query := fmt.Sprintf("SELECT * FROM %s.%s WHERE v_from BETWEEN {{.start}} AND {{.end}}", s.sqlsHelper.SchemaName, tblName) diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index ab8168191c..03ed6dd3c0 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -273,7 +273,7 @@ func (s *SetupFlowExecution) executeSetupFlow( func SetupFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs, ) (*protos.FlowConnectionConfigs, error) { - tblNameMapping := make(map[string]string) + tblNameMapping := make(map[string]string, len(config.TableMappings)) for _, v := range config.TableMappings { tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier }