From 923eafc524b0bdb1fe074f8e413a3a99024acf2b Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Fri, 8 Dec 2023 14:21:44 -0500 Subject: [PATCH] make bq faster too! --- .github/workflows/dev-docker.yml | 2 +- flow/e2e/bigquery/peer_flow_bq_test.go | 310 ++++++++++++------------ flow/e2e/bigquery/qrep_flow_bq_test.go | 28 +-- flow/e2e/snowflake/peer_flow_sf_test.go | 49 ++-- flow/e2e/snowflake/qrep_flow_sf_test.go | 10 +- flow/e2e/test_utils.go | 12 + 6 files changed, 210 insertions(+), 201 deletions(-) diff --git a/.github/workflows/dev-docker.yml b/.github/workflows/dev-docker.yml index d5f1a2dc88..c4e3e313ec 100644 --- a/.github/workflows/dev-docker.yml +++ b/.github/workflows/dev-docker.yml @@ -10,7 +10,7 @@ jobs: docker-build: strategy: matrix: - runner: [ubicloud] + runner: [ubuntu-latest] runs-on: ${{ matrix.runner }} permissions: contents: read diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 2a287b5781..3975e7939a 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -14,13 +14,12 @@ import ( "github.com/joho/godotenv" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" + "github.com/ysmood/got" ) type PeerFlowE2ETestSuiteBQ struct { - suite.Suite - testsuite.WorkflowTestSuite + got.G + t *testing.T bqSuffix string pool *pgxpool.Pool @@ -28,43 +27,49 @@ type PeerFlowE2ETestSuiteBQ struct { } func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteBQ)) + got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteBQ { + g := got.New(t) + + // Concurrently run each test + g.Parallel() + + suite := setupSuite(t, g) + + g.Cleanup(func() { + suite.tearDownSuite() + }) + + return suite + }) } -func (s *PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { +func (s PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string { return fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tableName) } -func (s *PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string { +func (s PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string { return fmt.Sprintf("%s_%s", input, s.bqSuffix) } // setupBigQuery sets up the bigquery connection. -func (s *PeerFlowE2ETestSuiteBQ) setupBigQuery() error { +func setupBigQuery(t *testing.T) *BigQueryTestHelper { bqHelper, err := NewBigQueryTestHelper() if err != nil { - return fmt.Errorf("failed to create bigquery helper: %w", err) + log.Errorf("Error in test: %v", err) + t.FailNow() } err = bqHelper.RecreateDataset() if err != nil { - return fmt.Errorf("failed to recreate bigquery dataset: %w", err) + log.Errorf("Error in test: %v", err) + t.FailNow() } - s.bqHelper = bqHelper - return nil -} - -func (s *PeerFlowE2ETestSuiteBQ) setupTemporalLogger() { - logger := log.New() - logger.SetReportCaller(true) - logger.SetLevel(log.WarnLevel) - tlogger := e2e.NewTLogrusLogger(logger) - s.SetLogger(tlogger) + return bqHelper } // Implement SetupAllSuite interface to setup the test suite -func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { +func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -75,38 +80,43 @@ func (s *PeerFlowE2ETestSuiteBQ) SetupSuite() { log.SetReportCaller(true) log.SetLevel(log.WarnLevel) - s.setupTemporalLogger() - suffix := util.RandomString(8) tsSuffix := time.Now().Format("20060102150405") - s.bqSuffix = fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) - pool, err := e2e.SetupPostgres(s.bqSuffix) + bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) + pool, err := e2e.SetupPostgres(bqSuffix) if err != nil || pool == nil { - s.Fail("failed to setup postgres", err) + log.Errorf("failed to setup postgres: %v", err) + t.FailNow() } - s.pool = pool - err = s.setupBigQuery() - if err != nil { - s.Fail("failed to setup bigquery", err) + bq := setupBigQuery(t) + + return PeerFlowE2ETestSuiteBQ{ + G: g, + t: t, + bqSuffix: bqSuffix, + pool: pool, + bqHelper: bq, } } // Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuiteBQ) TearDownSuite() { +func (s PeerFlowE2ETestSuiteBQ) tearDownSuite() { err := e2e.TearDownPostgres(s.pool, s.bqSuffix) if err != nil { - s.Fail("failed to drop Postgres schema", err) + log.Errorf("failed to tear down postgres: %v", err) + s.FailNow() } err = s.bqHelper.DropDataset() if err != nil { - s.Fail("failed to drop bigquery dataset", err) + log.Errorf("failed to tear down bigquery: %v", err) + s.FailNow() } } -func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. @@ -123,13 +133,13 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { // assert that error contains "invalid connection configs" s.Error(err) - s.Contains(err.Error(), "invalid connection configs") + require.Contains(s.t, err.Error(), "invalid connection configs") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_no_data") @@ -142,7 +152,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { value VARCHAR(255) NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_flow_no_data"), @@ -153,7 +163,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -168,13 +178,13 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_char_coltype") @@ -187,7 +197,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { value CHAR(255) NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_char_table"), @@ -198,7 +208,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -213,16 +223,16 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } // Test_Complete_Simple_Flow_BQ tests a complete flow with data in the source table. // The test inserts 10 rows into the source table and verifies that the data is // correctly synced to the destination table after sync flow completes. -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_simple_flow_bq") @@ -235,7 +245,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_simple_flow"), @@ -246,7 +256,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -264,7 +274,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -277,20 +287,20 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") count, err := s.bqHelper.countRows(dstTableName) - s.NoError(err) + require.NoError(s.t, err) s.Equal(10, count) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_bq_1") @@ -304,7 +314,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_1"), @@ -315,7 +325,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -340,7 +350,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -352,14 +362,14 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_bq_2") @@ -373,7 +383,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_2"), @@ -384,7 +394,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 0, @@ -402,7 +412,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -414,14 +424,14 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_bq_3") @@ -435,7 +445,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_3"), @@ -446,7 +456,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 11, @@ -477,7 +487,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -489,14 +499,14 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_bq_4") @@ -509,7 +519,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_4"), @@ -520,7 +530,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 6, @@ -545,7 +555,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -557,14 +567,14 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsBQ(dstTableName, "id,t1,k") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_bq_5") @@ -578,7 +588,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { k int ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_5"), @@ -589,7 +599,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 4, @@ -613,7 +623,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed a transaction touching toast columns") }() @@ -625,14 +635,14 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsBQ(dstTableName, "id,t1,t2,k") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_types_bq") @@ -646,7 +656,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR, c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[]); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_bq"), @@ -657,7 +667,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ @@ -684,7 +694,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { ARRAY[0.0003, 8902.0092], ARRAY['hello','bye']; `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -695,7 +705,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") noNulls, err := s.bqHelper.CheckNull(dstTableName, []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", @@ -707,11 +717,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { // Make sure that there are no nulls s.True(noNulls) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTable1Name := s.attachSchemaSuffix("test1_bq") @@ -723,7 +733,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { CREATE TABLE %s (id serial primary key, c1 int, c2 text); CREATE TABLE %s(id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table_bq"), @@ -734,7 +744,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -750,30 +760,30 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Executed an insert on two tables") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.T(), env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() count1, err := s.bqHelper.countRows(dstTable1Name) - s.NoError(err) + require.NoError(s.t, err) count2, err := s.bqHelper.countRows(dstTable2Name) - s.NoError(err) + require.NoError(s.t, err) s.Equal(1, count1) s.Equal(1, count2) - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } // TODO: not checking schema exactly, add later -func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") @@ -785,7 +795,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -796,7 +806,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -810,7 +820,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -820,11 +830,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -834,11 +844,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -848,11 +858,11 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) + require.NoError(s.t, err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -868,13 +878,13 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") @@ -889,7 +899,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -900,7 +910,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -917,7 +927,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") @@ -927,9 +937,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -940,15 +950,15 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") s.compareTableContentsBQ(dstTableName, "id,c1,c2,t") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") @@ -964,7 +974,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -975,7 +985,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -987,7 +997,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + require.NoError(s.t, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -995,18 +1005,18 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) err = rowsTx.Commit(context.Background()) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1017,16 +1027,16 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened s.compareTableContentsBQ(dstTableName, "id,c1,c2,t,t2") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") @@ -1042,7 +1052,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + require.NoError(s.t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -1053,7 +1063,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + require.NoError(s.t, err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -1071,16 +1081,16 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + require.NoError(s.t, err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + require.NoError(s.t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + require.NoError(s.t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1091,10 +1101,10 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { // allow only continue as new error s.Error(err) - s.Contains(err.Error(), "continue as new") + require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened s.compareTableContentsBQ(dstTableName, "id,c1,c2,t,t2") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index 79e4201852..31becd0b48 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -9,24 +9,24 @@ import ( "github.com/stretchr/testify/require" ) -func (s *PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) { +func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) { err := e2e.CreateSourceTableQRep(s.pool, s.bqSuffix, tableName) - s.NoError(err) + require.NoError(s.t, err) err = e2e.PopulateSourceTable(s.pool, s.bqSuffix, tableName, rowCount) - s.NoError(err) + require.NoError(s.t, err) } -func (s *PeerFlowE2ETestSuiteBQ) setupBQDestinationTable(dstTable string) { +func (s PeerFlowE2ETestSuiteBQ) setupBQDestinationTable(dstTable string) { schema := e2e.GetOwnersSchema() err := s.bqHelper.CreateTable(dstTable, schema) // fail if table creation fails - require.NoError(s.T(), err) + require.NoError(s.t, err) fmt.Printf("created table on bigquery: %s.%s. %v\n", s.bqHelper.Config.DatasetId, dstTable, err) } -func (s *PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) { +func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") pgQueryExecutor.SetTestEnv(true) @@ -34,20 +34,20 @@ func (s *PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsSt pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", colsString, s.bqSuffix, tableName), ) - s.NoError(err) + require.NoError(s.t, err) // read rows from destination table qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) bqSelQuery := fmt.Sprintf("SELECT %s FROM %s ORDER BY id", colsString, qualifiedTableName) fmt.Printf("running query on bigquery: %s\n", bqSelQuery) bqRows, err := s.bqHelper.ExecuteAndProcessQuery(bqSelQuery) - s.NoError(err) + require.NoError(s.t, err) - s.True(pgRows.Equals(bqRows), "rows from source and destination tables are not equal") + s.True(pgRows.Equals(bqRows)) } -func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { - env := s.NewTestWorkflowEnvironment() +func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 @@ -65,7 +65,7 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { query, s.bqHelper.Peer, "") - s.NoError(err) + require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error @@ -73,9 +73,9 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { // assert that error contains "invalid connection configs" err = env.GetWorkflowError() - s.NoError(err) + require.NoError(s.t, err) s.compareTableContentsBQ(tblName, "*") - env.AssertExpectations(s.T()) + env.AssertExpectations(s.t) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 78022b9878..759ab34b05 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/ysmood/got" - "go.temporal.io/sdk/testsuite" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" @@ -50,18 +49,6 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) { }) } -func (s PeerFlowE2ETestSuiteSF) newTestWorkflowEnvironment() *testsuite.TestWorkflowEnvironment { - testSuite := &testsuite.WorkflowTestSuite{} - - logger := log.New() - logger.SetReportCaller(true) - logger.SetLevel(log.WarnLevel) - tLogger := e2e.NewTLogrusLogger(logger) - - testSuite.SetLogger(tLogger) - return testSuite.NewTestWorkflowEnvironment() -} - func (s PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { return fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tableName) } @@ -140,7 +127,7 @@ func (s PeerFlowE2ETestSuiteSF) tearDownSuite() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_simple_flow_sf") @@ -215,7 +202,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_invalid_geo_sf_avro_cdc") @@ -300,7 +287,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_sf_1") @@ -367,7 +354,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_sf_2") @@ -436,7 +423,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_sf_3") @@ -509,7 +496,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_sf_4") @@ -575,7 +562,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_toast_sf_5") @@ -641,7 +628,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_types_sf") @@ -718,7 +705,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTable1Name := s.attachSchemaSuffix("test1_sf") @@ -777,7 +764,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") @@ -937,7 +924,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") @@ -1010,7 +997,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") @@ -1086,7 +1073,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") @@ -1158,7 +1145,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_exclude_sf") @@ -1240,7 +1227,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) cmpTableName := s.attachSchemaSuffix("test_softdel") @@ -1326,7 +1313,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") @@ -1408,7 +1395,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) cmpTableName := s.attachSchemaSuffix("test_softdel_ud") @@ -1494,7 +1481,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) srcTableName := s.attachSchemaSuffix("test_softdel_iad") diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index ea972d672b..1aca83171f 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -57,7 +57,7 @@ func (s PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selecto } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 @@ -97,7 +97,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 @@ -141,7 +141,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 @@ -181,7 +181,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 @@ -225,7 +225,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { - env := s.newTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 2d561c74c2..c16040062a 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -364,6 +364,18 @@ func GetOwnersSelectorString() string { return strings.Join(fields, ",") } +func NewTemporalTestWorkflowEnvironment() *testsuite.TestWorkflowEnvironment { + testSuite := &testsuite.WorkflowTestSuite{} + + logger := log.New() + logger.SetReportCaller(true) + logger.SetLevel(log.WarnLevel) + tLogger := NewTLogrusLogger(logger) + + testSuite.SetLogger(tLogger) + return testSuite.NewTestWorkflowEnvironment() +} + // implement temporal logger interface with logrus // // type Logger interface {