diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml
index 54887adeac..484b087026 100644
--- a/.github/workflows/flow.yml
+++ b/.github/workflows/flow.yml
@@ -93,7 +93,7 @@ jobs:
       - name: run tests
         run: |
-          gotestsum --format testname -- -p 4 ./... -timeout 2400s
+          gotestsum --format testname -- -p 4 ./e2e/snowflake/... -timeout 2400s
         working-directory: ./flow
         env:
           AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go
index 9f92496eb3..266783af91 100644
--- a/flow/e2e/snowflake/peer_flow_sf_test.go
+++ b/flow/e2e/snowflake/peer_flow_sf_test.go
@@ -14,14 +14,11 @@ import (
     "github.com/jackc/pgx/v5/pgxpool"
     "github.com/joho/godotenv"
     log "github.com/sirupsen/logrus"
-    "github.com/stretchr/testify/suite"
+    "github.com/stretchr/testify/assert"
     "go.temporal.io/sdk/testsuite"
 )
 
-const snowflakeSuffix = "snowflake"
-
 type PeerFlowE2ETestSuiteSF struct {
-    suite.Suite
     testsuite.WorkflowTestSuite
 
     pool *pgxpool.Pool
@@ -30,15 +27,15 @@ type PeerFlowE2ETestSuiteSF struct {
 }
 
 func TestPeerFlowE2ETestSuiteSF(t *testing.T) {
-    suite.Run(t, new(PeerFlowE2ETestSuiteSF))
+    e2e.RunSuite[*PeerFlowE2ETestSuiteSF](t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) attachSchemaSuffix(tableName string) string {
-    return fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tableName)
+func (s *PeerFlowE2ETestSuiteSF) attachSchemaSuffix(t *testing.T, tableName string) string {
+    return fmt.Sprintf("e2e_test_%s.%s", e2e.TestIdent(t), tableName)
 }
 
 func (s *PeerFlowE2ETestSuiteSF) attachSuffix(input string) string {
-    return fmt.Sprintf("%s_%s", input, snowflakeSuffix)
+    return fmt.Sprintf("%s_%s", input, "snowflake")
 }
 
 // setupSnowflake sets up the snowflake connection.
@@ -53,7 +50,7 @@ func (s *PeerFlowE2ETestSuiteSF) setupSnowflake() error {
     return nil
 }
 
-func (s *PeerFlowE2ETestSuiteSF) SetupSuite() {
+func (s *PeerFlowE2ETestSuiteSF) SetupSuite(t *testing.T) error {
     err := godotenv.Load()
     if err != nil {
         // it's okay if the .env file is not present
@@ -63,45 +60,56 @@ func (s *PeerFlowE2ETestSuiteSF) SetupSuite() {
 
     log.SetReportCaller(true)
 
-    pool, err := e2e.SetupPostgres(snowflakeSuffix)
+    suffix := e2e.TestIdent(t)
+    pool, err := e2e.SetupPostgres(suffix)
     if err != nil {
-        s.Fail("failed to setup postgres", err)
+        t.Error("failed to setup postgres", err)
+        return err
     }
     s.pool = pool
 
     err = s.setupSnowflake()
     if err != nil {
-        s.Fail("failed to setup snowflake", err)
+        t.Error("failed to setup snowflake", err)
+        return err
     }
 
     s.connector, err = connsnowflake.NewSnowflakeConnector(context.Background(), s.sfHelper.Config)
-    s.NoError(err)
+    if err != nil {
+        t.Error(err)
+        return err
+    }
+
+    return nil
 }
 
 // Implement TearDownAllSuite interface to tear down the test suite
-func (s *PeerFlowE2ETestSuiteSF) TearDownSuite() {
-    err := e2e.TearDownPostgres(s.pool, snowflakeSuffix)
+func (s *PeerFlowE2ETestSuiteSF) TearDownSuite(t *testing.T) {
+    suffix := e2e.TestIdent(t)
+    err := e2e.TearDownPostgres(s.pool, suffix)
     if err != nil {
-        s.Fail("failed to drop Postgres schema", err)
+        t.Error("failed to drop Postgres schema", err)
     }
 
     if s.sfHelper != nil {
         err = s.sfHelper.Cleanup()
         if err != nil {
-            s.Fail("failed to clean up Snowflake", err)
+            t.Error("failed to clean up Snowflake", err)
         }
     }
 
     err = s.connector.Close()
-    s.NoError(err)
+    if err != nil {
+        t.Error(err)
+    }
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
-    srcTableName := s.attachSchemaSuffix("test_simple_flow_sf")
+    srcTableName := s.attachSchemaSuffix(t, "test_simple_flow_sf")
     dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_flow_sf")
 
     _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
@@ -111,7 +119,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
             value TEXT NOT NULL
         );
     `, srcTableName))
-    s.NoError(err)
+    assert.NoError(t, err)
 
     connectionGen := e2e.FlowConnectionGenerationConfig{
         FlowJobName:      s.attachSuffix("test_simple_flow"),
         TableNameMapping: map[string]string{srcTableName: dstTableName},
@@ -120,7 +128,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
     }
 
     flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     limits := peerflow.CDCFlowLimits{
         TotalSyncFlows: 2,
@@ -138,7 +146,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
             _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
             INSERT INTO %s (key, value) VALUES ($1, $2)
         `, srcTableName), testKey, testValue)
-            s.NoError(err)
+            assert.NoError(t, err)
         }
         fmt.Println("Inserted 10 rows into the source table")
     }()
@@ -146,28 +154,28 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
     env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
     err = env.GetWorkflowError()
 
     // allow only continue as new error
-    s.Error(err)
-    s.Contains(err.Error(), "continue as new")
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "continue as new")
 
     count, err := s.sfHelper.CountRows("test_simple_flow_sf")
-    s.NoError(err)
-    s.Equal(10, count)
+    assert.NoError(t, err)
+    assert.Equal(t, 10, count)
 
     // TODO: verify that the data is correctly synced to the destination table
     // on the bigquery side
 
-    env.AssertExpectations(s.T())
+    env.AssertExpectations(t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
-    srcTableName := s.attachSchemaSuffix("test_simple_flow_sf_avro_cdc")
+    srcTableName := s.attachSchemaSuffix(t, "test_simple_flow_sf_avro_cdc")
     dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_flow_sf_avro_cdc")
 
     _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
@@ -177,7 +185,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() {
             value TEXT NOT NULL
         );
     `, srcTableName))
-    s.NoError(err)
+    assert.NoError(t, err)
 
     connectionGen := e2e.FlowConnectionGenerationConfig{
         FlowJobName:      s.attachSuffix("test_simple_flow_avro"),
@@ -188,7 +196,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() {
     }
 
     flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     limits := peerflow.CDCFlowLimits{
         TotalSyncFlows: 2,
@@ -206,7 +214,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() {
             _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
             INSERT INTO %s (key, value) VALUES ($1, $2)
         `, srcTableName), testKey, testValue)
-            s.NoError(err)
+            assert.NoError(t, err)
         }
         fmt.Println("Inserted 10 rows into the source table")
     }()
@@ -214,28 +222,28 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF_Avro_CDC() {
     env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
     err = env.GetWorkflowError()
 
     // allow only continue as new error
-    s.Error(err)
-    s.Contains(err.Error(), "continue as new")
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "continue as new")
 
     count, err := s.sfHelper.CountRows("test_simple_flow_sf_avro_cdc")
-    s.NoError(err)
-    s.Equal(10, count)
+    assert.NoError(t, err)
+    assert.Equal(t, 10, count)
 
     // TODO: verify that the data is correctly synced to the destination table
     // on the bigquery side
 
-    env.AssertExpectations(s.T())
+    env.AssertExpectations(t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
-    srcTableName := s.attachSchemaSuffix("test_invalid_geo_sf_avro_cdc")
+    srcTableName := s.attachSchemaSuffix(t, "test_invalid_geo_sf_avro_cdc")
     dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_invalid_geo_sf_avro_cdc")
 
     _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
@@ -245,7 +253,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
             poly GEOGRAPHY(POLYGON) NOT NULL
         );
     `, srcTableName))
-    s.NoError(err)
+    assert.NoError(t, err)
 
     connectionGen := e2e.FlowConnectionGenerationConfig{
         FlowJobName:      s.attachSuffix("test_invalid_geo_sf_avro_cdc"),
@@ -256,7 +264,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
     }
 
     flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     limits := peerflow.CDCFlowLimits{
         TotalSyncFlows: 2,
@@ -277,7 +285,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
                 "5fc64140f2567052abc2c9bf2df9c5925fc641409394e16573c2c9bf2df9c5925fc6414049eceda9afc1c9bfdd1cc1a05fc64140fe43faedebc0"+
                 "c9bf4694f6065fc64140fe43faedebc0c9bfffe7305f5ec641406693d6f2ddc0c9bf1a8361d35dc64140afdb8d2b1bc3c9bf",
             )
-            s.NoError(err)
+            assert.NoError(t, err)
         }
         fmt.Println("Inserted 4 invalid geography rows into the source table")
         for i := 4; i < 10; i++ {
@@ -287,7 +295,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
                 "010300000001000000050000000000000000000000000000000000000000000000"+
                 "00000000000000000000f03f000000000000f03f000000000000f03f0000000000"+
                 "00f03f000000000000000000000000000000000000000000000000")
-            s.NoError(err)
+            assert.NoError(t, err)
         }
         fmt.Println("Inserted 6 valid geography rows and 10 total rows into source")
     }()
@@ -295,34 +303,34 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
     env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
     err = env.GetWorkflowError()
 
     // allow only continue as new error
-    s.Error(err)
-    s.Contains(err.Error(), "continue as new")
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "continue as new")
 
     // We inserted 4 invalid shapes in each.
     // They should have filtered out as null on destination
     lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line")
-    s.NoError(err)
-    s.Equal(6, lineCount)
+    assert.NoError(t, err)
+    assert.Equal(t, 6, lineCount)
 
     polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly")
-    s.NoError(err)
-    s.Equal(6, polyCount)
+    assert.NoError(t, err)
+    assert.Equal(t, 6, polyCount)
 
     // TODO: verify that the data is correctly synced to the destination table
     // on the bigquery side
 
-    env.AssertExpectations(s.T())
+    env.AssertExpectations(t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
-    srcTableName := s.attachSchemaSuffix("test_toast_sf_1")
+    srcTableName := s.attachSchemaSuffix(t, "test_toast_sf_1")
     dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_1")
 
     _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
@@ -336,7 +344,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() {
             round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
         $$ language sql;
     `, srcTableName))
-    s.NoError(err)
+    assert.NoError(t, err)
 
     connectionGen := e2e.FlowConnectionGenerationConfig{
         FlowJobName:      s.attachSuffix("test_toast_sf_1"),
@@ -346,7 +354,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() {
     }
 
     flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     limits := peerflow.CDCFlowLimits{
         TotalSyncFlows: 1,
@@ -361,7 +369,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() {
         Executing a transaction which
         1. changes both toast column
         2. changes no toast column
-        2. changes 1 toast column
+        3. changes 1 toast column
         */
         _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
             BEGIN;
@@ -371,29 +379,29 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_SF() {
             UPDATE %s SET t1='dummy' WHERE id=2;
             END;
         `, srcTableName, srcTableName, srcTableName))
-        s.NoError(err)
+        assert.NoError(t, err)
         fmt.Println("Executed a transaction touching toast columns")
     }()
 
     env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
     err = env.GetWorkflowError()
 
     // allow only continue as new error
-    s.Error(err)
-    s.Contains(err.Error(), "continue as new")
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "continue as new")
 
-    s.compareTableContentsSF("test_toast_sf_1", `id,t1,t2,k`, false)
-    env.AssertExpectations(s.T())
+    s.compareTableContentsSF(t, "test_toast_sf_1", `id,t1,t2,k`, false)
+    env.AssertExpectations(t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
-    srcTableName := s.attachSchemaSuffix("test_toast_sf_2")
+    srcTableName := s.attachSchemaSuffix(t, "test_toast_sf_2")
     dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_2")
 
     _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
@@ -407,7 +415,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() {
             round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
         $$ language sql;
     `, srcTableName))
-    s.NoError(err)
+    assert.NoError(t, err)
 
     connectionGen := e2e.FlowConnectionGenerationConfig{
         FlowJobName:      s.attachSuffix("test_toast_sf_2"),
@@ -417,7 +425,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() {
     }
 
     flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     limits := peerflow.CDCFlowLimits{
         TotalSyncFlows: 1,
@@ -435,29 +443,29 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() {
             UPDATE %s SET t1='dummy' WHERE id=2;
             END;
         `, srcTableName, srcTableName))
-        s.NoError(err)
+        assert.NoError(t, err)
         fmt.Println("Executed a transaction touching toast columns")
     }()
 
     env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
     err = env.GetWorkflowError()
 
     // allow only continue as new error
-    s.Error(err)
-    s.Contains(err.Error(), "continue as new")
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "continue as new")
 
-    s.compareTableContentsSF("test_toast_sf_2", `id,t1,t2,k`, false)
-    env.AssertExpectations(s.T())
+    s.compareTableContentsSF(t, "test_toast_sf_2", `id,t1,t2,k`, false)
+    env.AssertExpectations(t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
-    srcTableName := s.attachSchemaSuffix("test_toast_sf_3")
+    srcTableName := s.attachSchemaSuffix(t, "test_toast_sf_3")
     dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_3")
 
     _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
@@ -471,7 +479,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() {
             round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
         $$ language sql;
     `, srcTableName))
-    s.NoError(err)
+    assert.NoError(t, err)
 
     connectionGen := e2e.FlowConnectionGenerationConfig{
         FlowJobName:      s.attachSuffix("test_toast_sf_3"),
@@ -481,7 +489,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() {
     }
 
     flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     limits := peerflow.CDCFlowLimits{
         TotalSyncFlows: 2,
@@ -512,29 +520,29 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() {
             END;
         `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName,
             srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName))
-        s.NoError(err)
+        assert.NoError(t, err)
         fmt.Println("Executed a transaction touching toast columns")
     }()
 
     env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
     err = env.GetWorkflowError()
 
     // allow only continue as new error
-    s.Error(err)
-    s.Contains(err.Error(), "continue as new")
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "continue as new")
 
-    s.compareTableContentsSF("test_toast_sf_3", `id,t1,t2,k`, false)
-    env.AssertExpectations(s.T())
+    s.compareTableContentsSF(t, "test_toast_sf_3", `id,t1,t2,k`, false)
+    env.AssertExpectations(t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
-    srcTableName := s.attachSchemaSuffix("test_toast_sf_4")
+    srcTableName := s.attachSchemaSuffix(t, "test_toast_sf_4")
     dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_4")
 
     _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
@@ -547,7 +555,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() {
             round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
         $$ language sql;
     `, srcTableName))
-    s.NoError(err)
+    assert.NoError(t, err)
 
     connectionGen := e2e.FlowConnectionGenerationConfig{
         FlowJobName:      s.attachSuffix("test_toast_sf_4"),
@@ -557,7 +565,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() {
     }
 
     flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     limits := peerflow.CDCFlowLimits{
         TotalSyncFlows: 1,
@@ -582,29 +590,29 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() {
             UPDATE %s SET k=4 WHERE id=1;
             END;
         `, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName))
-        s.NoError(err)
+        assert.NoError(t, err)
         fmt.Println("Executed a transaction touching toast columns")
     }()
 
     env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
     err = env.GetWorkflowError()
 
     // allow only continue as new error
-    s.Error(err)
-    s.Contains(err.Error(), "continue as new")
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "continue as new")
 
-    s.compareTableContentsSF("test_toast_sf_4", `id,t1,k`, false)
-    env.AssertExpectations(s.T())
+    s.compareTableContentsSF(t, "test_toast_sf_4", `id,t1,k`, false)
+    env.AssertExpectations(t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
-    srcTableName := s.attachSchemaSuffix("test_toast_sf_5")
+    srcTableName := s.attachSchemaSuffix(t, "test_toast_sf_5")
     dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_5")
 
     _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
@@ -618,7 +626,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() {
             round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
         $$ language sql;
     `, srcTableName))
-    s.NoError(err)
+    assert.NoError(t, err)
 
     connectionGen := e2e.FlowConnectionGenerationConfig{
         FlowJobName:      s.attachSuffix("test_toast_sf_5"),
@@ -628,7 +636,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() {
     }
 
     flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     limits := peerflow.CDCFlowLimits{
         TotalSyncFlows: 1,
@@ -652,29 +660,29 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() {
             UPDATE %s SET t2='dummy' WHERE id=1;
             END;
         `, srcTableName, srcTableName, srcTableName, srcTableName))
-        s.NoError(err)
+        assert.NoError(t, err)
         fmt.Println("Executed a transaction touching toast columns")
     }()
 
     env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
     err = env.GetWorkflowError()
 
     // allow only continue as new error
-    s.Error(err)
-    s.Contains(err.Error(), "continue as new")
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "continue as new")
 
-    s.compareTableContentsSF("test_toast_sf_5", `id,t1,t2,k`, false)
-    env.AssertExpectations(s.T())
+    s.compareTableContentsSF(t, "test_toast_sf_5", `id,t1,t2,k`, false)
+    env.AssertExpectations(t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
-    srcTableName := s.attachSchemaSuffix("test_types_sf")
+    srcTableName := s.attachSchemaSuffix(t, "test_types_sf")
     dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf")
 
     _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
@@ -694,7 +702,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() {
     VOLATILE SET search_path = 'pg_catalog';
     `, srcTableName))
-    s.NoError(err)
+    assert.NoError(t, err)
 
     connectionGen := e2e.FlowConnectionGenerationConfig{
         FlowJobName:      s.attachSuffix("test_types_sf"),
@@ -704,7 +712,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() {
     }
 
     flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     limits := peerflow.CDCFlowLimits{
         TotalSyncFlows: 1,
@@ -730,19 +738,19 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() {
         'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)',
         'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))';
         `, srcTableName))
-        s.NoError(err)
+        assert.NoError(t, err)
         fmt.Println("Executed an insert with all types")
     }()
 
     env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
     err = env.GetWorkflowError()
 
     // allow only continue as new error
-    s.Error(err)
-    s.Contains(err.Error(), "continue as new")
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "continue as new")
 
     noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{"c41", "c1", "c2", "c3",
"c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", @@ -752,16 +760,16 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF() { fmt.Println("error %w", err) } // Make sure that there are no nulls - s.Equal(noNulls, true) + assert.Equal(t, noNulls, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { +func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) - srcTableName := s.attachSchemaSuffix("test_types_sf_avro_cdc") + srcTableName := s.attachSchemaSuffix(t, "test_types_sf_avro_cdc") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf_avro_cdc") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -781,7 +789,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { VOLATILE SET search_path = 'pg_catalog'; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), @@ -792,7 +800,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -818,19 +826,19 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { 'LINESTRING(-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831)','LINESTRING(0 0, 1 1, 2 2)', 'POLYGON((-74.0060 40.7128, -73.9352 40.7306, -73.9123 40.7831, -74.0060 40.7128))'; `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed an insert with all types") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - s.Error(err) - s.Contains(err.Error(), "continue as new") + assert.Error(t, err) + assert.Contains(t, err.Error(), "continue as new") noNulls, err := s.sfHelper.CheckNull("test_types_sf_avro_cdc", []string{"c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", @@ -840,17 +848,17 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Types_SF_Avro_CDC() { fmt.Println("error %w", err) } // Make sure that there are no nulls - s.Equal(noNulls, true) + assert.Equal(t, noNulls, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) - srcTable1Name := s.attachSchemaSuffix("test1_sf") - srcTable2Name := s.attachSchemaSuffix("test2_sf") + srcTable1Name := s.attachSchemaSuffix(t, "test1_sf") + srcTable2Name := s.attachSchemaSuffix(t, "test2_sf") dstTable1Name := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test1_sf") dstTable2Name := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test2_sf") @@ -858,7 +866,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { CREATE TABLE %s (id serial primary key, c1 int, c2 text); CREATE TABLE %s (id serial primary key, c1 int, c2 text); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: 
s.attachSuffix("test_multi_table"), @@ -868,7 +876,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 1, @@ -884,32 +892,32 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { INSERT INTO %s (c1,c2) VALUES (1,'dummy_1'); INSERT INTO %s (c1,c2) VALUES (-1,'dummy_-1'); `, srcTable1Name, srcTable2Name)) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Executed an insert with all types") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() count1, err := s.sfHelper.CountRows("test1_sf") - s.NoError(err) + assert.NoError(t, err) count2, err := s.sfHelper.CountRows("test2_sf") - s.NoError(err) + assert.NoError(t, err) - s.Equal(1, count1) - s.Equal(1, count2) + assert.Equal(t, 1, count1) + assert.Equal(t, 1, count2) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) - srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") + srcTableName := s.attachSchemaSuffix(t, "test_simple_schema_changes") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_schema_changes") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -918,7 +926,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { c1 BIGINT ); `, srcTableName)) - s.NoError(err) + assert.NoError(t, err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -928,7 +936,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - s.NoError(err) + assert.NoError(t, err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 10, @@ -942,7 +950,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - s.NoError(err) + assert.NoError(t, err) fmt.Println("Inserted initial row in the source table") // verify we got our first row. @@ -958,18 +966,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) + assert.NoError(t, err) + assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + s.compareTableContentsSF(t, "test_simple_schema_changes", "id,c1", false) // alter source table, add column c2 and insert another row. 
         _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
        ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName))
-        s.NoError(err)
+        assert.NoError(t, err)
         fmt.Println("Altered source table, added column c2")
         _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
        INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2)
-        s.NoError(err)
+        assert.NoError(t, err)
         fmt.Println("Inserted row with added c2 in the source table")
 
         // verify we got our two rows, if schema did not match up it will error.
@@ -986,18 +994,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
         output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
             TableIdentifiers: []string{dstTableName},
         })
-        s.NoError(err)
-        s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
-        s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2", false)
+        assert.NoError(t, err)
+        assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+        s.compareTableContentsSF(t, "test_simple_schema_changes", "id,c1,c2", false)
 
         // alter source table, add column c3, drop column c2 and insert another row.
         _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
        ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName))
-        s.NoError(err)
+        assert.NoError(t, err)
         fmt.Println("Altered source table, dropped column c2 and added column c3")
         _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
        INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3)
-        s.NoError(err)
+        assert.NoError(t, err)
         fmt.Println("Inserted row with added c3 in the source table")
 
         // verify we got our two rows, if schema did not match up it will error.
@@ -1015,18 +1023,18 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
         output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
             TableIdentifiers: []string{dstTableName},
         })
-        s.NoError(err)
-        s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
-        s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3", false)
+        assert.NoError(t, err)
+        assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+        s.compareTableContentsSF(t, "test_simple_schema_changes", "id,c1,c3", false)
 
         // alter source table, drop column c3 and insert another row.
         _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
        ALTER TABLE %s DROP COLUMN c3`, srcTableName))
-        s.NoError(err)
+        assert.NoError(t, err)
         fmt.Println("Altered source table, dropped column c3")
         _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
        INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4)
-        s.NoError(err)
+        assert.NoError(t, err)
         fmt.Println("Inserted row after dropping all columns in the source table")
 
         // verify we got our two rows, if schema did not match up it will error.
@@ -1044,29 +1052,29 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
         output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
             TableIdentifiers: []string{dstTableName},
         })
-        s.NoError(err)
-        s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
-        s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false)
+        assert.NoError(t, err)
+        assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+        s.compareTableContentsSF(t, "test_simple_schema_changes", "id,c1", false)
     }()
 
     env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
     err = env.GetWorkflowError()
 
     // allow only continue as new error
-    s.Error(err)
-    s.Contains(err.Error(), "continue as new")
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "continue as new")
 
-    env.AssertExpectations(s.T())
+    env.AssertExpectations(t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
-    srcTableName := s.attachSchemaSuffix("test_simple_cpkey")
+    srcTableName := s.attachSchemaSuffix(t, "test_simple_cpkey")
    dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_cpkey")
 
     _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
@@ -1078,7 +1086,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
             PRIMARY KEY(id,t)
         );
     `, srcTableName))
-    s.NoError(err)
+    assert.NoError(t, err)
 
     connectionGen := e2e.FlowConnectionGenerationConfig{
         FlowJobName:      s.attachSuffix("test_cpkey_flow"),
@@ -1088,7 +1096,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
     }
 
     flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     limits := peerflow.CDCFlowLimits{
         TotalSyncFlows: 5,
@@ -1105,42 +1113,42 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
             _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
             INSERT INTO %s(c2,t) VALUES ($1,$2)
         `, srcTableName), i, testValue)
-            s.NoError(err)
+            assert.NoError(t, err)
         }
         fmt.Println("Inserted 10 rows into the source table")
 
         // verify we got our 10 rows
         e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
-        s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t", false)
+        s.compareTableContentsSF(t, "test_simple_cpkey", "id,c1,c2,t", false)
 
         _, err := s.pool.Exec(context.Background(),
             fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
-        s.NoError(err)
+        assert.NoError(t, err)
         _, err = s.pool.Exec(context.Background(),
             fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
-        s.NoError(err)
+        assert.NoError(t, err)
     }()
 
     env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
     err = env.GetWorkflowError()
 
     // allow only continue as new error
-    s.Error(err)
-    s.Contains(err.Error(), "continue as new")
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "continue as new")
 
     // verify our updates and delete happened
-    s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t", false)
+    s.compareTableContentsSF(t, "test_simple_cpkey", "id,c1,c2,t", false)
 
-    env.AssertExpectations(s.T())
+    env.AssertExpectations(t)
 }
-func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
-    srcTableName := s.attachSchemaSuffix("test_cpkey_toast1")
+    srcTableName := s.attachSchemaSuffix(t, "test_cpkey_toast1")
     dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast1")
 
     _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
@@ -1156,7 +1164,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
             round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
         $$ language sql;
     `, srcTableName))
-    s.NoError(err)
+    assert.NoError(t, err)
 
     connectionGen := e2e.FlowConnectionGenerationConfig{
         FlowJobName:      s.attachSuffix("test_cpkey_toast1_flow"),
@@ -1166,7 +1174,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
     }
 
     flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     limits := peerflow.CDCFlowLimits{
         TotalSyncFlows: 2,
@@ -1178,7 +1186,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
     go func() {
         e2e.SetupCDCFlowStatusQuery(env, connectionGen)
         rowsTx, err := s.pool.Begin(context.Background())
-        s.NoError(err)
+        assert.NoError(t, err)
 
         // insert 10 rows into the source table
         for i := 0; i < 10; i++ {
@@ -1186,41 +1194,41 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
             _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`
             INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
         `, srcTableName), i, testValue)
-            s.NoError(err)
+            assert.NoError(t, err)
         }
         fmt.Println("Inserted 10 rows into the source table")
 
         _, err = rowsTx.Exec(context.Background(),
             fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
-        s.NoError(err)
+        assert.NoError(t, err)
         _, err = rowsTx.Exec(context.Background(),
             fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
-        s.NoError(err)
+        assert.NoError(t, err)
 
         err = rowsTx.Commit(context.Background())
-        s.NoError(err)
+        assert.NoError(t, err)
     }()
 
     env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
     err = env.GetWorkflowError()
 
     // allow only continue as new error
-    s.Error(err)
-    s.Contains(err.Error(), "continue as new")
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "continue as new")
 
     // verify our updates and delete happened
-    s.compareTableContentsSF("test_cpkey_toast1", "id,c1,c2,t,t2", false)
+    s.compareTableContentsSF(t, "test_cpkey_toast1", "id,c1,c2,t,t2", false)
 
-    env.AssertExpectations(s.T())
+    env.AssertExpectations(t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
-    srcTableName := s.attachSchemaSuffix("test_cpkey_toast2")
+    srcTableName := s.attachSchemaSuffix(t, "test_cpkey_toast2")
     dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast2")
 
     _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
@@ -1236,7 +1244,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
             round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
         $$ language sql;
     `, srcTableName))
-    s.NoError(err)
+    assert.NoError(t, err)
 
     connectionGen := e2e.FlowConnectionGenerationConfig{
         FlowJobName:      s.attachSuffix("test_cpkey_toast2_flow"),
@@ -1246,7 +1254,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
     }
 
     flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     limits := peerflow.CDCFlowLimits{
         TotalSyncFlows: 4,
@@ -1264,30 +1272,30 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
             _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
             INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
         `, srcTableName), i, testValue)
-            s.NoError(err)
+            assert.NoError(t, err)
         }
         fmt.Println("Inserted 10 rows into the source table")
 
         e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
         _, err = s.pool.Exec(context.Background(),
             fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
-        s.NoError(err)
+        assert.NoError(t, err)
         _, err = s.pool.Exec(context.Background(),
             fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
-        s.NoError(err)
+        assert.NoError(t, err)
     }()
 
     env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
     err = env.GetWorkflowError()
 
     // allow only continue as new error
-    s.Error(err)
-    s.Contains(err.Error(), "continue as new")
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "continue as new")
 
     // verify our updates and delete happened
-    s.compareTableContentsSF("test_cpkey_toast2", "id,c1,c2,t,t2", false)
+    s.compareTableContentsSF(t, "test_cpkey_toast2", "id,c1,c2,t,t2", false)
 
-    env.AssertExpectations(s.T())
+    env.AssertExpectations(t)
 }
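The suffix plumbing above is what makes this suite safe to parallelize: every test method now reads and writes its own Postgres schema derived from the subtest name, instead of the old shared "snowflake" constant. A minimal standalone sketch of the resulting naming; note that testIdent below re-implements e2e.TestIdent's one-liner locally, purely for illustration:

package main

import (
    "fmt"
    "strings"
)

// testIdent mirrors e2e.TestIdent: subtest names contain '/', which is
// awkward in a schema name, so every '/' becomes '_'.
func testIdent(testName string) string {
    return strings.ReplaceAll(testName, "/", "_")
}

func main() {
    // RunSuite executes each method as t.Run("group") -> t.Run(method.Name),
    // so inside Test_Toast_SF, t.Name() is:
    name := "TestPeerFlowE2ETestSuiteSF/group/Test_Toast_SF"
    // attachSchemaSuffix then prefixes "e2e_test_":
    fmt.Println("e2e_test_" + testIdent(name))
    // Output: e2e_test_TestPeerFlowE2ETestSuiteSF_group_Test_Toast_SF
}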
%v\n", s.sfHelper.testSchemaName, dstTable, err) } -func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selector string, caseSensitive bool) { +func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(t *testing.T, tableName string, selector string, caseSensitive bool) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") pgQueryExecutor.SetTestEnv(true) pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", selector, snowflakeSuffix, tableName), + fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", selector, e2e.TestIdent(t), tableName), ) - require.NoError(s.T(), err) + require.NoError(t, err) // read rows from destination table qualifiedTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) @@ -50,70 +52,72 @@ func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, select fmt.Printf("running query on snowflake: %s\n", sfSelQuery) sfRows, err := s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) - require.NoError(s.T(), err) + require.NoError(t, err) - s.True(pgRows.Equals(sfRows), "rows from source and destination tables are not equal") + t.Logf("compare %v = %v", pgRows, sfRows) + + assert.True(t, pgRows.Equals(sfRows), "rows from source and destination tables are not equal") } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 tblName := "test_qrep_flow_avro_sf" - s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) + s.setupSourceTable(t, tblName, numRows) + s.setupSFDestinationTable(t, tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - snowflakeSuffix, tblName) + e2e.TestIdent(t), tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig( "test_qrep_flow_avro_sf", - fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName), + fmt.Sprintf("e2e_test_%s.%s", e2e.TestIdent(t), tblName), dstSchemaQualified, query, protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, s.sfHelper.Peer, "", ) - s.NoError(err) + assert.NoError(t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + assert.True(t, env.IsWorkflowCompleted()) // assert that error contains "invalid connection configs" err = env.GetWorkflowError() - s.NoError(err) + assert.NoError(t, err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(t, tblName, sel, true) - env.AssertExpectations(s.T()) + env.AssertExpectations(t) } -func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() { +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple(t *testing.T) { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env) numRows := 10 tblName := "test_qrep_flow_avro_sf_ups" - s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) + s.setupSourceTable(t, tblName, numRows) + s.setupSFDestinationTable(t, tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", 
-        snowflakeSuffix, tblName)
+        e2e.TestIdent(t), tblName)
 
     qrepConfig, err := e2e.CreateQRepWorkflowConfig(
         "test_qrep_flow_avro_sf",
-        fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName),
+        fmt.Sprintf("e2e_test_%s.%s", e2e.TestIdent(t), tblName),
         dstSchemaQualified,
         query,
         protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO,
@@ -124,81 +128,81 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple()
         WriteType:        protos.QRepWriteType_QREP_WRITE_MODE_UPSERT,
         UpsertKeyColumns: []string{"id"},
     }
-    s.NoError(err)
+    assert.NoError(t, err)
 
     e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
 
     // assert that error contains "invalid connection configs"
     err = env.GetWorkflowError()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     sel := e2e.GetOwnersSelectorString()
-    s.compareTableContentsSF(tblName, sel, true)
+    s.compareTableContentsSF(t, tblName, sel, true)
 
-    env.AssertExpectations(s.T())
+    env.AssertExpectations(t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
     numRows := 10
 
     tblName := "test_qrep_flow_avro_sf_s3"
-    s.setupSourceTable(tblName, numRows)
-    s.setupSFDestinationTable(tblName)
+    s.setupSourceTable(t, tblName, numRows)
+    s.setupSFDestinationTable(t, tblName)
 
     dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)
 
     query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
-        snowflakeSuffix, tblName)
+        e2e.TestIdent(t), tblName)
 
     qrepConfig, err := e2e.CreateQRepWorkflowConfig(
         "test_qrep_flow_avro_sf",
-        s.attachSchemaSuffix(tblName),
+        s.attachSchemaSuffix(t, tblName),
         dstSchemaQualified,
         query,
         protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO,
         s.sfHelper.Peer,
         "",
     )
-    s.NoError(err)
+    assert.NoError(t, err)
     qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New())
 
     e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
 
     err = env.GetWorkflowError()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     sel := e2e.GetOwnersSelectorString()
-    s.compareTableContentsSF(tblName, sel, true)
+    s.compareTableContentsSF(t, tblName, sel, true)
 
-    env.AssertExpectations(s.T())
+    env.AssertExpectations(t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
     numRows := 10
 
     tblName := "test_qrep_flow_avro_sf_ups_xmin"
-    s.setupSourceTable(tblName, numRows)
-    s.setupSFDestinationTable(tblName)
+    s.setupSourceTable(t, tblName, numRows)
+    s.setupSFDestinationTable(t, tblName)
 
     dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)
 
     query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}",
-        snowflakeSuffix, tblName)
+        e2e.TestIdent(t), tblName)
 
     qrepConfig, err := e2e.CreateQRepWorkflowConfig(
         "test_qrep_flow_avro_sf_xmin",
-        fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName),
+        fmt.Sprintf("e2e_test_%s.%s", e2e.TestIdent(t), tblName),
         dstSchemaQualified,
         query,
         protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO,
@@ -210,62 +214,62 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
         UpsertKeyColumns: []string{"id"},
     }
     qrepConfig.WatermarkColumn = "xmin"
-    s.NoError(err)
+    assert.NoError(t, err)
 
     e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
 
     err = env.GetWorkflowError()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     sel := e2e.GetOwnersSelectorString()
-    s.compareTableContentsSF(tblName, sel, true)
+    s.compareTableContentsSF(t, tblName, sel, true)
 
-    env.AssertExpectations(s.T())
+    env.AssertExpectations(t)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() {
+func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration(t *testing.T) {
     env := s.NewTestWorkflowEnvironment()
     e2e.RegisterWorkflowsAndActivities(env)
 
     numRows := 10
 
     tblName := "test_qrep_flow_avro_sf_s3_int"
-    s.setupSourceTable(tblName, numRows)
-    s.setupSFDestinationTable(tblName)
+    s.setupSourceTable(t, tblName, numRows)
+    s.setupSFDestinationTable(t, tblName)
 
     dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)
 
     query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
-        snowflakeSuffix, tblName)
+        e2e.TestIdent(t), tblName)
 
     sfPeer := s.sfHelper.Peer
     sfPeer.GetSnowflakeConfig().S3Integration = "peerdb_s3_integration"
 
     qrepConfig, err := e2e.CreateQRepWorkflowConfig(
         "test_qrep_flow_avro_sf_int",
-        s.attachSchemaSuffix(tblName),
+        s.attachSchemaSuffix(t, tblName),
         dstSchemaQualified,
         query,
         protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO,
         sfPeer,
         "",
     )
-    s.NoError(err)
+    assert.NoError(t, err)
     qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New())
 
     e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
     // Verify workflow completes without error
-    s.True(env.IsWorkflowCompleted())
+    assert.True(t, env.IsWorkflowCompleted())
 
     err = env.GetWorkflowError()
-    s.NoError(err)
+    assert.NoError(t, err)
 
     sel := e2e.GetOwnersSelectorString()
-    s.compareTableContentsSF(tblName, sel, true)
+    s.compareTableContentsSF(t, tblName, sel, true)
 
-    env.AssertExpectations(s.T())
+    env.AssertExpectations(t)
 }
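One detail of the conversion above worth calling out: the suite's s.NoError/s.True calls become assert.* (record the failure and keep going), while compareTableContentsSF keeps require.* (abort the test on the spot). A minimal sketch of that distinction, using only the two packages already imported in this file:

package e2e_snowflake

import (
    "errors"
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestAssertVsRequire(t *testing.T) {
    err := errors.New("boom")
    assert.Error(t, err)  // on failure: t.Errorf, execution continues
    require.Error(t, err) // on failure: t.Errorf plus t.FailNow, nothing below runs
}

This matters inside the go func() blocks used throughout these tests: assert.* is safe to call from a spawned goroutine, whereas a require.* failure would call t.FailNow from a goroutine other than the test's own, which the testing package does not support.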
diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go
index d17b60f798..ba3f8c7e07 100644
--- a/flow/e2e/snowflake/snowflake_schema_delta_test.go
+++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go
@@ -6,47 +6,55 @@ import (
     "testing"
 
     connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
+    "github.com/PeerDB-io/peer-flow/e2e"
     "github.com/PeerDB-io/peer-flow/generated/protos"
     "github.com/PeerDB-io/peer-flow/model/qvalue"
-    "github.com/stretchr/testify/suite"
+    "github.com/stretchr/testify/assert"
 )
 
 const schemaDeltaTestSchemaName = "PUBLIC"
 
 type SnowflakeSchemaDeltaTestSuite struct {
-    suite.Suite
     connector    *connsnowflake.SnowflakeConnector
     sfTestHelper *SnowflakeTestHelper
 }
 
-func (suite *SnowflakeSchemaDeltaTestSuite) failTestError(err error) {
+func failTestError(t *testing.T, err error) {
     if err != nil {
-        suite.FailNow(err.Error())
+        assert.FailNow(t, err.Error())
     }
 }
 
-func (suite *SnowflakeSchemaDeltaTestSuite) SetupSuite() {
+func (suite *SnowflakeSchemaDeltaTestSuite) SetupSuite(t *testing.T) error {
     var err error
     suite.sfTestHelper, err = NewSnowflakeTestHelper()
-    suite.failTestError(err)
+    if err != nil {
+        t.Error(err)
+        return err
+    }
 
     suite.connector, err = connsnowflake.NewSnowflakeConnector(context.Background(), suite.sfTestHelper.Config)
-    suite.failTestError(err)
+    if err != nil {
+        t.Error(err)
+        return err
+    }
+
+    return err
 }
 
-func (suite *SnowflakeSchemaDeltaTestSuite) TearDownSuite() {
+func (suite *SnowflakeSchemaDeltaTestSuite) TearDownSuite(t *testing.T) {
     err := suite.sfTestHelper.Cleanup()
-    suite.failTestError(err)
+    assert.NoError(t, err)
     err = suite.connector.Close()
-    suite.failTestError(err)
+    assert.NoError(t, err)
 }
 
-func (suite *SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() {
+func (suite *SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn(t *testing.T) {
     tableName := fmt.Sprintf("%s.SIMPLE_ADD_COLUMN", schemaDeltaTestSchemaName)
     err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName))
-    suite.failTestError(err)
+    failTestError(t, err)
 
     err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
         SrcTableName: tableName,
@@ -56,13 +64,13 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() {
             ColumnType: string(qvalue.QValueKindJSON),
         }},
     }})
-    suite.failTestError(err)
+    failTestError(t, err)
 
     output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
         TableIdentifiers: []string{tableName},
     })
-    suite.failTestError(err)
-    suite.Equal(&protos.TableSchema{
+    failTestError(t, err)
+    assert.Equal(t, &protos.TableSchema{
         TableIdentifier: tableName,
         Columns: map[string]string{
             "ID": string(qvalue.QValueKindString),
@@ -71,10 +79,10 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() {
     }, output.TableNameSchemaMapping[tableName])
 }
 
-func (suite *SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() {
+func (suite *SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes(t *testing.T) {
     tableName := fmt.Sprintf("%s.ADD_DROP_ALL_COLUMN_TYPES", schemaDeltaTestSchemaName)
     err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName))
-    suite.failTestError(err)
+    failTestError(t, err)
 
     expectedTableSchema := &protos.TableSchema{
         TableIdentifier: tableName,
@@ -108,19 +116,19 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() {
         DstTableName: tableName,
         AddedColumns: addedColumns,
     }})
-    suite.failTestError(err)
+    failTestError(t, err)
 
     output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
         TableIdentifiers: []string{tableName},
     })
-    suite.failTestError(err)
-    suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
+    failTestError(t, err)
+    assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
 }
 
-func (suite *SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
+func (suite *SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames(t *testing.T) {
     tableName := fmt.Sprintf("%s.ADD_DROP_TRICKY_COLUMN_NAMES", schemaDeltaTestSchemaName)
     err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(id TEXT PRIMARY KEY)", tableName))
-    suite.failTestError(err)
+    failTestError(t, err)
 
     expectedTableSchema := &protos.TableSchema{
         TableIdentifier: tableName,
@@ -152,19 +160,19 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
         DstTableName: tableName,
         AddedColumns: addedColumns,
     }})
-    suite.failTestError(err)
+    failTestError(t, err)
 
     output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
         TableIdentifiers: []string{tableName},
     })
-    suite.failTestError(err)
-    suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
+    failTestError(t, err)
+    assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
 }
 
-func (suite *SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() {
+func (suite *SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames(t *testing.T) {
     tableName := fmt.Sprintf("%s.ADD_DROP_WHITESPACE_COLUMN_NAMES", schemaDeltaTestSchemaName)
     err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(\" \" TEXT PRIMARY KEY)", tableName))
-    suite.failTestError(err)
+    failTestError(t, err)
 
     expectedTableSchema := &protos.TableSchema{
         TableIdentifier: tableName,
@@ -190,15 +198,15 @@ func (suite *SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() {
         DstTableName: tableName,
         AddedColumns: addedColumns,
     }})
-    suite.failTestError(err)
+    failTestError(t, err)
 
     output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
         TableIdentifiers: []string{tableName},
     })
-    suite.failTestError(err)
-    suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
+    failTestError(t, err)
+    assert.Equal(t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
 }
 
 func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) {
-    suite.Run(t, new(SnowflakeSchemaDeltaTestSuite))
+    e2e.RunSuite[*SnowflakeSchemaDeltaTestSuite](t)
 }
diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go
index 4f53e9a6a4..e4f19e523b 100644
--- a/flow/e2e/test_utils.go
+++ b/flow/e2e/test_utils.go
@@ -6,7 +6,9 @@ import (
     "fmt"
     "io"
     "os"
+    "reflect"
     "strings"
+    "testing"
     "time"
 
     "github.com/PeerDB-io/peer-flow/activities"
@@ -357,3 +359,32 @@ func GetOwnersSelectorString() string {
     }
     return strings.Join(fields, ",")
 }
+
+func TestIdent(t *testing.T) string {
+    return strings.Replace(t.Name(), "/", "_", -1)
+}
+
+type Suite interface {
+    SetupSuite(*testing.T) error
+    TearDownSuite(*testing.T)
+}
+
+func RunSuite[T Suite](t *testing.T) {
+    t.Run("group", func(t *testing.T) {
+        e2etype := reflect.TypeOf((*T)(nil)).Elem()
+        methodcount := e2etype.NumMethod()
+        for methodid := 0; methodid < methodcount; methodid += 1 {
+            method := e2etype.Method(methodid)
+            if strings.HasPrefix(method.Name, "Test") {
+                t.Run(method.Name, func(t *testing.T) {
+                    t.Parallel()
+                    suite := reflect.New(e2etype.Elem()).Interface().(T)
+                    if suite.SetupSuite(t) == nil {
+                        method.Func.Call([]reflect.Value{reflect.ValueOf(suite), reflect.ValueOf(t)})
+                        suite.TearDownSuite(t)
+                    }
+                })
+            }
+        }
+    })
+}
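For reference, a hypothetical minimal suite showing the contract RunSuite expects: a pointer type whose exported Test* methods take *testing.T, plus SetupSuite and TearDownSuite. Each Test* method runs as a parallel subtest against a freshly constructed suite value, and a non-nil error from SetupSuite skips both the method body and the teardown:

package e2e_test

import (
    "testing"

    "github.com/PeerDB-io/peer-flow/e2e"
)

// demoSuite exists only to illustrate the RunSuite contract.
type demoSuite struct {
    state string
}

func (s *demoSuite) SetupSuite(t *testing.T) error {
    s.state = "ready" // returning a non-nil error would skip the test and teardown
    return nil
}

func (s *demoSuite) TearDownSuite(t *testing.T) {
    s.state = ""
}

// Any exported method whose name starts with "Test" becomes a parallel subtest.
func (s *demoSuite) Test_Demo(t *testing.T) {
    if s.state != "ready" {
        t.Error("SetupSuite did not run before the test body")
    }
}

func TestDemoSuite(t *testing.T) {
    e2e.RunSuite[*demoSuite](t)
}

Note that TearDownSuite is invoked by plain sequential code after the method call, so a test that aborts via t.FailNow (any require.* failure) exits the subtest goroutine before cleanup runs; tests that stick to assert.* always reach their teardown.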