diff --git a/flow/connectors/postgres/postgres_repl_test.go b/flow/connectors/postgres/postgres_repl_test.go index b0e990ac2a..df3a7de13f 100644 --- a/flow/connectors/postgres/postgres_repl_test.go +++ b/flow/connectors/postgres/postgres_repl_test.go @@ -7,74 +7,106 @@ import ( "testing" "time" - "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" ) type PostgresReplicationSnapshotTestSuite struct { - t *testing.T - + suite.Suite connector *PostgresConnector - schema string } -func setupSuite(t *testing.T) PostgresReplicationSnapshotTestSuite { - t.Helper() - - connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{ +func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() { + var err error + suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, User: "postgres", Password: "postgres", Database: "postgres", }, true) - require.NoError(t, err) + require.NoError(suite.T(), err) - setupTx, err := connector.pool.Begin(context.Background()) - require.NoError(t, err) + setupTx, err := suite.connector.pool.Begin(context.Background()) + require.NoError(suite.T(), err) defer func() { err := setupTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - require.NoError(t, err) + require.NoError(suite.T(), err) } }() - schema := "repltest_" + shared.RandomString(8) - - _, err = setupTx.Exec(context.Background(), - fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schema)) - require.NoError(t, err) + _, err = setupTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_repl_test CASCADE") + require.NoError(suite.T(), err) - _, err = setupTx.Exec(context.Background(), - fmt.Sprintf("CREATE SCHEMA %s", schema)) - require.NoError(t, err) + _, err = setupTx.Exec(context.Background(), "CREATE SCHEMA pgpeer_repl_test") + require.NoError(suite.T(), err) - // setup 3 tables test_1, test_2, test_3 - // all have 5 text columns c1, c2, c3, c4, c5 + // setup 3 tables in pgpeer_repl_test schema + // test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5 tables := []string{"test_1", "test_2", "test_3"} for _, table := range tables { _, err = setupTx.Exec(context.Background(), - fmt.Sprintf("CREATE TABLE %s.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", schema, table)) - require.NoError(t, err) + fmt.Sprintf("CREATE TABLE pgpeer_repl_test.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", table)) + require.NoError(suite.T(), err) } err = setupTx.Commit(context.Background()) - require.NoError(t, err) + require.NoError(suite.T(), err) +} + +func (suite *PostgresReplicationSnapshotTestSuite) TearDownSuite() { + teardownTx, err := suite.connector.pool.Begin(context.Background()) + require.NoError(suite.T(), err) + defer func() { + err := teardownTx.Rollback(context.Background()) + if err != pgx.ErrTxClosed { + require.NoError(suite.T(), err) + } + }() + + _, err = teardownTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_test CASCADE") + require.NoError(suite.T(), err) + + // Fetch all the publications + rows, err := teardownTx.Query(context.Background(), + "SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation")) + require.NoError(suite.T(), err) + + // Iterate over the publications and drop them + for rows.Next() { + var pubname pgtype.Text + err := rows.Scan(&pubname) + require.NoError(suite.T(), err) - return PostgresReplicationSnapshotTestSuite{ - t: t, - connector: connector, - schema: schema, + // Drop the publication in a new transaction + _, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String)) + require.NoError(suite.T(), err) } + + _, err = teardownTx.Exec(context.Background(), + "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1", + fmt.Sprintf("%%%s", "test_simple_slot_creation")) + require.NoError(suite.T(), err) + + err = teardownTx.Commit(context.Background()) + require.NoError(suite.T(), err) + + suite.True(suite.connector.ConnectionActive() == nil) + + err = suite.connector.Close() + require.NoError(suite.T(), err) + + suite.False(suite.connector.ConnectionActive() == nil) } -func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { +func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { tables := map[string]string{ - suite.schema + ".test_1": "test_1_dst", + "pgpeer_repl_test.test_1": "test_1_dst", } flowJobName := "test_simple_slot_creation" @@ -89,7 +121,7 @@ func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { // Moved to a go routine go func() { err := suite.connector.SetupReplication(signal, setupReplicationInput) - require.NoError(suite.t, err) + require.NoError(suite.T(), err) }() slog.Info("waiting for slot creation to complete", flowLog) @@ -104,49 +136,5 @@ func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { } func TestPostgresReplTestSuite(t *testing.T) { - e2eshared.GotSuite(t, setupSuite, func(suite PostgresReplicationSnapshotTestSuite) { - teardownTx, err := suite.connector.pool.Begin(context.Background()) - require.NoError(suite.t, err) - defer func() { - err := teardownTx.Rollback(context.Background()) - if err != pgx.ErrTxClosed { - require.NoError(suite.t, err) - } - }() - - _, err = teardownTx.Exec(context.Background(), - fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", suite.schema)) - require.NoError(suite.t, err) - - // Fetch all the publications - rows, err := teardownTx.Query(context.Background(), - "SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation")) - require.NoError(suite.t, err) - - // Iterate over the publications and drop them - for rows.Next() { - var pubname pgtype.Text - err := rows.Scan(&pubname) - require.NoError(suite.t, err) - - // Drop the publication in a new transaction - _, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String)) - require.NoError(suite.t, err) - } - - _, err = teardownTx.Exec(context.Background(), - "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1", - fmt.Sprintf("%%%s", "test_simple_slot_creation")) - require.NoError(suite.t, err) - - err = teardownTx.Commit(context.Background()) - require.NoError(suite.t, err) - - require.True(suite.t, suite.connector.ConnectionActive() == nil) - - err = suite.connector.Close() - require.NoError(suite.t, err) - - require.False(suite.t, suite.connector.ConnectionActive() == nil) - }) + suite.Run(t, new(PostgresReplicationSnapshotTestSuite)) } diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 24fa43dbfc..8a919eb214 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -5,59 +5,79 @@ import ( "fmt" "testing" - "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/jackc/pgx/v5" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" ) type PostgresSchemaDeltaTestSuite struct { - t *testing.T - + suite.Suite connector *PostgresConnector } const schemaDeltaTestSchemaName = "pgschema_delta_test" -func setupSchemaDeltaSuite(t *testing.T) PostgresSchemaDeltaTestSuite { - t.Helper() +func (suite *PostgresSchemaDeltaTestSuite) failTestError(err error) { + if err != nil { + suite.FailNow(err.Error()) + } +} - connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{ +func (suite *PostgresSchemaDeltaTestSuite) SetupSuite() { + var err error + suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, User: "postgres", Password: "postgres", Database: "postgres", }, false) - require.NoError(t, err) + suite.failTestError(err) - setupTx, err := connector.pool.Begin(context.Background()) - require.NoError(t, err) + setupTx, err := suite.connector.pool.Begin(context.Background()) + suite.failTestError(err) defer func() { err := setupTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - require.NoError(t, err) + suite.failTestError(err) } }() _, err = setupTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schemaDeltaTestSchemaName)) - require.NoError(t, err) + suite.failTestError(err) _, err = setupTx.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA %s", schemaDeltaTestSchemaName)) - require.NoError(t, err) + suite.failTestError(err) err = setupTx.Commit(context.Background()) - require.NoError(t, err) - return PostgresSchemaDeltaTestSuite{ - t: t, - connector: connector, - } + suite.failTestError(err) +} + +func (suite *PostgresSchemaDeltaTestSuite) TearDownSuite() { + teardownTx, err := suite.connector.pool.Begin(context.Background()) + suite.failTestError(err) + defer func() { + err := teardownTx.Rollback(context.Background()) + if err != pgx.ErrTxClosed { + suite.failTestError(err) + } + }() + _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", + schemaDeltaTestSchemaName)) + suite.failTestError(err) + err = teardownTx.Commit(context.Background()) + suite.failTestError(err) + + suite.True(suite.connector.ConnectionActive() == nil) + err = suite.connector.Close() + suite.failTestError(err) + suite.False(suite.connector.ConnectionActive() == nil) } -func (suite PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { +func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { tableName := fmt.Sprintf("%s.simple_add_column", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - require.NoError(suite.t, err) + suite.failTestError(err) err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, @@ -67,13 +87,13 @@ func (suite PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { ColumnType: string(qvalue.QValueKindInt64), }}, }}) - require.NoError(suite.t, err) + suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - require.NoError(suite.t, err) - require.Equal(suite.t, &protos.TableSchema{ + suite.failTestError(err) + suite.Equal(&protos.TableSchema{ TableIdentifier: tableName, Columns: map[string]string{ "id": string(qvalue.QValueKindInt32), @@ -83,11 +103,11 @@ func (suite PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { }, output.TableNameSchemaMapping[tableName]) } -func (suite PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { +func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { tableName := fmt.Sprintf("%s.add_drop_all_column_types", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - require.NoError(suite.t, err) + suite.failTestError(err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -128,20 +148,20 @@ func (suite PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { DstTableName: tableName, AddedColumns: addedColumns, }}) - require.NoError(suite.t, err) + suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - require.NoError(suite.t, err) - require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) + suite.failTestError(err) + suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } -func (suite PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { +func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { tableName := fmt.Sprintf("%s.add_drop_tricky_column_names", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - require.NoError(suite.t, err) + suite.failTestError(err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -174,20 +194,20 @@ func (suite PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { DstTableName: tableName, AddedColumns: addedColumns, }}) - require.NoError(suite.t, err) + suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - require.NoError(suite.t, err) - require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) + suite.failTestError(err) + suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } -func (suite PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { +func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { tableName := fmt.Sprintf("%s.add_drop_whitespace_column_names", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(\" \" INT PRIMARY KEY)", tableName)) - require.NoError(suite.t, err) + suite.failTestError(err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -214,34 +234,15 @@ func (suite PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { DstTableName: tableName, AddedColumns: addedColumns, }}) - require.NoError(suite.t, err) + suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - require.NoError(suite.t, err) - require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) + suite.failTestError(err) + suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } func TestPostgresSchemaDeltaTestSuite(t *testing.T) { - e2eshared.GotSuite(t, setupSchemaDeltaSuite, func(suite PostgresSchemaDeltaTestSuite) { - teardownTx, err := suite.connector.pool.Begin(context.Background()) - require.NoError(suite.t, err) - defer func() { - err := teardownTx.Rollback(context.Background()) - if err != pgx.ErrTxClosed { - require.NoError(suite.t, err) - } - }() - _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", - schemaDeltaTestSchemaName)) - require.NoError(suite.t, err) - err = teardownTx.Commit(context.Background()) - require.NoError(suite.t, err) - - require.True(suite.t, suite.connector.ConnectionActive() == nil) - err = suite.connector.Close() - require.NoError(suite.t, err) - require.False(suite.t, suite.connector.ConnectionActive() == nil) - }) + suite.Run(t, new(PostgresSchemaDeltaTestSuite)) } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 09f39622ab..cc0352fae3 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -18,9 +18,11 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/require" + "github.com/ysmood/got" ) type PeerFlowE2ETestSuiteBQ struct { + got.G t *testing.T bqSuffix string @@ -33,13 +35,13 @@ func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { err := e2e.TearDownPostgres(s.pool, s.bqSuffix) if err != nil { slog.Error("failed to tear down postgres", slog.Any("error", err)) - s.t.FailNow() + s.FailNow() } err = s.bqHelper.DropDataset(s.bqHelper.datasetName) if err != nil { slog.Error("failed to tear down bigquery", slog.Any("error", err)) - s.t.FailNow() + s.FailNow() } }) } @@ -116,7 +118,7 @@ func setupBigQuery(t *testing.T) *BigQueryTestHelper { } // Implement SetupAllSuite interface to setup the test suite -func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { +func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { t.Helper() err := godotenv.Load() @@ -126,9 +128,9 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { slog.Info("Unable to load .env file, using default values from env") } - suffix := strings.ToLower(shared.RandomString(8)) + suffix := shared.RandomString(8) tsSuffix := time.Now().Format("20060102150405") - bqSuffix := fmt.Sprintf("bq_%s_%s", suffix, tsSuffix) + bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) pool, err := e2e.SetupPostgres(bqSuffix) if err != nil || pool == nil { slog.Error("failed to setup postgres", slog.Any("error", err)) @@ -138,6 +140,7 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { bq := setupBigQuery(t) return PeerFlowE2ETestSuiteBQ{ + G: g, t: t, bqSuffix: bqSuffix, pool: pool, @@ -146,7 +149,7 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { } func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. @@ -158,7 +161,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, &limits, nil) // Verify workflow completes - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err := env.GetWorkflowError() // assert that error contains "invalid connection configs" @@ -168,7 +171,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { } func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_no_data") @@ -202,7 +205,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -212,7 +215,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { } func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_char_coltype") @@ -246,7 +249,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -259,7 +262,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { // The test inserts 10 rows into the source table and verifies that the data is // correctly synced to the destination table after sync flow completes. func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_flow_bq") @@ -309,7 +312,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -317,7 +320,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { count, err := s.bqHelper.countRows(dstTableName) require.NoError(s.t, err) - require.Equal(s.t, 10, count) + s.Equal(10, count) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side @@ -326,7 +329,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_1") @@ -383,7 +386,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -394,7 +397,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_2") @@ -446,7 +449,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -458,7 +461,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_3") @@ -521,7 +524,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -532,7 +535,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_4") @@ -588,7 +591,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -599,7 +602,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_5") @@ -655,7 +658,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -666,7 +669,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_types_bq") @@ -723,7 +726,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -739,13 +742,13 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { s.t.Log(err) } // Make sure that there are no nulls - require.True(s.t, noNulls) + s.True(noNulls) env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTable1Name := s.attachSchemaSuffix("test1_bq") @@ -799,15 +802,15 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { count2, err := s.bqHelper.countRows(dstTable2Name) require.NoError(s.t, err) - require.Equal(s.t, 1, count1) - require.Equal(s.t, 1, count2) + s.Equal(1, count1) + s.Equal(1, count2) env.AssertExpectations(s.t) } // TODO: not checking schema exactly, add later func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") @@ -897,7 +900,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -907,7 +910,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") @@ -968,7 +971,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -980,7 +983,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") @@ -1044,7 +1047,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1057,7 +1060,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") @@ -1117,7 +1120,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1130,7 +1133,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_peerdb_cols") @@ -1180,7 +1183,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1193,7 +1196,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTable1Name := s.attachSchemaSuffix("test1_bq") @@ -1251,8 +1254,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name) require.NoError(s.t, err) - require.Equal(s.t, 1, count1) - require.Equal(s.t, 1, count2) + s.Equal(1, count1) + s.Equal(1, count2) err = s.bqHelper.DropDataset(secondDataset) require.NoError(s.t, err) @@ -1261,7 +1264,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel") @@ -1331,7 +1334,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1345,11 +1348,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - require.Equal(s.t, int64(1), numNewRows) + s.Eq(1, numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") @@ -1417,7 +1420,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1429,11 +1432,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - require.Equal(s.t, int64(1), numNewRows) + s.Eq(1, numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_ud") @@ -1505,7 +1508,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1517,11 +1520,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - require.Equal(s.t, int64(1), numNewRows) + s.Eq(1, numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_softdel_iad") @@ -1581,7 +1584,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1593,5 +1596,5 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { s.bqHelper.datasetName, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) - require.Equal(s.t, numNewRows, int64(0)) + s.Eq(0, numNewRows) } diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index f75200bf13..def151071c 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -47,7 +47,7 @@ func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsStr } func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -71,7 +71,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) @@ -82,7 +82,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { } func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -105,7 +105,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() { e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index eb97430561..df1ff17c13 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -55,7 +55,6 @@ func cleanPostgres(pool *pgxpool.Pool, suffix string) error { if err != nil { return fmt.Errorf("failed to list publications: %w", err) } - defer rows.Close() // drop all publications with the given suffix for rows.Next() { @@ -136,8 +135,15 @@ func SetupPostgres(suffix string) (*pgxpool.Pool, error) { } func TearDownPostgres(pool *pgxpool.Pool, suffix string) error { - err := cleanPostgres(pool, suffix) - return err + // drop the e2e_test schema + if pool != nil { + err := cleanPostgres(pool, suffix) + if err != nil { + return err + } + pool.Close() + } + return nil } // GeneratePostgresPeer generates a postgres peer config for testing. diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 06095b192e..53658bf6ff 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -9,18 +9,17 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" peerflow "github.com/PeerDB-io/peer-flow/workflows" "github.com/jackc/pgx/v5/pgtype" - "github.com/stretchr/testify/require" ) -func (s PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", s.suffix, tableName) +func (s *PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string { + return fmt.Sprintf("e2e_test_%s.%s", postgresSuffix, tableName) } -func (s PeerFlowE2ETestSuitePG) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, s.suffix) +func (s *PeerFlowE2ETestSuitePG) attachSuffix(input string) string { + return fmt.Sprintf("%s_%s", input, postgresSuffix) } -func (s PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error { +func (s *PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error { query := fmt.Sprintf(`SELECT "_PEERDB_IS_DELETED","_PEERDB_SYNCED_AT" FROM %s WHERE id = %d`, dstSchemaQualified, rowID) var isDeleted pgtype.Bool @@ -41,9 +40,9 @@ func (s PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, ro return nil } -func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - e2e.RegisterWorkflowsAndActivities(s.t, env) +func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.T(), env) srcTableName := s.attachSchemaSuffix("test_simple_flow") dstTableName := s.attachSchemaSuffix("test_simple_flow_dst") @@ -55,7 +54,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { value TEXT NOT NULL ); `, srcTableName)) - require.NoError(s.t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow"), @@ -65,7 +64,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -83,7 +82,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - require.NoError(s.t, err) + s.NoError(err) } s.T().Log("Inserted 10 rows into the source table") }() @@ -91,22 +90,22 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - require.Error(s.t, err) - require.Contains(s.t, err.Error(), "continue as new") + s.Error(err) + s.Contains(err.Error(), "continue as new") err = s.comparePGTables(srcTableName, dstTableName, "id,key,value") - require.NoError(s.t, err) + s.NoError(err) - env.AssertExpectations(s.t) + env.AssertExpectations(s.T()) } -func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - e2e.RegisterWorkflowsAndActivities(s.t, env) +func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.T(), env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := s.attachSchemaSuffix("test_simple_schema_changes_dst") @@ -117,7 +116,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { c1 BIGINT ); `, srcTableName)) - require.NoError(s.t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), @@ -127,7 +126,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 1, @@ -141,8 +140,8 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1) - require.NoError(s.t, err) - s.t.Println("Inserted initial row in the source table") + s.NoError(err) + s.T().Log("Inserted initial row in the source table") // verify we got our first row. e2e.NormalizeFlowCountQuery(env, connectionGen, 2) @@ -157,20 +156,20 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(s.t, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + s.NoError(err) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") - require.NoError(s.t, err) + s.NoError(err) // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName)) - require.NoError(s.t, err) - s.t.Log("Altered source table, added column c2") + s.NoError(err) + s.T().Log("Altered source table, added column c2") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2) - require.NoError(s.t, err) - s.t.Log("Inserted row with added c2 in the source table") + s.NoError(err) + s.T().Log("Inserted row with added c2 in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 4) @@ -186,20 +185,20 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(s.t, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + s.NoError(err) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2") - require.NoError(s.t, err) + s.NoError(err) // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - require.NoError(s.t, err) - s.t.Log("Altered source table, dropped column c2 and added column c3") + s.NoError(err) + s.T().Log("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - require.NoError(s.t, err) - s.t.Log("Inserted row with added c3 in the source table") + s.NoError(err) + s.T().Log("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 6) @@ -216,20 +215,20 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(s.t, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + s.NoError(err) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c3") - require.NoError(s.t, err) + s.NoError(err) // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - require.NoError(s.t, err) - s.t.Log("Altered source table, dropped column c3") + s.NoError(err) + s.T().Log("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - require.NoError(s.t, err) - s.t.Log("Inserted row after dropping all columns in the source table") + s.NoError(err) + s.T().Log("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 8) @@ -246,28 +245,28 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - require.NoError(s.t, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + s.NoError(err) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1") - require.NoError(s.t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - require.Error(s.t, err) - require.Contains(s.t, err.Error(), "continue as new") + s.Error(err) + s.Contains(err.Error(), "continue as new") - env.AssertExpectations(s.t) + env.AssertExpectations(s.T()) } -func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - e2e.RegisterWorkflowsAndActivities(s.t, env) +func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.T(), env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := s.attachSchemaSuffix("test_simple_cpkey_dst") @@ -281,7 +280,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { PRIMARY KEY(id,t) ); `, srcTableName)) - require.NoError(s.t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), @@ -291,7 +290,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -308,41 +307,41 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) `, srcTableName), i, testValue) - require.NoError(s.t, err) + s.NoError(err) } s.T().Log("Inserted 10 rows into the source table") // verify we got our 10 rows e2e.NormalizeFlowCountQuery(env, connectionGen, 2) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") - require.NoError(s.t, err) + s.NoError(err) _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(s.t, err) + s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(s.t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - require.Error(s.t, err) - require.Contains(s.t, err.Error(), "continue as new") + s.Error(err) + s.Contains(err.Error(), "continue as new") err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") - require.NoError(s.t, err) + s.NoError(err) - env.AssertExpectations(s.t) + env.AssertExpectations(s.T()) } -func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - e2e.RegisterWorkflowsAndActivities(s.t, env) +func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.T(), env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := s.attachSchemaSuffix("test_cpkey_toast1_dst") @@ -360,7 +359,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - require.NoError(s.t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), @@ -370,7 +369,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 20, @@ -382,7 +381,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) rowsTx, err := s.pool.Begin(context.Background()) - require.NoError(s.t, err) + s.NoError(err) // insert 10 rows into the source table for i := 0; i < 10; i++ { @@ -390,40 +389,40 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(s.t, err) + s.NoError(err) } s.T().Log("Inserted 10 rows into the source table") _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(s.t, err) + s.NoError(err) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(s.t, err) + s.NoError(err) err = rowsTx.Commit(context.Background()) - require.NoError(s.t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - require.Error(s.t, err) - require.Contains(s.t, err.Error(), "continue as new") + s.Error(err) + s.Contains(err.Error(), "continue as new") // verify our updates and delete happened err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") - require.NoError(s.t, err) + s.NoError(err) - env.AssertExpectations(s.t) + env.AssertExpectations(s.T()) } -func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - e2e.RegisterWorkflowsAndActivities(s.t, env) +func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.T(), env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") dstTableName := s.attachSchemaSuffix("test_cpkey_toast2_dst") @@ -441,7 +440,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { round(random() * 30)::integer, 1), '') FROM generate_series(1, $1); $$ language sql; `, srcTableName)) - require.NoError(s.t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), @@ -451,7 +450,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 10, @@ -469,38 +468,38 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) `, srcTableName), i, testValue) - require.NoError(s.t, err) + s.NoError(err) } s.T().Log("Inserted 10 rows into the source table") e2e.NormalizeFlowCountQuery(env, connectionGen, 2) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) - require.NoError(s.t, err) + s.NoError(err) _, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0) - require.NoError(s.t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - require.Error(s.t, err) - require.Contains(s.t, err.Error(), "continue as new") + s.Error(err) + s.Contains(err.Error(), "continue as new") // verify our updates and delete happened err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2") - require.NoError(s.t, err) + s.NoError(err) - env.AssertExpectations(s.t) + env.AssertExpectations(s.T()) } -func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - e2e.RegisterWorkflowsAndActivities(s.t, env) +func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.T(), env) srcTableName := s.attachSchemaSuffix("test_peerdb_cols") dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst") @@ -512,7 +511,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { value TEXT NOT NULL ); `, srcTableName)) - require.NoError(s.t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_peerdb_cols_mirror"), @@ -523,7 +522,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ ExitAfterRecords: 2, @@ -538,26 +537,26 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(key, value) VALUES ($1, $2) `, srcTableName), testKey, testValue) - require.NoError(s.t, err) + s.NoError(err) // delete that row _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` DELETE FROM %s WHERE id=1 `, srcTableName)) - require.NoError(s.t, err) - s.t.Log("Inserted and deleted a row for peerdb column check") + s.NoError(err) + s.T().Log("Inserted and deleted a row for peerdb column check") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - require.Error(s.t, err) - require.Contains(s.t, err.Error(), "continue as new") + s.Error(err) + s.Contains(err.Error(), "continue as new") checkErr := s.checkPeerdbColumns(dstTableName, 1) - require.NoError(s.t, checkErr) - env.AssertExpectations(s.t) + s.NoError(checkErr) + env.AssertExpectations(s.T()) } diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index fbd90e8fc7..d408451939 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -9,36 +9,31 @@ import ( connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" - "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" - "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.temporal.io/sdk/testsuite" ) +const postgresSuffix = "postgres" + type PeerFlowE2ETestSuitePG struct { - t *testing.T + suite.Suite + testsuite.WorkflowTestSuite pool *pgxpool.Pool peer *protos.Peer connector *connpostgres.PostgresConnector - suffix string } func TestPeerFlowE2ETestSuitePG(t *testing.T) { - e2eshared.GotSuite(t, setupSuite, func(s PeerFlowE2ETestSuitePG) { - err := e2e.TearDownPostgres(s.pool, s.suffix) - if err != nil { - s.t.Fatal("failed to drop Postgres schema", err) - } - }) + suite.Run(t, new(PeerFlowE2ETestSuitePG)) } -func setupSuite(t *testing.T) PeerFlowE2ETestSuitePG { - t.Helper() - +// Implement SetupAllSuite interface to setup the test suite +func (s *PeerFlowE2ETestSuitePG) SetupSuite() { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -46,15 +41,14 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuitePG { slog.Info("Unable to load .env file, using default values from env") } - suffix := "pgtest_" + strings.ToLower(shared.RandomString(8)) - - pool, err := e2e.SetupPostgres(suffix) + pool, err := e2e.SetupPostgres(postgresSuffix) if err != nil || pool == nil { - t.Fatal("failed to setup postgres", err) + s.Fail("failed to setup postgres", err) } - peer := generatePGPeer(e2e.GetTestPostgresConf()) + s.pool = pool + s.peer = generatePGPeer(e2e.GetTestPostgresConf()) - connector, err := connpostgres.NewPostgresConnector(context.Background(), + s.connector, err = connpostgres.NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, @@ -62,24 +56,25 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuitePG { Password: "postgres", Database: "postgres", }, false) - require.NoError(t, err) - return PeerFlowE2ETestSuitePG{ - t: t, - pool: pool, - peer: peer, - connector: connector, - suffix: suffix, + s.NoError(err) +} + +// Implement TearDownAllSuite interface to tear down the test suite +func (s *PeerFlowE2ETestSuitePG) TearDownSuite() { + err := e2e.TearDownPostgres(s.pool, postgresSuffix) + if err != nil { + s.Fail("failed to drop Postgres schema", err) } } -func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateTableForQRep(s.pool, s.suffix, tableName) - require.NoError(s.t, err) - err = e2e.PopulateSourceTable(s.pool, s.suffix, tableName, rowCount) - require.NoError(s.t, err) +func (s *PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { + err := e2e.CreateTableForQRep(s.pool, postgresSuffix, tableName) + s.NoError(err) + err = e2e.PopulateSourceTable(s.pool, postgresSuffix, tableName, rowCount) + s.NoError(err) } -func (s PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error { +func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error { // Execute the two EXCEPT queries for { err := s.compareQuery(srcSchemaQualified, dstSchemaQualified, selector) @@ -109,7 +104,7 @@ func (s PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQua return nil } -func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { +func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error { query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector, srcSchemaQualified, selector, dstSchemaQualified) rows, err := s.pool.Query(context.Background(), query) @@ -143,7 +138,7 @@ func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualif return nil } -func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { +func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { query := fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s`, dstSchemaQualified) rows, _ := s.pool.Query(context.Background(), query) @@ -164,9 +159,9 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { return rows.Err() } -func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - e2e.RegisterWorkflowsAndActivities(s.t, env) +func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.T(), env) numRows := 10 @@ -175,14 +170,14 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { dstTable := "test_qrep_flow_avro_pg_2" - err := e2e.CreateTableForQRep(s.pool, s.suffix, dstTable) - require.NoError(s.t, err) + err := e2e.CreateTableForQRep(s.pool, postgresSuffix, dstTable) + s.NoError(err) - srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) - dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) + srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable) + dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, dstTable) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - s.suffix, srcTable) + postgresSuffix, srcTable) postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort) @@ -196,27 +191,27 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { true, "", ) - require.NoError(s.t, err) + s.NoError(err) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - require.NoError(s.t, err) + s.NoError(err) err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified, "*") if err != nil { - require.FailNow(s.t, err.Error()) + s.FailNow(err.Error()) } - env.AssertExpectations(s.t) + env.AssertExpectations(s.T()) } -func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - e2e.RegisterWorkflowsAndActivities(s.t, env) +func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.T(), env) numRows := 10 @@ -225,11 +220,11 @@ func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_P dstTable := "test_qrep_columns_pg_2" - srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable) - dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable) + srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable) + dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, dstTable) query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", - s.suffix, srcTable) + postgresSuffix, srcTable) postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort) @@ -243,20 +238,20 @@ func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_P true, "_PEERDB_SYNCED_AT", ) - require.NoError(s.t, err) + s.NoError(err) e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - require.NoError(s.t, err) + s.NoError(err) err = s.checkSyncedAt(dstSchemaQualified) if err != nil { - require.FailNow(s.t, err.Error()) + s.FailNow(err.Error()) } - env.AssertExpectations(s.t) + env.AssertExpectations(s.T()) } diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 25b8e8e3ae..f28161b97b 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -10,23 +10,22 @@ import ( "github.com/stretchr/testify/require" ) -func (s PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string { - return fmt.Sprintf("e2e_test_%s.%s", s.suffix, tableName) +func (s *PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string { + return fmt.Sprintf("e2e_test_%s.%s", s3Suffix, tableName) } -func (s PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { - return fmt.Sprintf("%s_%s", input, s.suffix) +func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { + return fmt.Sprintf("%s_%s", input, s3Suffix) } -func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - e2e.RegisterWorkflowsAndActivities(s.t, env) +func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.T(), env) - helper, setupErr := setupS3("s3") + setupErr := s.setupS3("s3") if setupErr != nil { - require.Fail(s.t, "failed to setup S3", setupErr) + s.Fail("failed to setup S3", setupErr) } - s.s3Helper = helper srcTableName := s.attachSchemaSuffix("test_simple_flow_s3") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3") @@ -38,7 +37,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { value TEXT NOT NULL ); `, srcTableName)) - require.NoError(s.t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: flowJobName, TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -47,7 +46,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -57,49 +56,48 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - require.NoError(s.t, err) + s.NoError(err) // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.pool.Exec(context.Background(), - fmt.Sprintf("INSERT INTO %s (key, value) VALUES ($1, $2)", srcTableName), testKey, testValue) - require.NoError(s.t, err) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s (key, value) VALUES ($1, $2) + `, srcTableName), testKey, testValue) + s.NoError(err) } - require.NoError(s.t, err) + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - require.Error(s.t, err) - require.Contains(s.t, err.Error(), "continue as new") + s.Error(err) + s.Contains(err.Error(), "continue as new") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - s.t.Logf("JobName: %s", flowJobName) + s.T().Logf("JobName: %s", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) - s.t.Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files)) - require.NoError(s.t, err) + s.T().Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files)) + require.NoError(s.T(), err) - require.Equal(s.t, 4, len(files)) + require.Equal(s.T(), 4, len(files)) - env.AssertExpectations(s.t) + env.AssertExpectations(s.T()) } -func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - e2e.RegisterWorkflowsAndActivities(s.t, env) - - helper, setupErr := setupS3("gcs") +func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.T(), env) + setupErr := s.setupS3("gcs") if setupErr != nil { - require.Fail(s.t, "failed to setup S3", setupErr) + s.Fail("failed to setup S3", setupErr) } - s.s3Helper = helper srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop") dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop") @@ -111,7 +109,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { value TEXT NOT NULL ); `, srcTableName)) - require.NoError(s.t, err) + s.NoError(err) connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: flowJobName, TableNameMapping: map[string]string{srcTableName: dstTableName}, @@ -120,7 +118,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { } flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() - require.NoError(s.t, err) + s.NoError(err) limits := peerflow.CDCFlowLimits{ TotalSyncFlows: 4, @@ -130,37 +128,38 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { go func() { e2e.SetupCDCFlowStatusQuery(env, connectionGen) - require.NoError(s.t, err) + s.NoError(err) // insert 20 rows for i := 1; i <= 20; i++ { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) - _, err = s.pool.Exec(context.Background(), - fmt.Sprintf("INSERT INTO %s (key, value) VALUES ($1, $2)", srcTableName), testKey, testValue) - require.NoError(s.t, err) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s (key, value) VALUES ($1, $2) + `, srcTableName), testKey, testValue) + s.NoError(err) } - s.t.Log("Inserted 20 rows into the source table") - require.NoError(s.t, err) + s.T().Log("Inserted 20 rows into the source table") + s.NoError(err) }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error - require.Error(s.t, err) - require.Contains(s.t, err.Error(), "continue as new") + s.Error(err) + s.Contains(err.Error(), "continue as new") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - s.t.Logf("JobName: %s", flowJobName) + s.T().Logf("JobName: %s", flowJobName) files, err := s.s3Helper.ListAllFiles(ctx, flowJobName) - s.t.Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files)) - require.NoError(s.t, err) + s.T().Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files)) + require.NoError(s.T(), err) - require.Equal(s.t, 4, len(files)) + require.Equal(s.T(), 4, len(files)) - env.AssertExpectations(s.t) + env.AssertExpectations(s.T()) } diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 7dcb05f840..4fc4f7bf78 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -4,56 +4,53 @@ import ( "context" "fmt" "log/slog" - "strings" "testing" "time" "github.com/PeerDB-io/peer-flow/e2e" - "github.com/PeerDB-io/peer-flow/e2eshared" - "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.temporal.io/sdk/testsuite" ) +const s3Suffix = "s3" + type PeerFlowE2ETestSuiteS3 struct { - t *testing.T + suite.Suite + testsuite.WorkflowTestSuite pool *pgxpool.Pool s3Helper *S3TestHelper - suffix string } func TestPeerFlowE2ETestSuiteS3(t *testing.T) { - e2eshared.GotSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteS3) { - err := e2e.TearDownPostgres(s.pool, s.suffix) - if err != nil { - require.Fail(s.t, "failed to drop Postgres schema", err) - } - - if s.s3Helper != nil { - err = s.s3Helper.CleanUp() - if err != nil { - require.Fail(s.t, "failed to clean up s3", err) - } - } - }) + suite.Run(t, new(PeerFlowE2ETestSuiteS3)) } -func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateTableForQRep(s.pool, s.suffix, tableName) - require.NoError(s.t, err) - err = e2e.PopulateSourceTable(s.pool, s.suffix, tableName, rowCount) - require.NoError(s.t, err) +func (s *PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { + err := e2e.CreateTableForQRep(s.pool, s3Suffix, tableName) + s.NoError(err) + err = e2e.PopulateSourceTable(s.pool, s3Suffix, tableName, rowCount) + s.NoError(err) } -func setupS3(mode string) (*S3TestHelper, error) { - return NewS3TestHelper(mode == "gcs") -} +func (s *PeerFlowE2ETestSuiteS3) setupS3(mode string) error { + switchToGCS := false + if mode == "gcs" { + switchToGCS = true + } + helper, err := NewS3TestHelper(switchToGCS) + if err != nil { + return err + } -func setupSuite(t *testing.T) PeerFlowE2ETestSuiteS3 { - t.Helper() + s.s3Helper = helper + return nil +} +func (s *PeerFlowE2ETestSuiteS3) SetupSuite() { err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -61,35 +58,43 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteS3 { slog.Info("Unable to load .env file, using default values from env") } - suffix := "s3_" + strings.ToLower(shared.RandomString(8)) - pool, err := e2e.SetupPostgres(suffix) + pool, err := e2e.SetupPostgres(s3Suffix) if err != nil || pool == nil { - require.Fail(t, "failed to setup postgres", err) + s.Fail("failed to setup postgres", err) + } + s.pool = pool + + err = s.setupS3("s3") + if err != nil { + s.Fail("failed to setup S3", err) } +} - helper, err := setupS3("s3") +// Implement TearDownAllSuite interface to tear down the test suite +func (s *PeerFlowE2ETestSuiteS3) TearDownSuite() { + err := e2e.TearDownPostgres(s.pool, s3Suffix) if err != nil { - require.Fail(t, "failed to setup S3", err) + s.Fail("failed to drop Postgres schema", err) } - return PeerFlowE2ETestSuiteS3{ - t: t, - pool: pool, - s3Helper: helper, - suffix: suffix, + if s.s3Helper != nil { + err = s.s3Helper.CleanUp() + if err != nil { + s.Fail("failed to clean up s3", err) + } } } -func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { +func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { if s.s3Helper == nil { - s.t.Skip("Skipping S3 test") + s.T().Skip("Skipping S3 test") } - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - e2e.RegisterWorkflowsAndActivities(s.t, env) + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.T(), env) jobName := "test_complete_flow_s3" - schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName) + schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) s.setupSourceTable(jobName, 10) query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}", @@ -104,16 +109,16 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { false, "", ) - require.NoError(s.t, err) + s.NoError(err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - require.NoError(s.t, err) + s.NoError(err) // Verify destination has 1 file // make context with timeout @@ -122,23 +127,23 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { files, err := s.s3Helper.ListAllFiles(ctx, jobName) - require.NoError(s.t, err) + require.NoError(s.T(), err) - require.Equal(s.t, 1, len(files)) + require.Equal(s.T(), 1, len(files)) - env.AssertExpectations(s.t) + env.AssertExpectations(s.T()) } -func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { +func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { if s.s3Helper == nil { - s.t.Skip("Skipping S3 test") + s.T().Skip("Skipping S3 test") } - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) - e2e.RegisterWorkflowsAndActivities(s.t, env) + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.T(), env) jobName := "test_complete_flow_s3_ctid" - schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName) + schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) s.setupSourceTable(jobName, 20000) query := fmt.Sprintf("SELECT * FROM %s WHERE ctid BETWEEN {{.start}} AND {{.end}}", schemaQualifiedName) @@ -152,7 +157,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { false, "", ) - require.NoError(s.t, err) + s.NoError(err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url qrepConfig.NumRowsPerPartition = 2000 qrepConfig.InitialCopyOnly = true @@ -161,10 +166,10 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() - require.NoError(s.t, err) + s.NoError(err) // Verify destination has 1 file // make context with timeout @@ -173,9 +178,9 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { files, err := s.s3Helper.ListAllFiles(ctx, jobName) - require.NoError(s.t, err) + require.NoError(s.T(), err) - require.Equal(s.t, 10, len(files)) + require.Equal(s.T(), 10, len(files)) - env.AssertExpectations(s.t) + env.AssertExpectations(s.T()) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 0d1c3e1d20..314c92d2b5 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/ysmood/got" + connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" @@ -22,6 +24,7 @@ import ( ) type PeerFlowE2ETestSuiteSF struct { + got.G t *testing.T pgSuffix string @@ -31,25 +34,25 @@ type PeerFlowE2ETestSuiteSF struct { } func TestPeerFlowE2ETestSuiteSF(t *testing.T) { - e2eshared.GotSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteSF) { + e2eshared.GotSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) { err := e2e.TearDownPostgres(s.pool, s.pgSuffix) if err != nil { slog.Error("failed to tear down Postgres", slog.Any("error", err)) - s.t.FailNow() + s.FailNow() } if s.sfHelper != nil { err = s.sfHelper.Cleanup() if err != nil { slog.Error("failed to tear down Snowflake", slog.Any("error", err)) - s.t.FailNow() + s.FailNow() } } err = s.connector.Close() if err != nil { slog.Error("failed to close Snowflake connector", slog.Any("error", err)) - s.t.FailNow() + s.FailNow() } }) } @@ -62,7 +65,7 @@ func (s PeerFlowE2ETestSuiteSF) attachSuffix(input string) string { return fmt.Sprintf("%s_%s", input, s.pgSuffix) } -func setupSuite(t *testing.T) PeerFlowE2ETestSuiteSF { +func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { t.Helper() err := godotenv.Load() @@ -72,20 +75,20 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteSF { slog.Info("Unable to load .env file, using default values from env") } - suffix := strings.ToLower(shared.RandomString(8)) + suffix := shared.RandomString(8) tsSuffix := time.Now().Format("20060102150405") pgSuffix := fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix) pool, err := e2e.SetupPostgres(pgSuffix) if err != nil || pool == nil { slog.Error("failed to setup Postgres", slog.Any("error", err)) - t.FailNow() + g.FailNow() } sfHelper, err := NewSnowflakeTestHelper() if err != nil { slog.Error("failed to setup Snowflake", slog.Any("error", err)) - t.FailNow() + g.FailNow() } connector, err := connsnowflake.NewSnowflakeConnector( @@ -95,6 +98,7 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteSF { require.NoError(t, err) suite := PeerFlowE2ETestSuiteSF{ + G: g, t: t, pgSuffix: pgSuffix, pool: pool, @@ -106,7 +110,7 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteSF { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_flow_sf") @@ -155,7 +159,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -163,7 +167,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { count, err := s.sfHelper.CountRows("test_simple_flow_sf") require.NoError(s.t, err) - require.Equal(s.t, 20, count) + s.Equal(20, count) // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago // it should match the count. @@ -172,7 +176,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { `, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) - require.Equal(s.t, 20, numNewRows) + s.Equal(20, numNewRows) // TODO: verify that the data is correctly synced to the destination table // on the Snowflake side @@ -181,7 +185,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_replica_identity_no_pkey") @@ -233,7 +237,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -241,13 +245,13 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { count, err := s.sfHelper.CountRows("test_replica_identity_no_pkey") require.NoError(s.t, err) - require.Equal(s.t, 20, count) + s.Equal(20, count) env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_invalid_geo_sf_avro_cdc") @@ -309,7 +313,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -319,11 +323,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { // They should have been filtered out as null on destination lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line") require.NoError(s.t, err) - require.Equal(s.t, 6, lineCount) + s.Equal(6, lineCount) polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly") require.NoError(s.t, err) - require.Equal(s.t, 6, polyCount) + s.Equal(6, polyCount) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side @@ -332,7 +336,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_1") @@ -388,7 +392,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -399,7 +403,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_2") @@ -446,6 +450,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { if err != nil { slog.Error("Error executing transaction", slog.Any("error", err)) + s.FailNow() } wg.Done() @@ -454,7 +459,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -467,7 +472,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_3") @@ -529,7 +534,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -540,7 +545,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_4") @@ -595,7 +600,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -606,7 +611,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_5") @@ -661,7 +666,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -672,7 +677,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_types_sf") @@ -729,7 +734,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -745,13 +750,13 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { s.t.Log(err) } // Make sure that there are no nulls - require.Equal(s.t, noNulls, true) + s.Equal(noNulls, true) env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTable1Name := s.attachSchemaSuffix("test1_sf") @@ -795,7 +800,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() count1, err := s.sfHelper.CountRows("test1_sf") @@ -803,14 +808,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { count2, err := s.sfHelper.CountRows("test2_sf") require.NoError(s.t, err) - require.Equal(s.t, 1, count1) - require.Equal(s.t, 1, count2) + s.Equal(1, count1) + s.Equal(1, count2) env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") @@ -864,7 +869,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { TableIdentifiers: []string{dstTableName}, }) require.NoError(s.t, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1") // alter source table, add column c2 and insert another row. @@ -893,7 +898,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { TableIdentifiers: []string{dstTableName}, }) require.NoError(s.t, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2") // alter source table, add column c3, drop column c2 and insert another row. @@ -923,7 +928,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { TableIdentifiers: []string{dstTableName}, }) require.NoError(s.t, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3") // alter source table, drop column c3 and insert another row. @@ -953,14 +958,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { TableIdentifiers: []string{dstTableName}, }) require.NoError(s.t, err) - require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) s.compareTableContentsSF("test_simple_schema_changes", "id,c1") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -970,7 +975,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") @@ -1030,7 +1035,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1043,7 +1048,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") @@ -1106,7 +1111,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1119,7 +1124,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") @@ -1178,7 +1183,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1191,7 +1196,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_exclude_sf") @@ -1257,7 +1262,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1269,12 +1274,12 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { for _, field := range sfRows.Schema.Fields { require.NotEqual(s.t, field.Name, "c2") } - require.Equal(s.t, 5, len(sfRows.Schema.Fields)) - require.Equal(s.t, 10, len(sfRows.Records)) + s.Equal(5, len(sfRows.Schema.Fields)) + s.Equal(10, len(sfRows.Records)) } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel") @@ -1344,7 +1349,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1361,7 +1366,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") @@ -1429,7 +1434,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1440,11 +1445,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) - require.Equal(s.t, 1, numNewRows) + s.Equal(1, numNewRows) } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_ud") @@ -1516,7 +1521,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1527,11 +1532,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) - require.Equal(s.t, 1, numNewRows) + s.Equal(1, numNewRows) } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_softdel_iad") @@ -1591,7 +1596,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.Contains(s.t, err.Error(), "continue as new") @@ -1602,7 +1607,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) require.NoError(s.t, err) - require.Equal(s.t, 0, numNewRows) + s.Equal(0, numNewRows) } func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 5b150a7bd3..f45a78d7c1 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -52,7 +52,7 @@ func (s PeerFlowE2ETestSuiteSF) compareTableContentsWithDiffSelectorsSF(tableNam } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -81,7 +81,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) @@ -93,7 +93,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -126,7 +126,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) @@ -138,7 +138,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -168,7 +168,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) @@ -180,7 +180,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -214,7 +214,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { e2e.RunXminFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) @@ -226,7 +226,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { } func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -260,7 +260,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) @@ -272,7 +272,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() } func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() { - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -305,7 +305,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() { e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err = env.GetWorkflowError() require.NoError(s.t, err) diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index 5b46e125dd..693062e39d 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -10,19 +10,30 @@ import ( "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" - "github.com/stretchr/testify/require" + "github.com/ysmood/got" ) const schemaDeltaTestSchemaName = "PUBLIC" type SnowflakeSchemaDeltaTestSuite struct { + got.G t *testing.T connector *connsnowflake.SnowflakeConnector sfTestHelper *SnowflakeTestHelper } -func setupSchemaDeltaSuite(t *testing.T) SnowflakeSchemaDeltaTestSuite { +func (suite SnowflakeSchemaDeltaTestSuite) failTestError(err error) { + if err != nil { + slog.Error("Error in test", slog.Any("error", err)) + suite.FailNow() + } +} + +func setupSchemaDeltaSuite( + t *testing.T, + g got.G, +) SnowflakeSchemaDeltaTestSuite { t.Helper() sfTestHelper, err := NewSnowflakeTestHelper() @@ -41,6 +52,7 @@ func setupSchemaDeltaSuite(t *testing.T) SnowflakeSchemaDeltaTestSuite { } return SnowflakeSchemaDeltaTestSuite{ + G: g, t: t, connector: connector, sfTestHelper: sfTestHelper, @@ -50,7 +62,7 @@ func setupSchemaDeltaSuite(t *testing.T) SnowflakeSchemaDeltaTestSuite { func (suite SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { tableName := fmt.Sprintf("%s.SIMPLE_ADD_COLUMN", schemaDeltaTestSchemaName) err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName)) - require.NoError(suite.t, err) + suite.failTestError(err) err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, @@ -60,13 +72,13 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { ColumnType: string(qvalue.QValueKindJSON), }}, }}) - require.NoError(suite.t, err) + suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - require.NoError(suite.t, err) - require.Equal(suite.t, &protos.TableSchema{ + suite.failTestError(err) + suite.Equal(&protos.TableSchema{ TableIdentifier: tableName, Columns: map[string]string{ "ID": string(qvalue.QValueKindString), @@ -78,7 +90,7 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { func (suite SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() { tableName := fmt.Sprintf("%s.ADD_DROP_ALL_COLUMN_TYPES", schemaDeltaTestSchemaName) err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName)) - require.NoError(suite.t, err) + suite.failTestError(err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -112,19 +124,19 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() { DstTableName: tableName, AddedColumns: addedColumns, }}) - require.NoError(suite.t, err) + suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - require.NoError(suite.t, err) - require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) + suite.failTestError(err) + suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } func (suite SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() { tableName := fmt.Sprintf("%s.ADD_DROP_TRICKY_COLUMN_NAMES", schemaDeltaTestSchemaName) err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(id TEXT PRIMARY KEY)", tableName)) - require.NoError(suite.t, err) + suite.failTestError(err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -156,19 +168,19 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() { DstTableName: tableName, AddedColumns: addedColumns, }}) - require.NoError(suite.t, err) + suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - require.NoError(suite.t, err) - require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) + suite.failTestError(err) + suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { tableName := fmt.Sprintf("%s.ADD_DROP_WHITESPACE_COLUMN_NAMES", schemaDeltaTestSchemaName) err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(\" \" TEXT PRIMARY KEY)", tableName)) - require.NoError(suite.t, err) + suite.failTestError(err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -194,18 +206,18 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { DstTableName: tableName, AddedColumns: addedColumns, }}) - require.NoError(suite.t, err) + suite.failTestError(err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - require.NoError(suite.t, err) - require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) + suite.failTestError(err) + suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) } func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) { e2eshared.GotSuite(t, setupSchemaDeltaSuite, func(suite SnowflakeSchemaDeltaTestSuite) { - require.NoError(suite.t, suite.sfTestHelper.Cleanup()) - require.NoError(suite.t, suite.connector.Close()) + suite.failTestError(suite.sfTestHelper.Cleanup()) + suite.failTestError(suite.connector.Close()) }) } diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index e21e7c5979..aeb622d559 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -19,9 +19,11 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/require" + "github.com/ysmood/got" ) type PeerFlowE2ETestSuiteSQLServer struct { + got.G t *testing.T pool *pgxpool.Pool @@ -129,7 +131,7 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append( s.t.Skip("Skipping SQL Server test") } - env := e2e.NewTemporalTestWorkflowEnvironment(s.t) + env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -164,7 +166,7 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append( e2e.RunQrepFlowWorkflow(env, qrepConfig) // Verify workflow completes without error - require.True(s.t, env.IsWorkflowCompleted()) + s.True(env.IsWorkflowCompleted()) err := env.GetWorkflowError() require.NoError(s.t, err) @@ -175,5 +177,5 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append( err = s.pool.QueryRow(context.Background(), countQuery).Scan(&numRowsInDest) require.NoError(s.t, err) - require.Equal(s.t, numRows, int(numRowsInDest.Int64)) + s.Equal(numRows, int(numRowsInDest.Int64)) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 6668bfae7d..4674667b97 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log/slog" + "os" "strings" "testing" "time" @@ -370,26 +371,16 @@ func GetOwnersSelectorStringsSF() [2]string { return [2]string{strings.Join(pgFields, ","), strings.Join(sfFields, ",")} } -type tlogWriter struct { - t *testing.T -} - -func (iw tlogWriter) Write(p []byte) (n int, err error) { - iw.t.Log(string(p)) - return len(p), nil -} - -func NewTemporalTestWorkflowEnvironment(t *testing.T) *testsuite.TestWorkflowEnvironment { - t.Helper() +func NewTemporalTestWorkflowEnvironment() *testsuite.TestWorkflowEnvironment { + testSuite := &testsuite.WorkflowTestSuite{} logger := slog.New(logger.NewHandler( slog.NewJSONHandler( - tlogWriter{t}, + os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn}, ))) tLogger := NewTStructuredLogger(*logger) - testSuite := &testsuite.WorkflowTestSuite{} testSuite.SetLogger(tLogger) return testSuite.NewTestWorkflowEnvironment() } diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index 0cf5445190..e235536ade 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -11,14 +11,15 @@ import ( "github.com/ysmood/got" ) -func GotSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) { +func GotSuite[T any](t *testing.T, setup func(t *testing.T, g got.G) T, teardown func(T)) { t.Helper() got.Each(t, func(t *testing.T) T { t.Helper() - t.Parallel() - suite := setup(t) - t.Cleanup(func() { + g := got.New(t) + g.Parallel() + suite := setup(t, g) + g.Cleanup(func() { teardown(suite) }) return suite