diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 9f92496eb3..b9ffbcf4ca 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -3,6 +3,7 @@ package e2e_snowflake import ( "context" "fmt" + "reflect" "strings" "testing" @@ -14,14 +15,13 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" log "github.com/sirupsen/logrus" - "github.com/stretchr/testify/suite" + "github.com/stretchr/testify/assert" "go.temporal.io/sdk/testsuite" ) const snowflakeSuffix = "snowflake" type PeerFlowE2ETestSuiteSF struct { - suite.Suite testsuite.WorkflowTestSuite pool *pgxpool.Pool @@ -30,7 +30,23 @@ type PeerFlowE2ETestSuiteSF struct { } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - suite.Run(t, new(PeerFlowE2ETestSuiteSF)) + e2e := new(PeerFlowE2ETestSuiteSF) + if e2e.SetupSuite(t) != nil { + return + } + t.Cleanup(func() { e2e.TearDownSuite(t) }) + e2etype := reflect.TypeOf(e2e) + + methodcount := e2etype.NumMethod() + for methodid := 0; methodid < methodcount; methodid += 1 { + method := e2etype.Method(methodid) + if strings.HasPrefix(method.Name, "Test_") { + t.Run(method.Name, func(t *testing.T) { + t.Parallel() + method.Func.Call([]reflect.Value{reflect.ValueOf(e2e), reflect.ValueOf(t)}) + }) + } + } } func (s *PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string { @@ -53,7 +69,7 @@ func (s *PeerFlowE2ETestSuiteSF) setupSnowflake() error { return nil } -func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { +func (s *PeerFlowE2ETestSuiteSF) SetupSuite(t *testing.T) error { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -65,39 +81,48 @@ func (s *PeerFlowE2ETestSuiteSF) SetupSuite() { pool, err := e2e.SetupPostgres(snowflakeSuffix) if err != nil { - s.Fail("failed to setup postgres", err) + t.Errorf("failed to setup postgres: %v", err) + return err } s.pool = pool err = s.setupSnowflake() if err != nil { - s.Fail("failed to setup 
snowflake", err) + t.Errorf("failed to setup snowflake: %v", err) + return err } s.connector, err = connsnowflake.NewSnowflakeConnector(context.Background(), s.sfHelper.Config) - s.NoError(err) + if err != nil { + t.Error(err) + return err + } + + return nil } // Implement TearDownAllSuite interface to tear down the test suite -func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() { +func (s *PeerFlowE2ETestSuiteSF) TearDownSuite(t *testing.T) { err := e2e.TearDownPostgres(s.pool, snowflakeSuffix) if err != nil { - s.Fail("failed to drop Postgres schema", err) + t.Errorf("failed to drop Postgres schema: %v", err) } if s.sfHelper != nil { err = s.sfHelper.Cleanup() if err != nil { - s.Fail("failed to clean up Snowflake", err) + t.Errorf("failed to clean up Snowflake: %v", err) } } err = s.connector.Close() - s.NoError(err) + if err != nil { + t.Error(err) + } } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -111,7 +136,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -120,7 +145,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -138,7 +163,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") }() 
@@ -146,24 +171,24 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") count, err := s.sfHelper.CountRows("test_simple_flow_sf") - s.NoError(err) - s.Equal(10, count) + assert.NoError(t, err) + assert.Equal(t, 10, count) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -177,7 +202,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { value TEXT NOT NULL ); `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow_avro"), @@ -188,7 +213,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -206,7 +231,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") }() @@ -214,24 +239,24 @@ func (s *PeerFlowE2ETestSuiteSF) 
Test_Complete_Simple_Flow_SF_Avro_CDC() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") count, err := s.sfHelper.CountRows("test_simple_flow_sf_avro_cdc") - s.NoError(err) - s.Equal(10, count) + assert.NoError(t, err) + assert.Equal(t, 10, count) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -245,7 +270,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { poly GEOGRAPHY(POLYGON) NOT NULL ); `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_invalid_geo_sf_avro_cdc"), @@ -256,7 +281,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -277,7 +302,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { "5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+ "c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf", ) - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 4 invalid geography rows into the source table") for i := 4; i < 10; i++ { @@ 
-287,7 +312,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { "010300000001000000050000000000000000000000000000000000000000000000"+ "00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+ "00f03f000000000000000000000000000000000000000000000000") - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 6 valid geography rows and 10 total rows into source") }() @@ -295,30 +320,30 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") // We inserted 4 invalid shapes in each. // They should have filtered out as null on destination lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line") - s.NoError(err) - s.Equal(6, lineCount) + assert.NoError(t, err) + assert.Equal(t, 6, lineCount) polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly") - s.NoError(err) - s.Equal(6, polyCount) + assert.NoError(t, err) + assert.Equal(t, 6, polyCount) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -336,7 +361,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ 
FlowJobName: s.attachSuffix("test_toast_sf_1"), @@ -346,7 +371,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -371,25 +396,25 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() { UPDATE %s SET t1='dummy' WHERE id=2; END; `, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") s.compareTableContentsSF("test_toast_sf_1", `id,t1,t2,k`, false) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -407,7 +432,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_2"), @@ -417,7 +442,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -435,25 +460,25 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { UPDATE %s SET t1='dummy' WHERE id=2; 
END; `, srcTableName, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") s.compareTableContentsSF("test_toast_sf_2", `id,t1,t2,k`, false) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -471,7 +496,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_3"), @@ -481,7 +506,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -512,25 +537,25 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - 
s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") s.compareTableContentsSF("test_toast_sf_3", `id,t1,t2,k`, false) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -547,7 +572,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_4"), @@ -557,7 +582,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -582,25 +607,25 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { UPDATE %s SET k=4 WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") s.compareTableContentsSF("test_toast_sf_4", `id,t1,k`, false) - 
env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -618,7 +643,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_5"), @@ -628,7 +653,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -652,25 +677,25 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { UPDATE %s SET t2='dummy' WHERE id=1; END; `, srcTableName, srcTableName, srcTableName, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed a transaction touching toast columns") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") s.compareTableContentsSF("test_toast_sf_5", `id,t1,t2,k`, false) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -694,7 +719,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { VOLATILE SET search_path = 'pg_catalog'; `, 
srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), @@ -704,7 +729,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -730,19 +755,19 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed an insert with all types") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", @@ -752,12 +777,12 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { fmt.Println("error %w", err) } // Make sure that there are no nulls - s.Equal(noNulls, true) + assert.Equal(t, noNulls, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -781,7 +806,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { VOLATILE SET search_path = 'pg_catalog'; `, srcTableName)) - s.NoError(err) + 
assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), @@ -792,7 +817,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -818,19 +843,19 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed an insert with all types") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") noNulls, err := s.sfHelper.CheckNull("test_types_sf_avro_cdc", []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", @@ -840,12 +865,12 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { fmt.Println("error %w", err) } // Make sure that there are no nulls - s.Equal(noNulls, true) + assert.Equal(t, noNulls, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -858,7 +883,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { CREATE TABLE %s (id serial primary key, c1 int, c2 text); CREATE TABLE %s (id serial primary key, c1 
int, c2 text); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table"), @@ -868,7 +893,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -884,28 +909,28 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed an insert with all types") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() count1, err := s.sfHelper.CountRows("test1_sf") - s.NoError(err) + assert.NoError(t, err) count2, err := s.sfHelper.CountRows("test2_sf") - s.NoError(err) + assert.NoError(t, err) - s.Equal(1, count1) - s.Equal(1, count2) + assert.Equal(t, 1, count1) + assert.Equal(t, 1, count2) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -918,7 +943,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -928,7 +953,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + 
assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, @@ -942,7 +967,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -958,18 +983,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + assert.NoError(t, err) + assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. 
@@ -986,18 +1011,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + assert.NoError(t, err) + assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2", false) // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1015,18 +1040,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + assert.NoError(t, err) + assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3", false) // alter source table, drop column c3 and insert another row. 
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. @@ -1044,25 +1069,25 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + assert.NoError(t, err) + assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1078,7 +1103,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { PRIMARY KEY(id,t) ); `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -1088,7 +1113,7 @@ func (s *PeerFlowE2ETestSuiteSF) 
Test_Composite_PKey_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 5, @@ -1105,7 +1130,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") @@ -1115,28 +1140,28 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + assert.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + assert.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") // verify our updates and delete happened s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t", false) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1156,7 +1181,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ 
FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -1166,7 +1191,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 2, @@ -1178,7 +1203,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - s.NoError(err) + assert.NoError(t, err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -1186,37 +1211,37 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + assert.NoError(t, err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + assert.NoError(t, err) err = rowsTx.Commit(context.Background()) - s.NoError(err) + assert.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") // verify our updates and delete happened s.compareTableContentsSF("test_cpkey_toast1", "id,c1,c2,t,t2", false) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) 
Test_Composite_PKey_Toast_2_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) @@ -1236,7 +1261,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -1246,7 +1271,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -1264,30 +1289,30 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - s.NoError(err) + assert.NoError(t, err) } fmt.Println("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - s.NoError(err) + assert.NoError(t, err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - s.NoError(err) + assert.NoError(t, err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_cpkey_toast2", "id,c1,c2,t,t2", false) + s.compareTableContentsSF(t, "test_cpkey_toast2",
"id,c1,c2,t,t2", false) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index cdcfaeca98..42336c0c34 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -3,41 +3,43 @@ package e2e_snowflake import ( "context" "fmt" + "testing" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/google/uuid" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" ) -func (s *PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, rowCount int) { +func (s *PeerFlowE2ETestSuiteSF) setupSourceTable(t *testing.T, tableName string, rowCount int) { err := e2e.CreateSourceTableQRep(s.pool, snowflakeSuffix, tableName) - s.NoError(err) + assert.NoError(t, err) err = e2e.PopulateSourceTable(s.pool, snowflakeSuffix, tableName, rowCount) - s.NoError(err) + assert.NoError(t, err) } -func (s *PeerFlowE2ETestSuiteSF) setupSFDestinationTable(dstTable string) { +func (s *PeerFlowE2ETestSuiteSF) setupSFDestinationTable(t *testing.T, dstTable string) { schema := e2e.GetOwnersSchema() err := s.sfHelper.CreateTable(dstTable, schema) // fail if table creation fails if err != nil { - s.FailNow("unable to create table on snowflake", err) + assert.FailNow(t, "unable to create table on snowflake", err) } fmt.Printf("created table on snowflake: %s.%s. 
%v\n", s.sfHelper.testSchemaName, dstTable, err) } -func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selector string, caseSensitive bool) { +func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(t *testing.T, tableName string, selector string, caseSensitive bool) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") pgQueryExecutor.SetTestEnv(true) pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", selector, snowflakeSuffix, tableName), ) - require.NoError(s.T(), err) + require.NoError(t, err) // read rows from destination table qualifiedTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) @@ -50,20 +52,20 @@ func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, select fmt.Printf("running query on snowflake: %s\n", sfSelQuery) sfRows, err := s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) - require.NoError(s.T(), err) + require.NoError(t, err) - s.True(pgRows.Equals(sfRows), "rows from source and destination tables are not equal") + assert.True(t, pgRows.Equals(sfRows), "rows from source and destination tables are not equal") } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 tblName := "test_qrep_flow_avro_sf" - s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) + s.setupSourceTable(t, tblName, numRows) + s.setupSFDestinationTable(t, tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -79,32 +81,32 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { s.sfHelper.Peer, "", ) - s.NoError(err) + assert.NoError(t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes 
without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) // assert that error contains "invalid connection configs" err = env.GetWorkflowError() - s.NoError(err) + assert.NoError(t, err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(t, tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 tblName := "test_qrep_flow_avro_sf_ups" - s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) + s.setupSourceTable(t, tblName, numRows) + s.setupSFDestinationTable(t, tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -124,32 +126,32 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, UpsertKeyColumns: []string{"id"}, } - s.NoError(err) + assert.NoError(t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) // assert that error contains "invalid connection configs" err = env.GetWorkflowError() - s.NoError(err) + assert.NoError(t, err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(t, tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 tblName := "test_qrep_flow_avro_sf_s3" - 
s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) + s.setupSourceTable(t, tblName, numRows) + s.setupSFDestinationTable(t, tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -165,31 +167,31 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { s.sfHelper.Peer, "", ) - s.NoError(err) + assert.NoError(t, err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + assert.NoError(t, err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(t, tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 tblName := "test_qrep_flow_avro_sf_ups_xmin" - s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) + s.setupSourceTable(t, tblName, numRows) + s.setupSFDestinationTable(t, tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -210,31 +212,31 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { UpsertKeyColumns: []string{"id"}, } qrepConfig.WatermarkColumn = "xmin" - s.NoError(err) + assert.NoError(t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + assert.NoError(t, err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + 
s.compareTableContentsSF(t, tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 tblName := "test_qrep_flow_avro_sf_s3_int" - s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) + s.setupSourceTable(t, tblName, numRows) + s.setupSFDestinationTable(t, tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -253,19 +255,19 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( sfPeer, "", ) - s.NoError(err) + assert.NoError(t, err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() - s.NoError(err) + assert.NoError(t, err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(t, tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) }