diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 628d5a5cb3..dad2551196 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,7 +12,7 @@ jobs: build: strategy: matrix: - runner: [ubicloud-standard-8] + runner: [ubicloud-standard-8-ubuntu-2204-arm] runs-on: ${{ matrix.runner }} timeout-minutes: 30 services: diff --git a/.github/workflows/customer-docker.yml b/.github/workflows/customer-docker.yml index faede44f28..e710c7d860 100644 --- a/.github/workflows/customer-docker.yml +++ b/.github/workflows/customer-docker.yml @@ -12,7 +12,7 @@ jobs: docker-build: strategy: matrix: - runner: [ubicloud] + runner: [ubicloud-standard-2-ubuntu-2204-arm] runs-on: ${{ matrix.runner }} permissions: contents: read diff --git a/.github/workflows/dev-docker.yml b/.github/workflows/dev-docker.yml index d5f1a2dc88..c4e3e313ec 100644 --- a/.github/workflows/dev-docker.yml +++ b/.github/workflows/dev-docker.yml @@ -10,7 +10,7 @@ jobs: docker-build: strategy: matrix: - runner: [ubicloud] + runner: [ubuntu-latest] runs-on: ${{ matrix.runner }} permissions: contents: read diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index e969bc0924..6d965122a3 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -11,12 +11,12 @@ jobs: flow_test: strategy: matrix: - runner: [ubicloud-standard-8, ubuntu-latest] + runner: [ubicloud-standard-8-ubuntu-2204-arm] runs-on: ${{ matrix.runner }} - timeout-minutes: 30 + timeout-minutes: 40 services: pg_cdc: - image: postgis/postgis:15-3.4-alpine + image: imresamu/postgis:15-3.4-alpine ports: - 7132:5432 env: diff --git a/.github/workflows/golang-lint.yml b/.github/workflows/golang-lint.yml index 65da8f7e4d..803a9e09b2 100644 --- a/.github/workflows/golang-lint.yml +++ b/.github/workflows/golang-lint.yml @@ -13,7 +13,7 @@ jobs: pull-requests: write strategy: matrix: - runner: [ubicloud] + runner: [ubicloud-standard-2-ubuntu-2204-arm] runs-on: ${{ matrix.runner }} steps: - name: checkout diff --git a/.github/workflows/rust-lint.yml b/.github/workflows/rust-lint.yml index 3618a93ad1..76487769a4 100644 --- a/.github/workflows/rust-lint.yml +++ b/.github/workflows/rust-lint.yml @@ -13,7 +13,7 @@ jobs: pull-requests: write strategy: matrix: - runner: [ubicloud-standard-4] + runner: [ubicloud-standard-4-ubuntu-2204-arm] runs-on: ${{ matrix.runner }} steps: - name: checkout diff --git a/.github/workflows/stable-docker.yml b/.github/workflows/stable-docker.yml index e1a1f0c4ff..f4b7c9e9a1 100644 --- a/.github/workflows/stable-docker.yml +++ b/.github/workflows/stable-docker.yml @@ -9,7 +9,7 @@ jobs: docker-build: strategy: matrix: - runner: [ubicloud] + runner: [ubicloud-standard-2-ubuntu-2204-arm] runs-on: ${{ matrix.runner }} permissions: contents: read diff --git a/.github/workflows/ui-build.yml b/.github/workflows/ui-build.yml index 3a6228e2dc..ec59e13969 100644 --- a/.github/workflows/ui-build.yml +++ b/.github/workflows/ui-build.yml @@ -13,7 +13,7 @@ jobs: name: Build & Test UI strategy: matrix: - runner: [ubicloud] + runner: [ubicloud-standard-2-ubuntu-2204-arm] runs-on: ${{ matrix.runner }} steps: - name: checkout diff --git a/.github/workflows/ui-lint.yml b/.github/workflows/ui-lint.yml index ba895eb782..84fe76b0bc 100644 --- a/.github/workflows/ui-lint.yml +++ b/.github/workflows/ui-lint.yml @@ -17,7 +17,7 @@ jobs: name: Run UI linters strategy: matrix: - runner: [ubicloud] + runner: [ubicloud-standard-2-ubuntu-2204-arm] runs-on: ${{ matrix.runner }} steps: - name: checkout diff --git a/.gitignore b/.gitignore index 0b458e3e8c..c8909ff74a 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,6 @@ tmp/ private/ nexus/server/tests/assets/*.json nexus/server/tests/results/actual/ + +go.work +go.work.sum diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 005328e169..4a314a30a1 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -160,50 +160,55 @@ func (a *FlowableActivity) CreateNormalizedTable( return conn.SetupNormalizedTables(config) } +func (a *FlowableActivity) handleSlotInfo( + ctx context.Context, + srcConn connectors.CDCPullConnector, + slotName string, + peerName string, +) error { + slotInfo, err := srcConn.GetSlotInfo(slotName) + if err != nil { + log.Warnf("warning: failed to get slot info: %v", err) + return err + } + + if len(slotInfo) != 0 { + return a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0]) + } + return nil +} + func (a *FlowableActivity) recordSlotSizePeriodically( ctx context.Context, srcConn connectors.CDCPullConnector, slotName string, - done <-chan struct{}, peerName string, ) { - timeout := 10 * time.Minute ticker := time.NewTicker(timeout) defer ticker.Stop() for { - slotInfo, err := srcConn.GetSlotInfo(slotName) - if err != nil { - log.Warnf("warning: failed to get slot info: %v", err) - } - - if len(slotInfo) == 0 { - continue - } - select { case <-ticker.C: - a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0]) - case <-done: - a.CatalogMirrorMonitor.AppendSlotSizeInfo(ctx, peerName, slotInfo[0]) + err := a.handleSlotInfo(ctx, srcConn, slotName, peerName) + if err != nil { + return + } + case <-ctx.Done(): + return } ticker.Stop() ticker = time.NewTicker(timeout) } - } // StartFlow implements StartFlow. func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlowInput) (*model.SyncResponse, error) { activity.RecordHeartbeat(ctx, "starting flow...") - done := make(chan struct{}) - defer close(done) conn := input.FlowConnectionConfigs - ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor) - dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination) if err != nil { return nil, fmt.Errorf("failed to get destination connector: %w", err) @@ -246,7 +251,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName } - go a.recordSlotSizePeriodically(ctx, srcConn, slotNameForMetrics, done, input.FlowConnectionConfigs.Source.Name) + go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name) + // start a goroutine to pull records from the source errGroup.Go(func() error { return srcConn.PullRecords(&model.PullRecordsRequest{ @@ -371,7 +377,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords) activity.RecordHeartbeat(ctx, pushedRecordsWithCount) - done <- struct{}{} return res, nil } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 2a287b5781..559f03ba62 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -14,13 +14,12 @@ import ( "github.com/joho/godotenv" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/ysmood/got" ) type PeerFlowE2ETestSuiteBQ struct { - suite.Suite - testsuite.WorkflowTestSuite + got.G + t *testing.T bqSuffix string pool *pgxpool.Pool @@ -28,43 +27,49 @@ type PeerFlowE2ETestSuiteBQ struct { } func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteBQ)) + got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteBQ { + g := got.New(t) + + // Concurrently run each test + g.Parallel() + + suite := setupSuite(t, g) + + g.Cleanup(func() { + suite.tearDownSuite() + }) + + return suite + }) } -func (s *PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { +func (s PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { return fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tableName) } -func (s *PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string { +func (s PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string { return fmt.Sprintf("%s_%s", input, s.bqSuffix) } // setupBigQuery sets up the bigquery connection. -func (s *PeerFlowE2ETestSuiteBQ) setupBigQuery() error { +func setupBigQuery(t *testing.T) *BigQueryTestHelper { bqHelper, err := NewBigQueryTestHelper() if err != nil { - return fmt.Errorf("failed to create bigquery helper: %w", err) + log.Errorf("Error in test: %v", err) + t.FailNow() } err = bqHelper.RecreateDataset() if err != nil { - return fmt.Errorf("failed to recreate bigquery dataset: %w", err) + log.Errorf("Error in test: %v", err) + t.FailNow() } - s.bqHelper = bqHelper - return nil -} - -func (s *PeerFlowE2ETestSuiteBQ) setupTemporalLogger() { - logger := log.New() - logger.SetReportCaller(true) - logger.SetLevel(log.WarnLevel) - tlogger := e2e.NewTLogrusLogger(logger) - s.SetLogger(tlogger) + return bqHelper } // Implement SetupAllSuite interface to setup the test suite -func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { +func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -75,38 +80,43 @@ func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { log.SetReportCaller(true) log.SetLevel(log.WarnLevel) - s.setupTemporalLogger() - suffix := util.RandomString(8) tsSuffix := time.Now().Format("20060102150405") - s.bqSuffix = fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) - pool, err := e2e.SetupPostgres(s.bqSuffix) + bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) + pool, err := e2e.SetupPostgres(bqSuffix) if err != nil || pool == nil { - s.Fail("failed to setup postgres", err) + log.Errorf("failed to setup postgres: %v", err) + t.FailNow() } - s.pool = pool - err = s.setupBigQuery() - if err != nil { - s.Fail("failed to setup bigquery", err) + bq := setupBigQuery(t) + + return PeerFlowE2ETestSuiteBQ{ + G: g, + t: t, + bqSuffix: bqSuffix, + pool: pool, + bqHelper: bq, } } // Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuiteBQ) TearDownSuite() { +func (s PeerFlowE2ETestSuiteBQ) tearDownSuite() { err := e2e.TearDownPostgres(s.pool, s.bqSuffix) if err != nil { - s.Fail("failed to drop Postgres schema", err) + log.Errorf("failed to tear down postgres: %v", err) + s.FailNow() } err = s.bqHelper.DropDataset() if err != nil { - s.Fail("failed to drop bigquery dataset", err) + log.Errorf("failed to tear down bigquery: %v", err) + s.FailNow() } } -func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. @@ -122,14 +132,13 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { err := env.GetWorkflowError() // assert that error contains "invalid connection configs" - s.Error(err) - s.Contains(err.Error(), "invalid connection configs") + require.Contains(s.t, err.Error(), "invalid connection configs") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_no_data") @@ -142,7 +151,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { value VARCHAR(255) NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_flow_no_data"), @@ -153,7 +162,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -167,14 +176,13 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_char_coltype") @@ -187,7 +195,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { value CHAR(255) NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_char_table"), @@ -198,7 +206,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -212,17 +220,16 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } // Test_Complete_Simple_Flow_BQ tests a complete flow with data in the source table. // The test inserts 10 rows into the source table and verifies that the data is // correctly synced to the destination table after sync flow completes. -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_simple_flow_bq") @@ -235,7 +242,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_simple_flow"), @@ -246,7 +253,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -264,7 +271,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -276,21 +283,20 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") count, err := s.bqHelper.countRows(dstTableName) - s.NoError(err) + require.NoError(s.t, err) s.Equal(10, count) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_bq_1") @@ -304,7 +310,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_1"), @@ -315,7 +321,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -340,7 +346,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -351,15 +357,14 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_bq_2") @@ -373,7 +378,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_2"), @@ -384,7 +389,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -402,7 +407,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -413,15 +418,14 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_bq_3") @@ -435,7 +439,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_3"), @@ -446,7 +450,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 11, @@ -477,7 +481,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -488,15 +492,14 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_bq_4") @@ -509,7 +512,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_4"), @@ -520,7 +523,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 6, @@ -545,7 +548,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -556,15 +559,14 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsBQ(dstTableName, "id,t1,k") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_bq_5") @@ -578,7 +580,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_5"), @@ -589,7 +591,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -613,7 +615,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -624,15 +626,14 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_types_bq") @@ -646,7 +647,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_bq"), @@ -657,7 +658,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ @@ -684,7 +685,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { ARRAY[0.0003, 8902.0092], ARRAY['hello','bye']; `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -694,8 +695,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") noNulls, err := s.bqHelper.CheckNull(dstTableName, []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", @@ -707,11 +707,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { // Make sure that there are no nulls s.True(noNulls) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTable1Name := s.attachSchemaSuffix("test1_bq") @@ -723,7 +723,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { CREATE TABLE %s (id serial primary key, c1 int, c2 text); CREATE TABLE %s(id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table_bq"), @@ -734,7 +734,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -750,30 +750,30 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed an insert on two tables") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.T(), env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() count1, err := s.bqHelper.countRows(dstTable1Name) - s.NoError(err) + require.NoError(s.t, err) count2, err := s.bqHelper.countRows(dstTable2Name) - s.NoError(err) + require.NoError(s.t, err) s.Equal(1, count1) s.Equal(1, count2) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } // TODO: not checking schema exactly, add later -func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") @@ -785,7 +785,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -796,7 +796,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -810,7 +810,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -820,11 +820,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -834,11 +834,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -848,11 +848,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -867,14 +867,13 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") @@ -889,7 +888,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -900,7 +899,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -917,7 +916,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") @@ -927,9 +926,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -939,16 +938,15 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsBQ(dstTableName, "id,c1,c2,t") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") @@ -964,7 +962,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -975,7 +973,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -987,7 +985,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + require.NoError(s.t, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -995,18 +993,18 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) err = rowsTx.Commit(context.Background()) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1016,17 +1014,16 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened s.compareTableContentsBQ(dstTableName, "id,c1,c2,t,t2") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") @@ -1042,7 +1039,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -1053,7 +1050,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -1071,16 +1068,16 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1090,11 +1087,10 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened s.compareTableContentsBQ(dstTableName, "id,c1,c2,t,t2") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index 79e4201852..c12634cec8 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -9,24 +9,24 @@ import ( "github.com/stretchr/testify/require" ) -func (s *PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) { +func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) { err := e2e.CreateSourceTableQRep(s.pool, s.bqSuffix, tableName) - s.NoError(err) + require.NoError(s.t, err) err = e2e.PopulateSourceTable(s.pool, s.bqSuffix, tableName, rowCount) - s.NoError(err) + require.NoError(s.t, err) } -func (s *PeerFlowE2ETestSuiteBQ) setupBQDestinationTable(dstTable string) { +func (s PeerFlowE2ETestSuiteBQ) setupBQDestinationTable(dstTable string) { schema := e2e.GetOwnersSchema() err := s.bqHelper.CreateTable(dstTable, schema) // fail if table creation fails - require.NoError(s.T(), err) + require.NoError(s.t, err) fmt.Printf("created table on bigquery: %s.%s. %v\n", s.bqHelper.Config.DatasetId, dstTable, err) } -func (s *PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) { +func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") pgQueryExecutor.SetTestEnv(true) @@ -34,20 +34,20 @@ func (s *PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsSt pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", colsString, s.bqSuffix, tableName), ) - s.NoError(err) + require.NoError(s.t, err) // read rows from destination table qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName) fmt.Printf("running query on bigquery: %s\n", bqSelQuery) bqRows, err := s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) - s.NoError(err) + require.NoError(s.t, err) - s.True(pgRows.Equals(bqRows), "rows from source and destination tables are not equal") + s.True(pgRows.Equals(bqRows)) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 @@ -65,17 +65,16 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { query, s.bqHelper.Peer, "") - s.NoError(err) + require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) - // assert that error contains "invalid connection configs" err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) s.compareTableContentsBQ(tblName, "*") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 5326a58e3d..c9b82a4c6c 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -175,7 +175,6 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) - // assert that error contains "invalid connection configs" err = env.GetWorkflowError() s.NoError(err) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index f77d74e948..759ab34b05 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -4,9 +4,12 @@ import ( "context" "fmt" "strings" + "sync" "testing" "time" + "github.com/ysmood/got" + connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -17,13 +20,11 @@ import ( "github.com/joho/godotenv" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" ) type PeerFlowE2ETestSuiteSF struct { - suite.Suite - testsuite.WorkflowTestSuite + got.G + t *testing.T pgSuffix string pool *pgxpool.Pool @@ -32,45 +33,31 @@ type PeerFlowE2ETestSuiteSF struct { } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteSF)) -} - -func (s *PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tableName) -} + got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteSF { + g := got.New(t) -func (s *PeerFlowE2ETestSuiteSF) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, s.pgSuffix) -} + // Concurrently run each test + g.Parallel() -// setupSnowflake sets up the snowflake connection. -func (s *PeerFlowE2ETestSuiteSF) setupSnowflake() error { - sfHelper, err := NewSnowflakeTestHelper() - if err != nil { - return fmt.Errorf("failed to create snowflake helper: %w", err) - } + suite := SetupSuite(t, g) - s.sfHelper = sfHelper + g.Cleanup(func() { + suite.tearDownSuite() + }) - return nil + return suite + }) } -func (s *PeerFlowE2ETestSuiteSF) setupTemporalLogger() { - logger := log.New() - logger.SetReportCaller(true) - logger.SetLevel(log.WarnLevel) - tlogger := e2e.NewTLogrusLogger(logger) - s.SetLogger(tlogger) +func (s PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { + return fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tableName) } -type logWriterType struct{ t *testing.T } - -func (l logWriterType) Write(p []byte) (n int, err error) { - l.t.Logf(string(p)) - return len(p), nil +func (s PeerFlowE2ETestSuiteSF) attachSuffix(input string) string { + return fmt.Sprintf("%s_%s", input, s.pgSuffix) } -func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { +func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -80,50 +67,67 @@ func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { log.SetReportCaller(true) log.SetLevel(log.WarnLevel) - log.SetOutput(logWriterType{t: s.T()}) - - s.setupTemporalLogger() suffix := util.RandomString(8) tsSuffix := time.Now().Format("20060102150405") - s.pgSuffix = fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix) + pgSuffix := fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix) - pool, err := e2e.SetupPostgres(s.pgSuffix) + pool, err := e2e.SetupPostgres(pgSuffix) if err != nil || pool == nil { - s.Fail("failed to setup postgres", err) + log.Errorf("failed to setup Postgres: %v", err) + g.FailNow() } - s.pool = pool - err = s.setupSnowflake() + sfHelper, err := NewSnowflakeTestHelper() if err != nil { - s.Fail("failed to setup snowflake", err) + log.Errorf("failed to setup Snowflake: %v", err) + g.FailNow() } - s.connector, err = connsnowflake.NewSnowflakeConnector(context.Background(), - s.sfHelper.Config) - require.NoError(s.T(), err) + connector, err := connsnowflake.NewSnowflakeConnector( + context.Background(), + sfHelper.Config, + ) + require.NoError(t, err) + + suite := PeerFlowE2ETestSuiteSF{ + G: g, + t: t, + pgSuffix: pgSuffix, + pool: pool, + sfHelper: sfHelper, + connector: connector, + } + + return suite } // Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() { +func (s PeerFlowE2ETestSuiteSF) tearDownSuite() { err := e2e.TearDownPostgres(s.pool, s.pgSuffix) if err != nil { - s.Fail("failed to drop Postgres schema", err) + log.Errorf("failed to tear down Postgres: %v", err) + s.FailNow() } if s.sfHelper != nil { err = s.sfHelper.Cleanup() if err != nil { - s.Fail("failed to clean up Snowflake", err) + log.Errorf("failed to tear down Snowflake: %v", err) + s.FailNow() } } err = s.connector.Close() - require.NoError(s.T(), err) + + if err != nil { + log.Errorf("failed to close Snowflake connector: %v", err) + s.FailNow() + } } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_simple_flow_sf") @@ -136,7 +140,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow"), @@ -146,7 +150,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -164,7 +168,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 20 rows into the source table") }() @@ -176,11 +180,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") count, err := s.sfHelper.CountRows("test_simple_flow_sf") - s.NoError(err) + require.NoError(s.t, err) s.Equal(20, count) // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago @@ -189,17 +192,17 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { SELECT COUNT(*) FROM %s WHERE _PEERDB_SYNCED_AT > CURRENT_TIMESTAMP() - INTERVAL '30 MINUTE' `, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) - s.NoError(err) + require.NoError(s.t, err) s.Equal(20, numNewRows) // TODO: verify that the data is correctly synced to the destination table // on the Snowflake side - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_invalid_geo_sf_avro_cdc") @@ -212,7 +215,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { poly GEOGRAPHY(POLYGON) NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_invalid_geo_sf_avro_cdc"), @@ -222,7 +225,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -243,7 +246,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { "5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+ "c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf", ) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 4 invalid geography rows into the source table") for i := 4; i < 10; i++ { @@ -253,7 +256,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { "010300000001000000050000000000000000000000000000000000000000000000"+ "00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+ "00f03f000000000000000000000000000000000000000000000000") - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 6 valid geography rows and 10 total rows into source") }() @@ -265,27 +268,26 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") // We inserted 4 invalid shapes in each. // They should have been filtered out as null on destination lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line") - s.NoError(err) + require.NoError(s.t, err) s.Equal(6, lineCount) polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly") - s.NoError(err) + require.NoError(s.t, err) s.Equal(6, polyCount) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_sf_1") @@ -299,7 +301,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_1"), @@ -309,7 +311,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -334,7 +336,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -345,15 +347,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsSF("test_toast_sf_1", `id,t1,t2,k`, false) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_sf_2") @@ -368,7 +369,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { ); `, srcTableName)) log.Infof("Creating table '%s', err: %v", srcTableName, err) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_2"), @@ -378,13 +379,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, MaxBatchSize: 100, } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) /* transaction updating no rows */ @@ -394,7 +398,13 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - s.NoError(err) + + if err != nil { + log.Errorf("Error executing transaction: %v", err) + s.FailNow() + } + + wg.Done() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -404,15 +414,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsSF("test_toast_sf_2", `id,t1,t2,k`, false) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) + + wg.Wait() } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_sf_3") @@ -426,7 +437,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_3"), @@ -436,7 +447,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 11, @@ -467,7 +478,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -478,15 +489,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsSF("test_toast_sf_3", `id,t1,t2,k`, false) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_sf_4") @@ -499,7 +509,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_4"), @@ -509,7 +519,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 6, @@ -534,7 +544,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -545,15 +555,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsSF("test_toast_sf_4", `id,t1,k`, false) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_sf_5") @@ -567,7 +576,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_5"), @@ -577,7 +586,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -601,7 +610,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -612,15 +621,14 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsSF("test_toast_sf_5", `id,t1,t2,k`, false) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_types_sf") @@ -635,7 +643,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT), c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON)); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), @@ -645,7 +653,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -671,7 +679,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -681,8 +689,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", @@ -694,11 +701,11 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { // Make sure that there are no nulls s.Equal(noNulls, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTable1Name := s.attachSchemaSuffix("test1_sf") @@ -710,7 +717,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); CREATE TABLE IF NOT EXISTS %s (id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table"), @@ -720,7 +727,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -736,7 +743,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -746,18 +753,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { err = env.GetWorkflowError() count1, err := s.sfHelper.CountRows("test1_sf") - s.NoError(err) + require.NoError(s.t, err) count2, err := s.sfHelper.CountRows("test2_sf") - s.NoError(err) + require.NoError(s.t, err) s.Equal(1, count1) s.Equal(1, count2) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") @@ -769,7 +776,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -779,7 +786,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -793,7 +800,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -810,18 +817,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -839,18 +846,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2", false) // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -869,18 +876,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3", false) // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -899,7 +906,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) + require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) }() @@ -911,14 +918,13 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") @@ -933,7 +939,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -943,7 +949,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -960,7 +966,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") @@ -970,9 +976,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -982,17 +988,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t", false) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") @@ -1008,7 +1013,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -1018,7 +1023,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -1030,7 +1035,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + require.NoError(s.t, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1038,18 +1043,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) err = rowsTx.Commit(context.Background()) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1059,17 +1064,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened s.compareTableContentsSF("test_cpkey_toast1", "id,c1,c2,t,t2", false) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") @@ -1085,7 +1089,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -1095,7 +1099,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -1113,16 +1117,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1132,17 +1136,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened s.compareTableContentsSF("test_cpkey_toast2", "id,c1,c2,t,t2", false) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_exclude_sf") @@ -1158,7 +1161,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_exclude_flow"), @@ -1194,38 +1197,37 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(100)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") query := fmt.Sprintf("SELECT * FROM %s.%s.test_exclude_sf ORDER BY id", s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName) sfRows, err := s.sfHelper.ExecuteAndProcessQuery(query) - s.NoError(err) + require.NoError(s.t, err) for _, field := range sfRows.Schema.Fields { - s.NotEqual(field.Name, "c2") + require.NotEqual(s.t, field.Name, "c2") } s.Equal(4, len(sfRows.Schema.Fields)) s.Equal(10, len(sfRows.Records)) } -func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) cmpTableName := s.attachSchemaSuffix("test_softdel") @@ -1240,7 +1242,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { t TEXT ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_softdel"), @@ -1266,6 +1268,9 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { MaxBatchSize: 100, } + wg := sync.WaitGroup{} + wg.Add(1) + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup // and then insert, update and delete rows in the table. go func() { @@ -1273,26 +1278,29 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 1) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 2) // since we delete stuff, create another table to compare with _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) + + wg.Done() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") + + wg.Wait() // verify our updates and delete happened s.compareTableContentsSF("test_softdel", "id,c1,c2,t", false) @@ -1300,12 +1308,12 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) - s.NoError(err) - s.Equal(1, numNewRows) + require.NoError(s.t, err) + require.Equal(s.t, 1, numNewRows) } -func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") @@ -1320,7 +1328,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { t TEXT ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_softdel_iud"), @@ -1352,30 +1360,29 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) insertTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + require.NoError(s.t, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) // since we delete stuff, create another table to compare with _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) - s.NoError(insertTx.Commit(context.Background())) + require.NoError(s.t, insertTx.Commit(context.Background())) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened s.compareTableContentsSF("test_softdel_iud", "id,c1,c2,t", false) @@ -1383,12 +1390,12 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) - s.NoError(err) + require.NoError(s.t, err) s.Equal(1, numNewRows) } -func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) cmpTableName := s.attachSchemaSuffix("test_softdel_ud") @@ -1403,7 +1410,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { t TEXT ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_softdel_ud"), @@ -1436,33 +1443,32 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 1) insertTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + require.NoError(s.t, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) // since we delete stuff, create another table to compare with _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) - s.NoError(insertTx.Commit(context.Background())) + require.NoError(s.t, insertTx.Commit(context.Background())) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened s.compareTableContentsSF("test_softdel_ud", "id,c1,c2,t", false) @@ -1470,12 +1476,12 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) - s.NoError(err) + require.NoError(s.t, err) s.Equal(1, numNewRows) } -func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_softdel_iad") @@ -1489,7 +1495,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { t TEXT ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_softdel_iad"), @@ -1522,22 +1528,21 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 1) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened s.compareTableContentsSF("test_softdel_iad", "id,c1,c2,t", false) @@ -1545,6 +1550,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) - s.NoError(err) + require.NoError(s.t, err) s.Equal(0, numNewRows) } diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 5f5b01404d..81884d2e3d 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -8,36 +8,37 @@ import ( "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/google/uuid" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" ) -func (s *PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, rowCount int) { +func (s PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, numRows int) { err := e2e.CreateSourceTableQRep(s.pool, s.pgSuffix, tableName) - s.NoError(err) - err = e2e.PopulateSourceTable(s.pool, s.pgSuffix, tableName, rowCount) - s.NoError(err) + require.NoError(s.t, err) + err = e2e.PopulateSourceTable(s.pool, s.pgSuffix, tableName, numRows) + require.NoError(s.t, err) } -func (s *PeerFlowE2ETestSuiteSF) setupSFDestinationTable(dstTable string) { +func (s PeerFlowE2ETestSuiteSF) setupSFDestinationTable(dstTable string) { schema := e2e.GetOwnersSchema() err := s.sfHelper.CreateTable(dstTable, schema) // fail if table creation fails if err != nil { - s.FailNow("unable to create table on snowflake", err) + require.FailNow(s.t, "unable to create table on snowflake", err) } - fmt.Printf("created table on snowflake: %s.%s. %v\n", s.sfHelper.testSchemaName, dstTable, err) + log.Infof("created table on snowflake: %s.%s. %v\n", s.sfHelper.testSchemaName, dstTable, err) } -func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selector string, caseSensitive bool) { +func (s PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selector string, caseSensitive bool) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") pgQueryExecutor.SetTestEnv(true) pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", selector, s.pgSuffix, tableName), ) - require.NoError(s.T(), err) + require.NoError(s.t, err) // read rows from destination table qualifiedTableName := fmt.Sprintf("%s.%s.%s", s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) @@ -50,13 +51,13 @@ func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, select fmt.Printf("running query on snowflake: %s\n", sfSelQuery) sfRows, err := s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) - require.NoError(s.T(), err) + require.NoError(s.t, err) - s.True(pgRows.Equals(sfRows), "rows from source and destination tables are not equal") + require.True(s.t, pgRows.Equals(sfRows), "rows from source and destination tables are not equal") } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 @@ -78,25 +79,24 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { s.sfHelper.Peer, "", ) - s.NoError(err) + require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) - // assert that error contains "invalid connection configs" err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) sel := e2e.GetOwnersSelectorString() s.compareTableContentsSF(tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 @@ -122,25 +122,24 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, UpsertKeyColumns: []string{"id"}, } - s.NoError(err) + require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) - // assert that error contains "invalid connection configs" err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) sel := e2e.GetOwnersSelectorString() s.compareTableContentsSF(tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 @@ -162,7 +161,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { s.sfHelper.Peer, "", ) - s.NoError(err) + require.NoError(s.t, err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -171,15 +170,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) sel := e2e.GetOwnersSelectorString() s.compareTableContentsSF(tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { - env := s.NewTestWorkflowEnvironment() + +func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 @@ -206,7 +206,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { UpsertKeyColumns: []string{"id"}, } qrepConfig.WatermarkColumn = "xmin" - s.NoError(err) + require.NoError(s.t, err) e2e.RunXminFlowWorkflow(env, qrepConfig) @@ -214,16 +214,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) sel := e2e.GetOwnersSelectorString() s.compareTableContentsSF(tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 @@ -249,7 +249,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( sfPeer, "", ) - s.NoError(err) + require.NoError(s.t, err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -258,10 +258,10 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) sel := e2e.GetOwnersSelectorString() s.compareTableContentsSF(tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index d17b60f798..bb33fc121d 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -8,42 +8,62 @@ import ( connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/stretchr/testify/suite" + "github.com/sirupsen/logrus" + "github.com/ysmood/got" ) const schemaDeltaTestSchemaName = "PUBLIC" type SnowflakeSchemaDeltaTestSuite struct { - suite.Suite + got.G + t *testing.T + connector *connsnowflake.SnowflakeConnector sfTestHelper *SnowflakeTestHelper } -func (suite *SnowflakeSchemaDeltaTestSuite) failTestError(err error) { +func (suite SnowflakeSchemaDeltaTestSuite) failTestError(err error) { if err != nil { - suite.FailNow(err.Error()) + logrus.Errorf("Error in test: %v", err) + suite.FailNow() } } -func (suite *SnowflakeSchemaDeltaTestSuite) SetupSuite() { - var err error +func setupSchemaDeltaSuite( + t *testing.T, + g got.G, +) SnowflakeSchemaDeltaTestSuite { + sfTestHelper, err := NewSnowflakeTestHelper() + if err != nil { + logrus.Errorf("Error in test: %v", err) + t.FailNow() + } - suite.sfTestHelper, err = NewSnowflakeTestHelper() - suite.failTestError(err) + connector, err := connsnowflake.NewSnowflakeConnector( + context.Background(), + sfTestHelper.Config, + ) + if err != nil { + logrus.Errorf("Error in test: %v", err) + t.FailNow() + } - suite.connector, err = connsnowflake.NewSnowflakeConnector(context.Background(), - suite.sfTestHelper.Config) - suite.failTestError(err) + return SnowflakeSchemaDeltaTestSuite{ + G: g, + t: t, + connector: connector, + sfTestHelper: sfTestHelper, + } } -func (suite *SnowflakeSchemaDeltaTestSuite) TearDownSuite() { +func (suite SnowflakeSchemaDeltaTestSuite) tearDownSuite() { err := suite.sfTestHelper.Cleanup() suite.failTestError(err) err = suite.connector.Close() suite.failTestError(err) } -func (suite *SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { +func (suite SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { tableName := fmt.Sprintf("%s.SIMPLE_ADD_COLUMN", schemaDeltaTestSchemaName) err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName)) suite.failTestError(err) @@ -71,7 +91,7 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { }, output.TableNameSchemaMapping[tableName]) } -func (suite *SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() { +func (suite SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() { tableName := fmt.Sprintf("%s.ADD_DROP_ALL_COLUMN_TYPES", schemaDeltaTestSchemaName) err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName)) suite.failTestError(err) @@ -117,7 +137,7 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() { suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } -func (suite *SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() { +func (suite SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() { tableName := fmt.Sprintf("%s.ADD_DROP_TRICKY_COLUMN_NAMES", schemaDeltaTestSchemaName) err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(id TEXT PRIMARY KEY)", tableName)) suite.failTestError(err) @@ -161,7 +181,7 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() { suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } -func (suite *SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { +func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { tableName := fmt.Sprintf("%s.ADD_DROP_WHITESPACE_COLUMN_NAMES", schemaDeltaTestSchemaName) err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(\" \" TEXT PRIMARY KEY)", tableName)) suite.failTestError(err) @@ -200,5 +220,17 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { } func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) { - suite.Run(t, new(SnowflakeSchemaDeltaTestSuite)) + got.Each(t, func(t *testing.T) SnowflakeSchemaDeltaTestSuite { + g := got.New(t) + + g.Parallel() + + suite := setupSchemaDeltaSuite(t, g) + + g.Cleanup(func() { + suite.tearDownSuite() + }) + + return suite + }) } diff --git a/flow/e2e/sqlserver/sqlserver_helper.go b/flow/e2e/sqlserver/sqlserver_helper.go index ed6ccfa97e..775b7f83d3 100644 --- a/flow/e2e/sqlserver/sqlserver_helper.go +++ b/flow/e2e/sqlserver/sqlserver_helper.go @@ -23,7 +23,7 @@ type SQLServerHelper struct { } func NewSQLServerHelper(name string) (*SQLServerHelper, error) { - port, err := strconv.Atoi(os.Getenv("SQLSERVER_PORT")) + port, err := strconv.ParseUint(os.Getenv("SQLSERVER_PORT"), 10, 16) if err != nil { return nil, fmt.Errorf("invalid SQLSERVER_PORT: %s", os.Getenv("SQLSERVER_PORT")) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 2d561c74c2..c16040062a 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -364,6 +364,18 @@ func GetOwnersSelectorString() string { return strings.Join(fields, ",") } +func NewTemporalTestWorkflowEnvironment() *testsuite.TestWorkflowEnvironment { + testSuite := &testsuite.WorkflowTestSuite{} + + logger := log.New() + logger.SetReportCaller(true) + logger.SetLevel(log.WarnLevel) + tLogger := NewTLogrusLogger(logger) + + testSuite.SetLogger(tLogger) + return testSuite.NewTestWorkflowEnvironment() +} + // implement temporal logger interface with logrus // // type Logger interface { diff --git a/flow/go.mod b/flow/go.mod index c606d7573d..3a83ee0cf7 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -11,6 +11,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0 github.com/aws/aws-sdk-go v1.47.9 github.com/cenkalti/backoff/v4 v4.2.1 + github.com/cockroachdb/pebble v0.0.0-20231130180345-51fca96df08f github.com/google/uuid v1.4.0 github.com/grafana/pyroscope-go v1.0.4 github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 @@ -30,6 +31,7 @@ require ( github.com/twpayne/go-geos v0.14.0 github.com/urfave/cli/v2 v2.25.7 github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a + github.com/ysmood/got v0.38.2 go.temporal.io/api v1.26.0 go.temporal.io/sdk v1.25.1 go.uber.org/atomic v1.11.0 @@ -47,7 +49,6 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cockroachdb/errors v1.11.1 // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect - github.com/cockroachdb/pebble v0.0.0-20231130180345-51fca96df08f // indirect github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/getsentry/sentry-go v0.18.0 // indirect @@ -60,6 +61,7 @@ require ( github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect + github.com/ysmood/gop v0.2.0 // indirect ) require ( diff --git a/flow/go.sum b/flow/go.sum index 1186bc4a22..f2af155f24 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -156,10 +156,14 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4= +github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/cockroachdb/errors v1.11.1 h1:xSEW75zKaKCWzR3OfxXUxgrk/NtT4G1MiOv5lWZazG8= github.com/cockroachdb/errors v1.11.1/go.mod h1:8MUxA3Gi6b25tYlFEBGLf+D8aISL+M4MIpiWMSNRfxw= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= +github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA= +github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA= github.com/cockroachdb/pebble v0.0.0-20231130180345-51fca96df08f h1:XkkAFIUJ3t1Wt/z9wXqZaebwiZteIzmQfAkN4IHQKLY= github.com/cockroachdb/pebble v0.0.0-20231130180345-51fca96df08f/go.mod h1:BHuaMa/lK7fUe75BlsteiiTu8ptIG+qSAuDtGMArP18= github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= @@ -193,6 +197,8 @@ github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcP github.com/getsentry/sentry-go v0.18.0 h1:MtBW5H9QgdcJabtZcuJG80BMOwaBpkRDZkxRkNC1sN0= github.com/getsentry/sentry-go v0.18.0/go.mod h1:Kgon4Mby+FJ7ZWHFUAZgVaIa8sxHtnRJRLTXZr51aKQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= +github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -276,6 +282,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= @@ -409,6 +416,8 @@ github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= +github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -485,6 +494,10 @@ github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRT github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk= github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4= +github.com/ysmood/gop v0.2.0 h1:+tFrG0TWPxT6p9ZaZs+VY+opCvHU8/3Fk6BaNv6kqKg= +github.com/ysmood/gop v0.2.0/go.mod h1:rr5z2z27oGEbyB787hpEcx4ab8cCiPnKxn0SUHt6xzk= +github.com/ysmood/got v0.38.2 h1:h2RYvAe5nIK+oBRMLzNIrkZaX5kjmkOBqfRMIsFzLyU= +github.com/ysmood/got v0.38.2/go.mod h1:W7DdpuX6skL3NszLmAsC5hT7JAhuLZhByVzHTq874Qg= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/ui/app/dto/MirrorsDTO.ts b/ui/app/dto/MirrorsDTO.ts index f63a93a15f..4a76200fd4 100644 --- a/ui/app/dto/MirrorsDTO.ts +++ b/ui/app/dto/MirrorsDTO.ts @@ -21,3 +21,10 @@ export type TableMapRow = { exclude: string[]; selected: boolean; }; + +export type SyncStatusRow = { + batchId: bigint; + startTime: Date; + endTime: Date | null; + numRows: number; +}; diff --git a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx index 4afeedd5d4..89dc82329b 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx @@ -1,5 +1,6 @@ 'use client'; +import { SyncStatusRow } from '@/app/dto/MirrorsDTO'; import TimeLabel from '@/components/TimeComponent'; import { CDCMirrorStatus, @@ -272,13 +273,6 @@ const Trigger = styled( } `; -type SyncStatusRow = { - batchId: number; - startTime: Date; - endTime: Date | null; - numRows: number; -}; - type CDCMirrorStatusProps = { cdc: CDCMirrorStatus; rows: SyncStatusRow[]; diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx index f52450ade0..32992f871d 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx @@ -1,4 +1,5 @@ 'use client'; +import { SyncStatusRow } from '@/app/dto/MirrorsDTO'; import MirrorInfo from '@/components/MirrorInfo'; import PeerButton from '@/components/PeerComponent'; import TimeLabel from '@/components/TimeComponent'; @@ -11,13 +12,6 @@ import moment from 'moment'; import MirrorValues from './configValues'; import TablePairs from './tablePairs'; -type SyncStatusRow = { - batchId: number; - startTime: Date; - endTime: Date | null; - numRows: number; -}; - type props = { syncs: SyncStatusRow[]; mirrorConfig: FlowConnectionConfigs | undefined; diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx index e241a07ffc..a1c315459b 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx @@ -1,4 +1,5 @@ 'use client'; +import { SyncStatusRow } from '@/app/dto/MirrorsDTO'; import { formatGraphLabel, timeOptions } from '@/app/utils/graph'; import { Label } from '@/lib/Label'; import { BarChart } from '@tremor/react'; @@ -6,13 +7,6 @@ import { useEffect, useState } from 'react'; import ReactSelect from 'react-select'; import aggregateCountsByInterval from './aggregatedCountsByInterval'; -type SyncStatusRow = { - batchId: number; - startTime: Date; - endTime: Date | null; - numRows: number; -}; - function CdcGraph({ syncs }: { syncs: SyncStatusRow[] }) { let [aggregateType, setAggregateType] = useState('hour'); const initialCount: [string, number][] = []; diff --git a/ui/app/mirrors/edit/[mirrorId]/page.tsx b/ui/app/mirrors/edit/[mirrorId]/page.tsx index 756684b854..ca0adbd43a 100644 --- a/ui/app/mirrors/edit/[mirrorId]/page.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/page.tsx @@ -65,7 +65,7 @@ export default async function EditMirror({ } const rows = syncs.map((sync) => ({ - batchId: sync.id, + batchId: sync.batch_id, startTime: sync.start_time, endTime: sync.end_time, numRows: sync.rows_in_batch, diff --git a/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx b/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx index 0a481241ca..d1cd51f2d1 100644 --- a/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/syncStatus.tsx @@ -32,7 +32,7 @@ export default async function SyncStatus({ }); const rows = syncs.map((sync) => ({ - batchId: sync.id, + batchId: sync.batch_id, startTime: sync.start_time, endTime: sync.end_time, numRows: sync.rows_in_batch, diff --git a/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx b/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx index e9b11bfa2d..d4da7c5920 100644 --- a/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/syncStatusTable.tsx @@ -1,5 +1,6 @@ 'use client'; +import { SyncStatusRow } from '@/app/dto/MirrorsDTO'; import TimeLabel from '@/components/TimeComponent'; import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; @@ -10,12 +11,6 @@ import { Table, TableCell, TableRow } from '@/lib/Table'; import moment from 'moment'; import { useMemo, useState } from 'react'; import ReactSelect from 'react-select'; -type SyncStatusRow = { - batchId: number; - startTime: Date; - endTime: Date | null; - numRows: number; -}; type SyncStatusTableProps = { rows: SyncStatusRow[]; diff --git a/ui/app/mirrors/status/qrep/[mirrorId]/qrepConfigViewer.tsx b/ui/app/mirrors/status/qrep/[mirrorId]/qrepConfigViewer.tsx index 31f54b384d..ac070d744d 100644 --- a/ui/app/mirrors/status/qrep/[mirrorId]/qrepConfigViewer.tsx +++ b/ui/app/mirrors/status/qrep/[mirrorId]/qrepConfigViewer.tsx @@ -2,6 +2,7 @@ import prisma from '@/app/utils/prisma'; import { QRepConfig } from '@/grpc_generated/flow'; import { Badge } from '@/lib/Badge'; import { Icon } from '@/lib/Icon'; +import { Label } from '@/lib/Label'; import { ProgressCircle } from '@/lib/ProgressCircle'; export const dynamic = 'force-dynamic'; @@ -27,9 +28,9 @@ export default async function QRepConfigViewer({ if (!configBuffer?.config_proto) { return ( -
+
- Waiting for mirror to start... +
); }