Skip to content

Commit

Permalink
Little fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 22, 2023
1 parent b447eac commit 24c9214
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 8 ./... -timeout 2400s
gotestsum --format testname -- -p 16 ./... -timeout 2400s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/s3/cdc_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
Expand Down Expand Up @@ -147,7 +147,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/s3/qrep_flow_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 {
slog.Info("Unable to load .env file, using default values from env")
}

suffix := "s3" + strings.ToLower(shared.RandomString(8))
suffix := "s3_" + strings.ToLower(shared.RandomString(8))
pool, err := e2e.SetupPostgres(suffix)
if err != nil || pool == nil {
require.Fail(t, "failed to setup postgres", err)
Expand Down
17 changes: 10 additions & 7 deletions flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"os"
"strings"
"testing"
"time"

Expand All @@ -13,6 +14,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/joho/godotenv"
Expand All @@ -21,14 +23,13 @@ import (
"github.com/ysmood/got"
)

const sqlserverSuffix = "sqlserver"

type PeerFlowE2ETestSuiteSQLServer struct {
got.G
t *testing.T

pool *pgxpool.Pool
sqlsHelper *SQLServerHelper
suffix string
}

func TestCDCFlowE2ETestSuiteSQLServer(t *testing.T) {
Expand All @@ -55,7 +56,8 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSQLServer {
slog.Info("Unable to load .env file, using default values from env")
}

pool, err := e2e.SetupPostgres(sqlserverSuffix)
suffix := "sqls_" + strings.ToLower(shared.RandomString(8))
pool, err := e2e.SetupPostgres(suffix)
if err != nil || pool == nil {
require.Fail(t, "failed to setup postgres", err)
}
Expand All @@ -65,11 +67,12 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSQLServer {
t: t,
pool: pool,
sqlsHelper: setupSQLServer(t),
suffix: suffix,
}
}

func (s PeerFlowE2ETestSuiteSQLServer) TearDownSuite() {
err := e2e.TearDownPostgres(s.pool, sqlserverSuffix)
err := e2e.TearDownPostgres(s.pool, s.suffix)
if err != nil {
require.Fail(s.t, "failed to drop Postgres schema", err)
}
Expand Down Expand Up @@ -111,12 +114,12 @@ func (s PeerFlowE2ETestSuiteSQLServer) insertRowsIntoSQLServerTable(tableName st
func (s PeerFlowE2ETestSuiteSQLServer) setupPGDestinationTable(tableName string) {
ctx := context.Background()

_, err := s.pool.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS e2e_test_%s.%s", sqlserverSuffix, tableName))
_, err := s.pool.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS e2e_test_%s.%s", s.suffix, tableName))
require.NoError(s.t, err)

_, err = s.pool.Exec(ctx,
fmt.Sprintf("CREATE TABLE e2e_test_%s.%s (id TEXT, card_id TEXT, v_from TIMESTAMP, price NUMERIC, status INT)",
sqlserverSuffix, tableName))
s.suffix, tableName))
require.NoError(s.t, err)
}

Expand Down Expand Up @@ -148,7 +151,7 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append(
s.insertRowsIntoSQLServerTable(tblName, numRows)

s.setupPGDestinationTable(tblName)
dstTableName := fmt.Sprintf("e2e_test_%s.%s", sqlserverSuffix, tblName)
dstTableName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, tblName)

query := fmt.Sprintf("SELECT * FROM %s.%s WHERE v_from BETWEEN {{.start}} AND {{.end}}",
s.sqlsHelper.SchemaName, tblName)
Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (s *SetupFlowExecution) executeSetupFlow(
func SetupFlowWorkflow(ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*protos.FlowConnectionConfigs, error) {
tblNameMapping := make(map[string]string)
tblNameMapping := make(map[string]string, len(config.TableMappings))
for _, v := range config.TableMappings {
tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
}
Expand Down

0 comments on commit 24c9214

Please sign in to comment.