From 949efa882704fc8f8103eb0aefd153c6fa4faceb Mon Sep 17 00:00:00 2001 From: Demur Rumed Date: Wed, 8 Nov 2023 15:57:19 +0000 Subject: [PATCH] Experiment: replace testify suite.Suite with our own that supports parallelism --- flow/e2e/snowflake/peer_flow_sf_test.go | 385 +++++++++++++----------- flow/e2e/snowflake/qrep_flow_sf_test.go | 100 +++--- 2 files changed, 257 insertions(+), 228 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 9f92496eb3..caf0803e67 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -3,6 +3,7 @@ package e2e_snowflake import ( "context" "fmt" + "reflect" "strings" "testing" @@ -14,14 +15,13 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" log "github.com/sirupsen/logrus" - "github.com/stretchr/testify/suite" + "github.com/stretchr/testify/assert" "go.temporal.io/sdk/testsuite" ) const snowflakeSuffix = "snowflake" type PeerFlowE2ETestSuiteSF struct { - suite.Suite testsuite.WorkflowTestSuite pool *pgxpool.Pool @@ -30,7 +30,25 @@ type PeerFlowE2ETestSuiteSF struct { } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteSF)) + e2e := new(PeerFlowE2ETestSuiteSF) + if e2e.SetupSuite(t) != nil { + return + } + defer e2e.TearDownSuite(t) + + t.Run("group", func(t *testing.T) { + e2etype := reflect.TypeOf(e2e) + methodcount := e2etype.NumMethod() + for methodid := 0; methodid < methodcount; methodid += 1 { + method := e2etype.Method(methodid) + if strings.HasPrefix(method.Name, "Test_") { + t.Run(method.Name, func(t *testing.T) { + t.Parallel() + method.Func.Call([]reflect.Value{reflect.ValueOf(e2e), reflect.ValueOf(t)}) + }) + } + } + }) } func (s *PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { @@ -53,7 +71,7 @@ func (s *PeerFlowE2ETestSuiteSF) setupSnowflake() error { return nil } -func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { +func (s *PeerFlowE2ETestSuiteSF) SetupSuite(t *testing.T) error { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -65,39 +83,48 @@ func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { pool, err := e2e.SetupPostgres(snowflakeSuffix) if err != nil { - s.Fail("failed to setup postgres", err) + t.Error("failed to setup postgres", err) + return err } s.pool = pool err = s.setupSnowflake() if err != nil { - s.Fail("failed to setup snowflake", err) + t.Error("failed to setup snowflake", err) + return err } s.connector, err = connsnowflake.NewSnowflakeConnector(context.Background(), s.sfHelper.Config) - s.NoError(err) + if err != nil { + t.Error(err) + return err + } + + return nil } // Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() { +func (s *PeerFlowE2ETestSuiteSF) TearDownSuite(t *testing.T) { err := e2e.TearDownPostgres(s.pool, snowflakeSuffix) if err != nil { - s.Fail("failed to drop Postgres schema", err) + t.Error("failed to drop Postgres schema", err) } if s.sfHelper != nil { err = s.sfHelper.Cleanup() if err != nil { - s.Fail("failed to clean up Snowflake", err) + t.Error("failed to clean up Snowflake", err) } } err = s.connector.Close() - s.NoError(err) + if err != nil { + t.Error(err) + } } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -111,7 +138,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -120,7 +147,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -138,7 +165,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -146,24 +173,24 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") count, err := s.sfHelper.CountRows("test_simple_flow_sf") - s.NoError(err) - s.Equal(10, count) + assert.NoError(t, err) + assert.Equal(t, 10, count) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -177,7 +204,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow_avro"), @@ -188,7 +215,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -206,7 +233,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -214,24 +241,24 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") count, err := s.sfHelper.CountRows("test_simple_flow_sf_avro_cdc") - s.NoError(err) - s.Equal(10, count) + assert.NoError(t, err) + assert.Equal(t, 10, count) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -245,7 +272,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { poly GEOGRAPHY(POLYGON) NOT NULL ); `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_invalid_geo_sf_avro_cdc"), @@ -256,7 +283,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -277,7 +304,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { "5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+ "c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf", ) - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 4 invalid geography rows into the source table") for i := 4; i < 10; i++ { @@ -287,7 +314,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { "010300000001000000050000000000000000000000000000000000000000000000"+ "00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+ "00f03f000000000000000000000000000000000000000000000000") - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 6 valid geography rows and 10 total rows into source") }() @@ -295,30 +322,30 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") // We inserted 4 invalid shapes in each. // They should have filtered out as null on destination lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line") - s.NoError(err) - s.Equal(6, lineCount) + assert.NoError(t, err) + assert.Equal(t, 6, lineCount) polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly") - s.NoError(err) - s.Equal(6, polyCount) + assert.NoError(t, err) + assert.Equal(t, 6, polyCount) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -336,7 +363,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_1"), @@ -346,7 +373,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -371,25 +398,25 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_1", `id,t1,t2,k`, false) - env.AssertExpectations(s.T()) + s.compareTableContentsSF(t, "test_toast_sf_1", `id,t1,t2,k`, false) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -407,7 +434,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_2"), @@ -417,7 +444,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -435,25 +462,25 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_2", `id,t1,t2,k`, false) - env.AssertExpectations(s.T()) + s.compareTableContentsSF(t, "test_toast_sf_2", `id,t1,t2,k`, false) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -471,7 +498,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_3"), @@ -481,7 +508,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -512,25 +539,25 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_3", `id,t1,t2,k`, false) - env.AssertExpectations(s.T()) + s.compareTableContentsSF(t, "test_toast_sf_3", `id,t1,t2,k`, false) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -547,7 +574,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_4"), @@ -557,7 +584,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -582,25 +609,25 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_4", `id,t1,k`, false) - env.AssertExpectations(s.T()) + s.compareTableContentsSF(t, "test_toast_sf_4", `id,t1,k`, false) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -618,7 +645,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_5"), @@ -628,7 +655,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -652,25 +679,25 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_5", `id,t1,t2,k`, false) - env.AssertExpectations(s.T()) + s.compareTableContentsSF(t, "test_toast_sf_5", `id,t1,t2,k`, false) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -694,7 +721,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { VOLATILE SET search_path = 'pg_catalog'; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), @@ -704,7 +731,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -730,19 +757,19 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed an insert with all types") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", @@ -752,12 +779,12 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { fmt.Println("error %w", err) } // Make sure that there are no nulls - s.Equal(noNulls, true) + assert.Equal(t, noNulls, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -781,7 +808,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { VOLATILE SET search_path = 'pg_catalog'; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), @@ -792,7 +819,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -818,19 +845,19 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed an insert with all types") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") noNulls, err := s.sfHelper.CheckNull("test_types_sf_avro_cdc", []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", @@ -840,12 +867,12 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { fmt.Println("error %w", err) } // Make sure that there are no nulls - s.Equal(noNulls, true) + assert.Equal(t, noNulls, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -858,7 +885,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { CREATE TABLE %s (id serial primary key, c1 int, c2 text); CREATE TABLE %s (id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table"), @@ -868,7 +895,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -884,28 +911,28 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed an insert with all types") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() count1, err := s.sfHelper.CountRows("test1_sf") - s.NoError(err) + assert.NoError(t, err) count2, err := s.sfHelper.CountRows("test2_sf") - s.NoError(err) + assert.NoError(t, err) - s.Equal(1, count1) - s.Equal(1, count2) + assert.Equal(t, 1, count1) + assert.Equal(t, 1, count2) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -918,7 +945,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -928,7 +955,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, @@ -942,7 +969,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -958,18 +985,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) + assert.NoError(t, err) + assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + s.compareTableContentsSF(t, "test_simple_schema_changes", "id,c1", false) // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -986,18 +1013,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2", false) + assert.NoError(t, err) + assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + s.compareTableContentsSF(t, "test_simple_schema_changes", "id,c1,c2", false) // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1015,18 +1042,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3", false) + assert.NoError(t, err) + assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + s.compareTableContentsSF(t, "test_simple_schema_changes", "id,c1,c3", false) // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1044,25 +1071,25 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) + assert.NoError(t, err) + assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + s.compareTableContentsSF(t, "test_simple_schema_changes", "id,c1", false) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1078,7 +1105,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -1088,7 +1115,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 5, @@ -1105,38 +1132,38 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") // verify we got our 10 rows e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t", false) + s.compareTableContentsSF(t, "test_simple_cpkey", "id,c1,c2,t", false) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + assert.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + assert.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t", false) + s.compareTableContentsSF(t, "test_simple_cpkey", "id,c1,c2,t", false) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1156,7 +1183,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -1166,7 +1193,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1178,7 +1205,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + assert.NoError(t, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1186,37 +1213,37 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + assert.NoError(t, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + assert.NoError(t, err) err = rowsTx.Commit(context.Background()) - s.NoError(err) + assert.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_cpkey_toast1", "id,c1,c2,t,t2", false) + s.compareTableContentsSF(t, "test_cpkey_toast1", "id,c1,c2,t,t2", false) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1236,7 +1263,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -1246,7 +1273,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -1264,30 +1291,30 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + assert.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + assert.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_cpkey_toast2", "id,c1,c2,t,t2", false) + s.compareTableContentsSF(t, "test_cpkey_toast2", "id,c1,c2,t,t2", false) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index cdcfaeca98..42336c0c34 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -3,41 +3,43 @@ package e2e_snowflake import ( "context" "fmt" + "testing" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/google/uuid" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" ) -func (s *PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, rowCount int) { +func (s *PeerFlowE2ETestSuiteSF) setupSourceTable(t *testing.T, tableName string, rowCount int) { err := e2e.CreateSourceTableQRep(s.pool, snowflakeSuffix, tableName) - s.NoError(err) + assert.NoError(t, err) err = e2e.PopulateSourceTable(s.pool, snowflakeSuffix, tableName, rowCount) - s.NoError(err) + assert.NoError(t, err) } -func (s *PeerFlowE2ETestSuiteSF) setupSFDestinationTable(dstTable string) { +func (s *PeerFlowE2ETestSuiteSF) setupSFDestinationTable(t *testing.T, dstTable string) { schema := e2e.GetOwnersSchema() err := s.sfHelper.CreateTable(dstTable, schema) // fail if table creation fails if err != nil { - s.FailNow("unable to create table on snowflake", err) + assert.FailNow(t, "unable to create table on snowflake", err) } fmt.Printf("created table on snowflake: %s.%s. %v\n", s.sfHelper.testSchemaName, dstTable, err) } -func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selector string, caseSensitive bool) { +func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(t *testing.T, tableName string, selector string, caseSensitive bool) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") pgQueryExecutor.SetTestEnv(true) pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", selector, snowflakeSuffix, tableName), ) - require.NoError(s.T(), err) + require.NoError(t, err) // read rows from destination table qualifiedTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) @@ -50,20 +52,20 @@ func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, select fmt.Printf("running query on snowflake: %s\n", sfSelQuery) sfRows, err := s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) - require.NoError(s.T(), err) + require.NoError(t, err) - s.True(pgRows.Equals(sfRows), "rows from source and destination tables are not equal") + assert.True(t, pgRows.Equals(sfRows), "rows from source and destination tables are not equal") } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 tblName := "test_qrep_flow_avro_sf" - s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) + s.setupSourceTable(t, tblName, numRows) + s.setupSFDestinationTable(t, tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -79,32 +81,32 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { s.sfHelper.Peer, "", ) - s.NoError(err) + assert.NoError(t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) // assert that error contains "invalid connection configs" err = env.GetWorkflowError() - s.NoError(err) + assert.NoError(t, err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(t, tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 tblName := "test_qrep_flow_avro_sf_ups" - s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) + s.setupSourceTable(t, tblName, numRows) + s.setupSFDestinationTable(t, tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -124,32 +126,32 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, UpsertKeyColumns: []string{"id"}, } - s.NoError(err) + assert.NoError(t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) // assert that error contains "invalid connection configs" err = env.GetWorkflowError() - s.NoError(err) + assert.NoError(t, err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(t, tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 tblName := "test_qrep_flow_avro_sf_s3" - s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) + s.setupSourceTable(t, tblName, numRows) + s.setupSFDestinationTable(t, tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -165,31 +167,31 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { s.sfHelper.Peer, "", ) - s.NoError(err) + assert.NoError(t, err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + assert.NoError(t, err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(t, tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 tblName := "test_qrep_flow_avro_sf_ups_xmin" - s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) + s.setupSourceTable(t, tblName, numRows) + s.setupSFDestinationTable(t, tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -210,31 +212,31 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { UpsertKeyColumns: []string{"id"}, } qrepConfig.WatermarkColumn = "xmin" - s.NoError(err) + assert.NoError(t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + assert.NoError(t, err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(t, tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 tblName := "test_qrep_flow_avro_sf_s3_int" - s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) + s.setupSourceTable(t, tblName, numRows) + s.setupSFDestinationTable(t, tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -253,19 +255,19 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( sfPeer, "", ) - s.NoError(err) + assert.NoError(t, err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + assert.NoError(t, err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(t, tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) }