diff --git a/flow/connectors/postgres/postgres_repl_test.go b/flow/connectors/postgres/postgres_repl_test.go index df3a7de13f..b0e990ac2a 100644 --- a/flow/connectors/postgres/postgres_repl_test.go +++ b/flow/connectors/postgres/postgres_repl_test.go @@ -7,106 +7,74 @@ import ( "testing" "time" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" ) type PostgresReplicationSnapshotTestSuite struct { - suite.Suite + t *testing.T + connector *PostgresConnector + schema string } -func (suite *PostgresReplicationSnapshotTestSuite) SetupSuite() { - var err error - suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{ +func setupSuite(t *testing.T) PostgresReplicationSnapshotTestSuite { + t.Helper() + + connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, User: "postgres", Password: "postgres", Database: "postgres", }, true) - require.NoError(suite.T(), err) + require.NoError(t, err) - setupTx, err := suite.connector.pool.Begin(context.Background()) - require.NoError(suite.T(), err) + setupTx, err := connector.pool.Begin(context.Background()) + require.NoError(t, err) defer func() { err := setupTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - require.NoError(suite.T(), err) + require.NoError(t, err) } }() - _, err = setupTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_repl_test CASCADE") - require.NoError(suite.T(), err) + schema := "repltest_" + shared.RandomString(8) + + _, err = setupTx.Exec(context.Background(), + fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schema)) + require.NoError(t, err) - _, err = setupTx.Exec(context.Background(), "CREATE SCHEMA pgpeer_repl_test") - require.NoError(suite.T(), err) + _, err = setupTx.Exec(context.Background(), + fmt.Sprintf("CREATE SCHEMA %s", schema)) + require.NoError(t, err) - // setup 3 tables in pgpeer_repl_test schema - // test_1, test_2, test_3, all have 5 columns all text, c1, c2, c3, c4, c5 + // setup 3 tables test_1, test_2, test_3 + // all have 5 text columns c1, c2, c3, c4, c5 tables := []string{"test_1", "test_2", "test_3"} for _, table := range tables { _, err = setupTx.Exec(context.Background(), - fmt.Sprintf("CREATE TABLE pgpeer_repl_test.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", table)) - require.NoError(suite.T(), err) + fmt.Sprintf("CREATE TABLE %s.%s (c1 text, c2 text, c3 text, c4 text, c5 text)", schema, table)) + require.NoError(t, err) } err = setupTx.Commit(context.Background()) - require.NoError(suite.T(), err) -} - -func (suite *PostgresReplicationSnapshotTestSuite) TearDownSuite() { - teardownTx, err := suite.connector.pool.Begin(context.Background()) - require.NoError(suite.T(), err) - defer func() { - err := teardownTx.Rollback(context.Background()) - if err != pgx.ErrTxClosed { - require.NoError(suite.T(), err) - } - }() - - _, err = teardownTx.Exec(context.Background(), "DROP SCHEMA IF EXISTS pgpeer_test CASCADE") - require.NoError(suite.T(), err) - - // Fetch all the publications - rows, err := teardownTx.Query(context.Background(), - "SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation")) - require.NoError(suite.T(), err) - - // Iterate over the publications and drop them - for rows.Next() { - var pubname pgtype.Text - 
err := rows.Scan(&pubname) - require.NoError(suite.T(), err) + require.NoError(t, err) - // Drop the publication in a new transaction - _, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String)) - require.NoError(suite.T(), err) + return PostgresReplicationSnapshotTestSuite{ + t: t, + connector: connector, + schema: schema, } - - _, err = teardownTx.Exec(context.Background(), - "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1", - fmt.Sprintf("%%%s", "test_simple_slot_creation")) - require.NoError(suite.T(), err) - - err = teardownTx.Commit(context.Background()) - require.NoError(suite.T(), err) - - suite.True(suite.connector.ConnectionActive() == nil) - - err = suite.connector.Close() - require.NoError(suite.T(), err) - - suite.False(suite.connector.ConnectionActive() == nil) } -func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { +func (suite PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { tables := map[string]string{ - "pgpeer_repl_test.test_1": "test_1_dst", + suite.schema + ".test_1": "test_1_dst", } flowJobName := "test_simple_slot_creation" @@ -121,7 +89,7 @@ func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { // Moved to a go routine go func() { err := suite.connector.SetupReplication(signal, setupReplicationInput) - require.NoError(suite.T(), err) + require.NoError(suite.t, err) }() slog.Info("waiting for slot creation to complete", flowLog) @@ -136,5 +104,49 @@ func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { } func TestPostgresReplTestSuite(t *testing.T) { - suite.Run(t, new(PostgresReplicationSnapshotTestSuite)) + e2eshared.GotSuite(t, setupSuite, func(suite PostgresReplicationSnapshotTestSuite) { + teardownTx, err := suite.connector.pool.Begin(context.Background()) + require.NoError(suite.t, err) + defer func() { + err := teardownTx.Rollback(context.Background()) + if err != pgx.ErrTxClosed { + require.NoError(suite.t, err) + } + }() + + _, err = teardownTx.Exec(context.Background(), + fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", suite.schema)) + require.NoError(suite.t, err) + + // Fetch all the publications + rows, err := teardownTx.Query(context.Background(), + "SELECT pubname FROM pg_publication WHERE pubname LIKE $1", fmt.Sprintf("%%%s", "test_simple_slot_creation")) + require.NoError(suite.t, err) + + // Iterate over the publications and drop them + for rows.Next() { + var pubname pgtype.Text + err := rows.Scan(&pubname) + require.NoError(suite.t, err) + + // Drop the publication in a new transaction + _, err = suite.connector.pool.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubname.String)) + require.NoError(suite.t, err) + } + + _, err = teardownTx.Exec(context.Background(), + "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1", + fmt.Sprintf("%%%s", "test_simple_slot_creation")) + require.NoError(suite.t, err) + + err = teardownTx.Commit(context.Background()) + require.NoError(suite.t, err) + + require.True(suite.t, suite.connector.ConnectionActive() == nil) + + err = suite.connector.Close() + require.NoError(suite.t, err) + + require.False(suite.t, suite.connector.ConnectionActive() == nil) + }) } diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 8a919eb214..24fa43dbfc 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go 
+++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -5,79 +5,59 @@ import ( "fmt" "testing" + "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/jackc/pgx/v5" - "github.com/stretchr/testify/suite" + "github.com/stretchr/testify/require" ) type PostgresSchemaDeltaTestSuite struct { - suite.Suite + t *testing.T + connector *PostgresConnector } const schemaDeltaTestSchemaName = "pgschema_delta_test" -func (suite *PostgresSchemaDeltaTestSuite) failTestError(err error) { - if err != nil { - suite.FailNow(err.Error()) - } -} +func setupSchemaDeltaSuite(t *testing.T) PostgresSchemaDeltaTestSuite { + t.Helper() -func (suite *PostgresSchemaDeltaTestSuite) SetupSuite() { - var err error - suite.connector, err = NewPostgresConnector(context.Background(), &protos.PostgresConfig{ + connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{ Host: "localhost", Port: 7132, User: "postgres", Password: "postgres", Database: "postgres", }, false) - suite.failTestError(err) + require.NoError(t, err) - setupTx, err := suite.connector.pool.Begin(context.Background()) - suite.failTestError(err) + setupTx, err := connector.pool.Begin(context.Background()) + require.NoError(t, err) defer func() { err := setupTx.Rollback(context.Background()) if err != pgx.ErrTxClosed { - suite.failTestError(err) + require.NoError(t, err) } }() _, err = setupTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schemaDeltaTestSchemaName)) - suite.failTestError(err) + require.NoError(t, err) _, err = setupTx.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA %s", schemaDeltaTestSchemaName)) - suite.failTestError(err) + require.NoError(t, err) err = setupTx.Commit(context.Background()) - suite.failTestError(err) -} - -func (suite *PostgresSchemaDeltaTestSuite) TearDownSuite() { - teardownTx, err := suite.connector.pool.Begin(context.Background()) - suite.failTestError(err) - defer func() { - err := teardownTx.Rollback(context.Background()) - if err != pgx.ErrTxClosed { - suite.failTestError(err) - } - }() - _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", - schemaDeltaTestSchemaName)) - suite.failTestError(err) - err = teardownTx.Commit(context.Background()) - suite.failTestError(err) - - suite.True(suite.connector.ConnectionActive() == nil) - err = suite.connector.Close() - suite.failTestError(err) - suite.False(suite.connector.ConnectionActive() == nil) + require.NoError(t, err) + return PostgresSchemaDeltaTestSuite{ + t: t, + connector: connector, + } } -func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { +func (suite PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { tableName := fmt.Sprintf("%s.simple_add_column", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{ SrcTableName: tableName, @@ -87,13 +67,13 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { ColumnType: string(qvalue.QValueKindInt64), }}, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) - 
suite.Equal(&protos.TableSchema{ + require.NoError(suite.t, err) + require.Equal(suite.t, &protos.TableSchema{ TableIdentifier: tableName, Columns: map[string]string{ "id": string(qvalue.QValueKindInt32), @@ -103,11 +83,11 @@ func (suite *PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { }, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { +func (suite PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { tableName := fmt.Sprintf("%s.add_drop_all_column_types", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -148,20 +128,20 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { DstTableName: tableName, AddedColumns: addedColumns, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) - suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) + require.NoError(suite.t, err) + require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { +func (suite PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { tableName := fmt.Sprintf("%s.add_drop_tricky_column_names", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -194,20 +174,20 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { DstTableName: tableName, AddedColumns: addedColumns, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) - suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) + require.NoError(suite.t, err) + require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) } -func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { +func (suite PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { tableName := fmt.Sprintf("%s.add_drop_whitespace_column_names", schemaDeltaTestSchemaName) _, err := suite.connector.pool.Exec(context.Background(), fmt.Sprintf("CREATE TABLE %s(\" \" INT PRIMARY KEY)", tableName)) - suite.failTestError(err) + require.NoError(suite.t, err) expectedTableSchema := &protos.TableSchema{ TableIdentifier: tableName, @@ -234,15 +214,34 @@ func (suite *PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { DstTableName: tableName, AddedColumns: addedColumns, }}) - suite.failTestError(err) + require.NoError(suite.t, err) output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{tableName}, }) - suite.failTestError(err) - suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName]) + require.NoError(suite.t, err) + require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName]) } func TestPostgresSchemaDeltaTestSuite(t *testing.T) { - suite.Run(t, 
new(PostgresSchemaDeltaTestSuite)) + e2eshared.GotSuite(t, setupSchemaDeltaSuite, func(suite PostgresSchemaDeltaTestSuite) { + teardownTx, err := suite.connector.pool.Begin(context.Background()) + require.NoError(suite.t, err) + defer func() { + err := teardownTx.Rollback(context.Background()) + if err != pgx.ErrTxClosed { + require.NoError(suite.t, err) + } + }() + _, err = teardownTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", + schemaDeltaTestSchemaName)) + require.NoError(suite.t, err) + err = teardownTx.Commit(context.Background()) + require.NoError(suite.t, err) + + require.True(suite.t, suite.connector.ConnectionActive() == nil) + err = suite.connector.Close() + require.NoError(suite.t, err) + require.False(suite.t, suite.connector.ConnectionActive() == nil) + }) } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index cc0352fae3..09f39622ab 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -18,11 +18,9 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/require" - "github.com/ysmood/got" ) type PeerFlowE2ETestSuiteBQ struct { - got.G t *testing.T bqSuffix string @@ -35,13 +33,13 @@ func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { err := e2e.TearDownPostgres(s.pool, s.bqSuffix) if err != nil { slog.Error("failed to tear down postgres", slog.Any("error", err)) - s.FailNow() + s.t.FailNow() } err = s.bqHelper.DropDataset(s.bqHelper.datasetName) if err != nil { slog.Error("failed to tear down bigquery", slog.Any("error", err)) - s.FailNow() + s.t.FailNow() } }) } @@ -118,7 +116,7 @@ func setupBigQuery(t *testing.T) *BigQueryTestHelper { } // Implement SetupAllSuite interface to setup the test suite -func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { +func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ { t.Helper() err := godotenv.Load() @@ -128,9 +126,9 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { slog.Info("Unable to load .env file, using default values from env") } - suffix := shared.RandomString(8) + suffix := strings.ToLower(shared.RandomString(8)) tsSuffix := time.Now().Format("20060102150405") - bqSuffix := fmt.Sprintf("bq_%s_%s", strings.ToLower(suffix), tsSuffix) + bqSuffix := fmt.Sprintf("bq_%s_%s", suffix, tsSuffix) pool, err := e2e.SetupPostgres(bqSuffix) if err != nil || pool == nil { slog.Error("failed to setup postgres", slog.Any("error", err)) @@ -140,7 +138,6 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { bq := setupBigQuery(t) return PeerFlowE2ETestSuiteBQ{ - G: g, t: t, bqSuffix: bqSuffix, pool: pool, @@ -149,7 +146,7 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { } func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. 
@@ -161,7 +158,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, &limits, nil) // Verify workflow completes - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err := env.GetWorkflowError() // assert that error contains "invalid connection configs" @@ -171,7 +168,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { } func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_no_data") @@ -205,7 +202,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -215,7 +212,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { } func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_char_coltype") @@ -249,7 +246,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -262,7 +259,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { // The test inserts 10 rows into the source table and verifies that the data is // correctly synced to the destination table after sync flow completes. 
func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_flow_bq") @@ -312,7 +309,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -320,7 +317,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { count, err := s.bqHelper.countRows(dstTableName) require.NoError(s.t, err) - s.Equal(10, count) + require.Equal(s.t, 10, count) // TODO: verify that the data is correctly synced to the destination table // on the bigquery side @@ -329,7 +326,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_1") @@ -386,7 +383,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -397,7 +394,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_2") @@ -449,7 +446,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -461,7 +458,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_3") @@ -524,7 +521,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -535,7 +532,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_4") @@ -591,7 +588,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify 
workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -602,7 +599,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_5") @@ -658,7 +655,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -669,7 +666,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_types_bq") @@ -726,7 +723,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -742,13 +739,13 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { s.t.Log(err) } // Make sure that there are no nulls - s.True(noNulls) + require.True(s.t, noNulls) env.AssertExpectations(s.t) } func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTable1Name := s.attachSchemaSuffix("test1_bq") @@ -802,15 +799,15 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { count2, err := s.bqHelper.countRows(dstTable2Name) require.NoError(s.t, err) - s.Equal(1, count1) - s.Equal(1, count2) + require.Equal(s.t, 1, count1) + require.Equal(s.t, 1, count2) env.AssertExpectations(s.t) } // TODO: not checking schema exactly, add later func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") @@ -900,7 +897,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -910,7 +907,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") @@ -971,7 +968,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes 
without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -983,7 +980,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") @@ -1047,7 +1044,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1060,7 +1057,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") @@ -1120,7 +1117,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1133,7 +1130,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_peerdb_cols") @@ -1183,7 +1180,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) // Verify workflow completes without error - s.True(env.IsWorkflowCompleted()) + require.True(s.t, env.IsWorkflowCompleted()) err = env.GetWorkflowError() // allow only continue as new error @@ -1196,7 +1193,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) srcTable1Name := s.attachSchemaSuffix("test1_bq") @@ -1254,8 +1251,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name) require.NoError(s.t, err) - s.Equal(1, count1) - s.Equal(1, count2) + require.Equal(s.t, 1, count1) + require.Equal(s.t, 1, count2) err = s.bqHelper.DropDataset(secondDataset) require.NoError(s.t, err) @@ -1264,7 +1261,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { - env := e2e.NewTemporalTestWorkflowEnvironment() + env := e2e.NewTemporalTestWorkflowEnvironment(s.t) e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel") @@ -1334,7 +1331,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) - 
s.True(env.IsWorkflowCompleted())
+ require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")

@@ -1348,11 +1345,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
- s.Eq(1, numNewRows)
+ require.Equal(s.t, int64(1), numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
- env := e2e.NewTemporalTestWorkflowEnvironment()
+ env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(s.t, env)

cmpTableName := s.attachSchemaSuffix("test_softdel_iud")
@@ -1420,7 +1417,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
- s.True(env.IsWorkflowCompleted())
+ require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")
@@ -1432,11 +1429,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
- s.Eq(1, numNewRows)
+ require.Equal(s.t, int64(1), numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
- env := e2e.NewTemporalTestWorkflowEnvironment()
+ env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(s.t, env)

cmpTableName := s.attachSchemaSuffix("test_softdel_ud")
@@ -1508,7 +1505,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
- s.True(env.IsWorkflowCompleted())
+ require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")
@@ -1520,11 +1517,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
- s.Eq(1, numNewRows)
+ require.Equal(s.t, int64(1), numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
- env := e2e.NewTemporalTestWorkflowEnvironment()
+ env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_softdel_iad")
@@ -1584,7 +1581,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
- s.True(env.IsWorkflowCompleted())
+ require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()
require.Contains(s.t, err.Error(), "continue as new")
@@ -1596,5 +1593,5 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
s.bqHelper.datasetName, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
- s.Eq(0, numNewRows)
+ require.Equal(s.t, int64(0), numNewRows)
}
diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go
index def151071c..f75200bf13 100644
--- a/flow/e2e/bigquery/qrep_flow_bq_test.go
+++ b/flow/e2e/bigquery/qrep_flow_bq_test.go
@@ -47,7 +47,7 @@ func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsStr
}

func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
- env := e2e.NewTemporalTestWorkflowEnvironment()
+ env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(s.t, env)

numRows := 10
@@ -71,7 +71,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
- s.True(env.IsWorkflowCompleted())
+ require.True(s.t, env.IsWorkflowCompleted())

err = env.GetWorkflowError()
require.NoError(s.t, err)
@@ -82,7 +82,7 @@
}

func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() {
- env := e2e.NewTemporalTestWorkflowEnvironment()
+ env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
e2e.RegisterWorkflowsAndActivities(s.t, env)

numRows := 10
@@ -105,7 +105,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() {
e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
- s.True(env.IsWorkflowCompleted())
+ require.True(s.t, env.IsWorkflowCompleted())

err = env.GetWorkflowError()
require.NoError(s.t, err)
diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go
index df1ff17c13..eb97430561 100644
--- a/flow/e2e/congen.go
+++ b/flow/e2e/congen.go
@@ -55,6 +55,7 @@ func cleanPostgres(pool *pgxpool.Pool, suffix string) error {
if err != nil {
return fmt.Errorf("failed to list publications: %w", err)
}
+ defer rows.Close()

// drop all publications with the given suffix
for rows.Next() {
@@ -135,15 +136,7 @@
}

func TearDownPostgres(pool *pgxpool.Pool, suffix string) error {
- // drop the e2e_test schema
- if pool != nil {
- err := cleanPostgres(pool, suffix)
- if err != nil {
- return err
- }
- pool.Close()
- }
- return nil
+ return cleanPostgres(pool, suffix)
}

// GeneratePostgresPeer generates a postgres peer config for testing.
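Note on the suite runner used throughout this change: `e2eshared.GotSuite` itself is not part of this diff. From its call sites — `e2eshared.GotSuite(t, setupSuite, teardown)`, a setup of type `func(*testing.T) T`, and `Test*` methods declared on the value type `T` — it presumably discovers the suite's `Test*` methods, runs each with a freshly constructed suite value, and invokes `teardown` when that test finishes. A minimal sketch of that shape, assuming it wraps `got.Each` from github.com/ysmood/got (the dependency the BigQuery suite stops embedding above); the actual implementation in `flow/e2eshared` may differ:

	package e2eshared

	import (
		"testing"

		"github.com/ysmood/got"
	)

	// GotSuite runs every exported Test* method declared on the suite type T.
	// setup builds a fresh suite value for each test method; teardown is
	// registered via t.Cleanup, so it runs when that method completes.
	func GotSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) {
		t.Helper()
		got.Each(t, func(t *testing.T) T {
			t.Helper()
			suite := setup(t)
			t.Cleanup(func() { teardown(suite) })
			return suite
		})
	}

Under this shape, setup and teardown run once per test method rather than once per suite, which is consistent with each setup function above generating its own random schema suffix.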
diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go
index 53658bf6ff..06095b192e 100644
--- a/flow/e2e/postgres/peer_flow_pg_test.go
+++ b/flow/e2e/postgres/peer_flow_pg_test.go
@@ -9,17 +9,18 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/jackc/pgx/v5/pgtype"
+ "github.com/stretchr/testify/require"
)

-func (s *PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string {
- return fmt.Sprintf("e2e_test_%s.%s", postgresSuffix, tableName)
+func (s PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string {
+ return fmt.Sprintf("e2e_test_%s.%s", s.suffix, tableName)
}

-func (s *PeerFlowE2ETestSuitePG) attachSuffix(input string) string {
- return fmt.Sprintf("%s_%s", input, postgresSuffix)
+func (s PeerFlowE2ETestSuitePG) attachSuffix(input string) string {
+ return fmt.Sprintf("%s_%s", input, s.suffix)
}

-func (s *PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error {
+func (s PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error {
query := fmt.Sprintf(`SELECT "_PEERDB_IS_DELETED","_PEERDB_SYNCED_AT" FROM %s WHERE id = %d`,
dstSchemaQualified, rowID)
var isDeleted pgtype.Bool
@@ -40,9 +41,9 @@ func (s *PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, r
return nil
}

-func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
- env := s.NewTestWorkflowEnvironment()
- e2e.RegisterWorkflowsAndActivities(s.T(), env)
+func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
+ env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+ e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_simple_flow")
dstTableName := s.attachSchemaSuffix("test_simple_flow_dst")
@@ -54,7 +55,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
value TEXT NOT NULL
);
`, srcTableName))
- s.NoError(err)
+ require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_simple_flow"),
@@ -64,7 +65,7 @@
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
- s.NoError(err)
+ require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
@@ -82,7 +83,7 @@
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
- s.NoError(err)
+ require.NoError(s.t, err)
}
- s.T().Log("Inserted 10 rows into the source table")
+ s.t.Log("Inserted 10 rows into the source table")
}()

@@ -90,22 +91,22 @@
env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
- s.True(env.IsWorkflowCompleted())
+ require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
- s.Error(err)
- s.Contains(err.Error(), "continue as new")
+ require.Error(s.t, err)
+ require.Contains(s.t, err.Error(), "continue as new")

err = s.comparePGTables(srcTableName, dstTableName, "id,key,value")
- s.NoError(err)
+ require.NoError(s.t, err)

- env.AssertExpectations(s.T())
+ env.AssertExpectations(s.t)
}

-func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
- env := s.NewTestWorkflowEnvironment()
- e2e.RegisterWorkflowsAndActivities(s.T(), env)
+func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
+ env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+ e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_simple_schema_changes")
dstTableName := s.attachSchemaSuffix("test_simple_schema_changes_dst")
@@ -116,7 +117,7 @@
c1 BIGINT
);
`, srcTableName))
- s.NoError(err)
+ require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_simple_schema_changes"),
@@ -126,7 +127,7 @@
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
- s.NoError(err)
+ require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 1,
@@ -140,8 +141,8 @@
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 1)
- s.NoError(err)
- s.T().Log("Inserted initial row in the source table")
+ require.NoError(s.t, err)
+ s.t.Log("Inserted initial row in the source table")

// verify we got our first row.
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
@@ -156,20 +157,20 @@
output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{dstTableName},
})
- s.NoError(err)
- s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+ require.NoError(s.t, err)
+ require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
- s.NoError(err)
+ require.NoError(s.t, err)

// alter source table, add column c2 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
ALTER TABLE %s ADD COLUMN c2 BIGINT`, srcTableName))
- s.NoError(err)
- s.T().Log("Altered source table, added column c2")
+ require.NoError(s.t, err)
+ s.t.Log("Altered source table, added column c2")
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2) VALUES ($1,$2)`, srcTableName), 2, 2)
- s.NoError(err)
- s.T().Log("Inserted row with added c2 in the source table")
+ require.NoError(s.t, err)
+ s.t.Log("Inserted row with added c2 in the source table")

// verify we got our two rows, if schema did not match up it will error.
e2e.NormalizeFlowCountQuery(env, connectionGen, 4)
@@ -185,20 +186,20 @@
output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{dstTableName},
})
- s.NoError(err)
- s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+ require.NoError(s.t, err)
+ require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2")
- s.NoError(err)
+ require.NoError(s.t, err)

// alter source table, add column c3, drop column c2 and insert another row.
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c2, ADD COLUMN c3 BIGINT`, srcTableName)) - s.NoError(err) - s.T().Log("Altered source table, dropped column c2 and added column c3") + require.NoError(s.t, err) + s.t.Log("Altered source table, dropped column c2 and added column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1,c3) VALUES ($1,$2)`, srcTableName), 3, 3) - s.NoError(err) - s.T().Log("Inserted row with added c3 in the source table") + require.NoError(s.t, err) + s.t.Log("Inserted row with added c3 in the source table") // verify we got our two rows, if schema did not match up it will error. e2e.NormalizeFlowCountQuery(env, connectionGen, 6) @@ -215,20 +216,20 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, }) - s.NoError(err) - s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) + require.NoError(s.t, err) + require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c3") - s.NoError(err) + require.NoError(s.t, err) // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` ALTER TABLE %s DROP COLUMN c3`, srcTableName)) - s.NoError(err) - s.T().Log("Altered source table, dropped column c3") + require.NoError(s.t, err) + s.t.Log("Altered source table, dropped column c3") _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c1) VALUES ($1)`, srcTableName), 4) - s.NoError(err) - s.T().Log("Inserted row after dropping all columns in the source table") + require.NoError(s.t, err) + s.t.Log("Inserted row after dropping all columns in the source table") // verify we got our two rows, if schema did not match up it will error. 
e2e.NormalizeFlowCountQuery(env, connectionGen, 8)
@@ -245,28 +246,28 @@
output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{dstTableName},
})
- s.NoError(err)
- s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+ require.NoError(s.t, err)
+ require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
- s.NoError(err)
+ require.NoError(s.t, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
- s.True(env.IsWorkflowCompleted())
+ require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
- s.Error(err)
- s.Contains(err.Error(), "continue as new")
+ require.Error(s.t, err)
+ require.Contains(s.t, err.Error(), "continue as new")

- env.AssertExpectations(s.T())
+ env.AssertExpectations(s.t)
}

-func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
- env := s.NewTestWorkflowEnvironment()
- e2e.RegisterWorkflowsAndActivities(s.T(), env)
+func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
+ env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+ e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_simple_cpkey")
dstTableName := s.attachSchemaSuffix("test_simple_cpkey_dst")
@@ -280,7 +281,7 @@
PRIMARY KEY(id,t)
);
`, srcTableName))
- s.NoError(err)
+ require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_cpkey_flow"),
@@ -290,7 +291,7 @@
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
- s.NoError(err)
+ require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
@@ -307,41 +308,41 @@
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t) VALUES ($1,$2)
`, srcTableName), i, testValue)
- s.NoError(err)
+ require.NoError(s.t, err)
}
- s.T().Log("Inserted 10 rows into the source table")
+ s.t.Log("Inserted 10 rows into the source table")

// verify we got our 10 rows
e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t")
- s.NoError(err)
+ require.NoError(s.t, err)

_, err := s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
- s.NoError(err)
+ require.NoError(s.t, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
- s.NoError(err)
+ require.NoError(s.t, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
- s.True(env.IsWorkflowCompleted())
+ require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
- s.Error(err)
- s.Contains(err.Error(), "continue as new")
+ require.Error(s.t, err)
+ require.Contains(s.t, err.Error(), "continue as new")

err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t")
- s.NoError(err)
+ require.NoError(s.t, err)

- env.AssertExpectations(s.T())
+ env.AssertExpectations(s.t)
}

-func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
- env := s.NewTestWorkflowEnvironment()
- e2e.RegisterWorkflowsAndActivities(s.T(), env)
+func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
+ env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+ e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast1")
dstTableName := s.attachSchemaSuffix("test_cpkey_toast1_dst")
@@ -359,7 +360,7 @@
round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
$$ language sql;
`, srcTableName))
- s.NoError(err)
+ require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"),
@@ -369,7 +370,7 @@
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
- s.NoError(err)
+ require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 20,
@@ -381,7 +382,7 @@
go func() {
e2e.SetupCDCFlowStatusQuery(env, connectionGen)
rowsTx, err := s.pool.Begin(context.Background())
- s.NoError(err)
+ require.NoError(s.t, err)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
@@ -389,40 +390,40 @@
_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
`, srcTableName), i, testValue)
- s.NoError(err)
+ require.NoError(s.t, err)
}
- s.T().Log("Inserted 10 rows into the source table")
+ s.t.Log("Inserted 10 rows into the source table")

_, err = rowsTx.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
- s.NoError(err)
+ require.NoError(s.t, err)
_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
- s.NoError(err)
+ require.NoError(s.t, err)

err = rowsTx.Commit(context.Background())
- s.NoError(err)
+ require.NoError(s.t, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
- s.True(env.IsWorkflowCompleted())
+ require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
- s.Error(err)
- s.Contains(err.Error(), "continue as new")
+ require.Error(s.t, err)
+ require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2")
- s.NoError(err)
+ require.NoError(s.t, err)

- env.AssertExpectations(s.T())
+ env.AssertExpectations(s.t)
}

-func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
- env := s.NewTestWorkflowEnvironment()
- e2e.RegisterWorkflowsAndActivities(s.T(), env)
+func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
+ env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+ e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_cpkey_toast2")
dstTableName := s.attachSchemaSuffix("test_cpkey_toast2_dst")
@@ -440,7 +441,7 @@
round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
$$ language sql;
`, srcTableName))
- s.NoError(err)
+ require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"),
@@ -450,7 +451,7 @@
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
- s.NoError(err)
+ require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 10,
@@ -468,38 +469,38 @@
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
`, srcTableName), i, testValue)
- s.NoError(err)
+ require.NoError(s.t, err)
}
- s.T().Log("Inserted 10 rows into the source table")
+ s.t.Log("Inserted 10 rows into the source table")

e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
_, err = s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
- s.NoError(err)
+ require.NoError(s.t, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
- s.NoError(err)
+ require.NoError(s.t, err)
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
- s.True(env.IsWorkflowCompleted())
+ require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
- s.Error(err)
- s.Contains(err.Error(), "continue as new")
+ require.Error(s.t, err)
+ require.Contains(s.t, err.Error(), "continue as new")

// verify our updates and delete happened
err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2")
- s.NoError(err)
+ require.NoError(s.t, err)

- env.AssertExpectations(s.T())
+ env.AssertExpectations(s.t)
}

-func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
- env := s.NewTestWorkflowEnvironment()
- e2e.RegisterWorkflowsAndActivities(s.T(), env)
+func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
+ env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+ e2e.RegisterWorkflowsAndActivities(s.t, env)

srcTableName := s.attachSchemaSuffix("test_peerdb_cols")
dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst")
@@ -511,7 +512,7 @@
value TEXT NOT NULL
);
`, srcTableName))
- s.NoError(err)
+ require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_peerdb_cols_mirror"),
@@ -522,7 +523,7 @@
}

flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
- s.NoError(err)
+ require.NoError(s.t, err)

limits := peerflow.CDCFlowLimits{
ExitAfterRecords: 2,
@@ -537,26 +538,26 @@
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(key, value) VALUES ($1, $2)
`, srcTableName), testKey, testValue)
- s.NoError(err)
+ require.NoError(s.t, err)

// delete that row
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
DELETE FROM %s WHERE id=1
`, srcTableName))
- s.NoError(err)
- s.T().Log("Inserted and deleted a row for peerdb column check")
+ require.NoError(s.t, err)
+ s.t.Log("Inserted and deleted a row for peerdb column check")
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

// Verify workflow completes without error
- s.True(env.IsWorkflowCompleted())
+ require.True(s.t, env.IsWorkflowCompleted())
err = env.GetWorkflowError()

// allow only continue as new error
- s.Error(err)
- s.Contains(err.Error(), "continue as new")
+ require.Error(s.t, err)
+ require.Contains(s.t, err.Error(), "continue as new")

checkErr := s.checkPeerdbColumns(dstTableName, 1)
- s.NoError(checkErr)
- env.AssertExpectations(s.T())
+ require.NoError(s.t, checkErr)
+	env.AssertExpectations(s.t)
 }
diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go
index d408451939..fbd90e8fc7 100644
--- a/flow/e2e/postgres/qrep_flow_pg_test.go
+++ b/flow/e2e/postgres/qrep_flow_pg_test.go
@@ -9,31 +9,36 @@ import (
 	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
 	"github.com/PeerDB-io/peer-flow/e2e"
+	"github.com/PeerDB-io/peer-flow/e2eshared"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
+	"github.com/PeerDB-io/peer-flow/shared"
 	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/joho/godotenv"
-	"github.com/stretchr/testify/suite"
-	"go.temporal.io/sdk/testsuite"
+	"github.com/stretchr/testify/require"
 )

-const postgresSuffix = "postgres"
-
 type PeerFlowE2ETestSuitePG struct {
-	suite.Suite
-	testsuite.WorkflowTestSuite
+	t *testing.T

 	pool      *pgxpool.Pool
 	peer      *protos.Peer
 	connector *connpostgres.PostgresConnector
+	suffix    string
 }

 func TestPeerFlowE2ETestSuitePG(t *testing.T) {
-	suite.Run(t, new(PeerFlowE2ETestSuitePG))
+	e2eshared.GotSuite(t, setupSuite, func(s PeerFlowE2ETestSuitePG) {
+		err := e2e.TearDownPostgres(s.pool, s.suffix)
+		if err != nil {
+			s.t.Fatal("failed to drop Postgres schema", err)
+		}
+	})
 }

-// Implement SetupAllSuite interface to setup the test suite
-func (s *PeerFlowE2ETestSuitePG) SetupSuite() {
+func setupSuite(t *testing.T) PeerFlowE2ETestSuitePG {
+	t.Helper()
+
 	err := godotenv.Load()
 	if err != nil {
 		// it's okay if the .env file is not present
@@ -41,14 +46,15 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() {
 		slog.Info("Unable to load .env file, using default values from env")
 	}

-	pool, err := e2e.SetupPostgres(postgresSuffix)
+	suffix := "pgtest_" + strings.ToLower(shared.RandomString(8))
+
+	pool, err := e2e.SetupPostgres(suffix)
 	if err != nil || pool == nil {
-		s.Fail("failed to setup postgres", err)
+		t.Fatal("failed to setup postgres", err)
 	}
-	s.pool = pool
-	s.peer = generatePGPeer(e2e.GetTestPostgresConf())
+	peer := generatePGPeer(e2e.GetTestPostgresConf())

-	s.connector, err = connpostgres.NewPostgresConnector(context.Background(),
+	connector, err := connpostgres.NewPostgresConnector(context.Background(),
 		&protos.PostgresConfig{
 			Host:     "localhost",
 			Port:     7132,
@@ -56,25 +62,24 @@ func (s *PeerFlowE2ETestSuitePG) SetupSuite() {
 			Password: "postgres",
 			Database: "postgres",
 		}, false)
-	s.NoError(err)
-}
-
-// Implement TearDownAllSuite interface to tear down the test suite
-func (s *PeerFlowE2ETestSuitePG) TearDownSuite() {
-	err := e2e.TearDownPostgres(s.pool, postgresSuffix)
-	if err != nil {
-		s.Fail("failed to drop Postgres schema", err)
+	require.NoError(t, err)
+	return PeerFlowE2ETestSuitePG{
+		t:         t,
+		pool:      pool,
+		peer:      peer,
+		connector: connector,
+		suffix:    suffix,
 	}
 }

-func (s *PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) {
-	err := e2e.CreateTableForQRep(s.pool, postgresSuffix, tableName)
-	s.NoError(err)
-	err = e2e.PopulateSourceTable(s.pool, postgresSuffix, tableName, rowCount)
-	s.NoError(err)
+func (s PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) {
+	err := e2e.CreateTableForQRep(s.pool, s.suffix, tableName)
+	require.NoError(s.t, err)
+	err = e2e.PopulateSourceTable(s.pool, s.suffix, tableName, rowCount)
+	require.NoError(s.t, err)
 }

-func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error {
+func (s PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQualified, selector string) error {
 	// Execute the two EXCEPT queries
 	for {
 		err := s.compareQuery(srcSchemaQualified, dstSchemaQualified, selector)
@@ -104,7 +109,7 @@ func (s *PeerFlowE2ETestSuitePG) comparePGTables(srcSchemaQualified, dstSchemaQu
 	return nil
 }

-func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error {
+func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualified, selector string) error {
 	query := fmt.Sprintf("SELECT %s FROM %s EXCEPT SELECT %s FROM %s", selector,
 		srcSchemaQualified, selector, dstSchemaQualified)
 	rows, err := s.pool.Query(context.Background(), query)
@@ -138,7 +143,7 @@ func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQuali
 	return nil
 }

-func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error {
+func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error {
 	query := fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s`, dstSchemaQualified)

 	rows, _ := s.pool.Query(context.Background(), query)
@@ -159,9 +164,9 @@ func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error
 	return rows.Err()
 }

-func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
-	env := s.NewTestWorkflowEnvironment()
-	e2e.RegisterWorkflowsAndActivities(s.T(), env)
+func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	numRows := 10

@@ -170,14 +175,14 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
 	dstTable := "test_qrep_flow_avro_pg_2"

-	err := e2e.CreateTableForQRep(s.pool, postgresSuffix, dstTable)
-	s.NoError(err)
+	err := e2e.CreateTableForQRep(s.pool, s.suffix, dstTable)
+	require.NoError(s.t, err)

-	srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable)
-	dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, dstTable)
+	srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable)
+	dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable)

 	query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
-		postgresSuffix, srcTable)
+		s.suffix, srcTable)

 	postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort)

@@ -191,27 +196,27 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
 		true,
 		"",
 	)
-	s.NoError(err)
+	require.NoError(s.t, err)

 	e2e.RunQrepFlowWorkflow(env, qrepConfig)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()
-	s.NoError(err)
+	require.NoError(s.t, err)

 	err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified, "*")
 	if err != nil {
-		s.FailNow(err.Error())
+		require.FailNow(s.t, err.Error())
 	}

-	env.AssertExpectations(s.T())
+	env.AssertExpectations(s.t)
 }

-func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() {
-	env := s.NewTestWorkflowEnvironment()
-	e2e.RegisterWorkflowsAndActivities(s.T(), env)
+func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() {
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	numRows := 10

@@ -220,11 +225,11 @@ func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_
 	dstTable := "test_qrep_columns_pg_2"

-	srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable)
-	dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, dstTable)
+	srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, srcTable)
+	dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", s.suffix, dstTable)

 	query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
-		postgresSuffix, srcTable)
+		s.suffix, srcTable)

 	postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort)

@@ -238,20 +243,20 @@ func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_
 		true,
 		"_PEERDB_SYNCED_AT",
 	)
-	s.NoError(err)
+	require.NoError(s.t, err)

 	e2e.RunQrepFlowWorkflow(env, qrepConfig)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()
-	s.NoError(err)
+	require.NoError(s.t, err)

 	err = s.checkSyncedAt(dstSchemaQualified)
 	if err != nil {
-		s.FailNow(err.Error())
+		require.FailNow(s.t, err.Error())
 	}

-	env.AssertExpectations(s.T())
+	env.AssertExpectations(s.t)
 }
diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go
index f28161b97b..25b8e8e3ae 100644
--- a/flow/e2e/s3/cdc_s3_test.go
+++ b/flow/e2e/s3/cdc_s3_test.go
@@ -10,22 +10,23 @@ import (
 	"github.com/stretchr/testify/require"
 )

-func (s *PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string {
-	return fmt.Sprintf("e2e_test_%s.%s", s3Suffix, tableName)
+func (s PeerFlowE2ETestSuiteS3) attachSchemaSuffix(tableName string) string {
+	return fmt.Sprintf("e2e_test_%s.%s", s.suffix, tableName)
 }

-func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string {
-	return fmt.Sprintf("%s_%s", input, s3Suffix)
+func (s PeerFlowE2ETestSuiteS3) attachSuffix(input string) string {
+	return fmt.Sprintf("%s_%s", input, s.suffix)
 }

-func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
-	env := s.NewTestWorkflowEnvironment()
-	e2e.RegisterWorkflowsAndActivities(s.T(), env)
+func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+	e2e.RegisterWorkflowsAndActivities(s.t, env)

-	setupErr := s.setupS3("s3")
+	helper, setupErr := setupS3("s3")
 	if setupErr != nil {
-		s.Fail("failed to setup S3", setupErr)
+		require.Fail(s.t, "failed to setup S3", setupErr)
 	}
+	s.s3Helper = helper

 	srcTableName := s.attachSchemaSuffix("test_simple_flow_s3")
 	dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_s3", "test_simple_flow_s3")
@@ -37,7 +38,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
 			value TEXT NOT NULL
 		);
 	`, srcTableName))
-	s.NoError(err)
+	require.NoError(s.t, err)
 	connectionGen := e2e.FlowConnectionGenerationConfig{
 		FlowJobName:      flowJobName,
 		TableNameMapping: map[string]string{srcTableName: dstTableName},
@@ -46,7 +47,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
 	}

 	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-	s.NoError(err)
+	require.NoError(s.t, err)

 	limits := peerflow.CDCFlowLimits{
 		TotalSyncFlows: 4,
@@ -56,48 +57,49 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
 	go func() {
 		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
-		s.NoError(err)
+		require.NoError(s.t, err)
 		// insert 20 rows
 		for i := 1; i <= 20; i++ {
 			testKey := fmt.Sprintf("test_key_%d", i)
 			testValue := fmt.Sprintf("test_value_%d", i)
-			_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
-			INSERT INTO %s (key, value) VALUES ($1, $2)
-		`, srcTableName), testKey, testValue)
-			s.NoError(err)
+			_, err = s.pool.Exec(context.Background(),
+				fmt.Sprintf("INSERT INTO %s (key, value) VALUES ($1, $2)", srcTableName), testKey, testValue)
+			require.NoError(s.t, err)
 		}
-		s.NoError(err)
+		require.NoError(s.t, err)
 	}()

 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()

 	// allow only continue as new error
-	s.Error(err)
-	s.Contains(err.Error(), "continue as new")
+	require.Error(s.t, err)
+	require.Contains(s.t, err.Error(), "continue as new")

 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()

-	s.T().Logf("JobName: %s", flowJobName)
+	s.t.Logf("JobName: %s", flowJobName)
 	files, err := s.s3Helper.ListAllFiles(ctx, flowJobName)
-	s.T().Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files))
-	require.NoError(s.T(), err)
+	s.t.Logf("Files in Test_Complete_Simple_Flow_S3: %d", len(files))
+	require.NoError(s.t, err)

-	require.Equal(s.T(), 4, len(files))
+	require.Equal(s.t, 4, len(files))

-	env.AssertExpectations(s.T())
+	env.AssertExpectations(s.t)
 }

-func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
-	env := s.NewTestWorkflowEnvironment()
-	e2e.RegisterWorkflowsAndActivities(s.T(), env)
-	setupErr := s.setupS3("gcs")
+func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+	e2e.RegisterWorkflowsAndActivities(s.t, env)
+
+	helper, setupErr := setupS3("gcs")
 	if setupErr != nil {
-		s.Fail("failed to setup S3", setupErr)
+		require.Fail(s.t, "failed to setup S3", setupErr)
 	}
+	s.s3Helper = helper

 	srcTableName := s.attachSchemaSuffix("test_simple_flow_gcs_interop")
 	dstTableName := fmt.Sprintf("%s.%s", "peerdb_test_gcs_interop", "test_simple_flow_gcs_interop")
@@ -109,7 +111,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
 			value TEXT NOT NULL
 		);
 	`, srcTableName))
-	s.NoError(err)
+	require.NoError(s.t, err)
 	connectionGen := e2e.FlowConnectionGenerationConfig{
 		FlowJobName:      flowJobName,
 		TableNameMapping: map[string]string{srcTableName: dstTableName},
@@ -118,7 +120,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
 	}

 	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
-	s.NoError(err)
+	require.NoError(s.t, err)

 	limits := peerflow.CDCFlowLimits{
 		TotalSyncFlows: 4,
@@ -128,38 +130,37 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() {
 	go func() {
 		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
-		s.NoError(err)
+		require.NoError(s.t, err)
 		// insert 20 rows
 		for i := 1; i <= 20; i++ {
 			testKey := fmt.Sprintf("test_key_%d", i)
 			testValue := fmt.Sprintf("test_value_%d", i)
-			_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
-			INSERT INTO %s (key, value) VALUES ($1, $2)
-		`, srcTableName), testKey, testValue)
-			s.NoError(err)
+			_, err = s.pool.Exec(context.Background(),
+				fmt.Sprintf("INSERT INTO %s (key, value) VALUES ($1, $2)", srcTableName), testKey, testValue)
+			require.NoError(s.t, err)
 		}
-		s.T().Log("Inserted 20 rows into the source table")
-		s.NoError(err)
+		s.t.Log("Inserted 20 rows into the source table")
+		require.NoError(s.t, err)
 	}()

 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()

 	// allow only continue as new error
-	s.Error(err)
-	s.Contains(err.Error(), "continue as new")
+	require.Error(s.t, err)
+	require.Contains(s.t, err.Error(), "continue as new")

 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()

-	s.T().Logf("JobName: %s", flowJobName)
+	s.t.Logf("JobName: %s", flowJobName)
 	files, err := s.s3Helper.ListAllFiles(ctx, flowJobName)
-	s.T().Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files))
-	require.NoError(s.T(), err)
+	s.t.Logf("Files in Test_Complete_Simple_Flow_GCS: %d", len(files))
+	require.NoError(s.t, err)

-	require.Equal(s.T(), 4, len(files))
+	require.Equal(s.t, 4, len(files))

-	env.AssertExpectations(s.T())
+	env.AssertExpectations(s.t)
 }
diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go
index 4fc4f7bf78..7dcb05f840 100644
--- a/flow/e2e/s3/qrep_flow_s3_test.go
+++ b/flow/e2e/s3/qrep_flow_s3_test.go
@@ -4,53 +4,56 @@ import (
 	"context"
 	"fmt"
 	"log/slog"
+	"strings"
 	"testing"
 	"time"

 	"github.com/PeerDB-io/peer-flow/e2e"
+	"github.com/PeerDB-io/peer-flow/e2eshared"
+	"github.com/PeerDB-io/peer-flow/shared"
 	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/joho/godotenv"
 	"github.com/stretchr/testify/require"
-	"github.com/stretchr/testify/suite"
-	"go.temporal.io/sdk/testsuite"
 )

-const s3Suffix = "s3"
-
 type PeerFlowE2ETestSuiteS3 struct {
-	suite.Suite
-	testsuite.WorkflowTestSuite
+	t *testing.T

 	pool     *pgxpool.Pool
 	s3Helper *S3TestHelper
+	suffix   string
 }

 func TestPeerFlowE2ETestSuiteS3(t *testing.T) {
-	suite.Run(t, new(PeerFlowE2ETestSuiteS3))
-}
+	e2eshared.GotSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteS3) {
+		err := e2e.TearDownPostgres(s.pool, s.suffix)
+		if err != nil {
+			require.Fail(s.t, "failed to drop Postgres schema", err)
+		}

-func (s *PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) {
-	err := e2e.CreateTableForQRep(s.pool, s3Suffix, tableName)
-	s.NoError(err)
-	err = e2e.PopulateSourceTable(s.pool, s3Suffix, tableName, rowCount)
-	s.NoError(err)
+		if s.s3Helper != nil {
+			err = s.s3Helper.CleanUp()
+			if err != nil {
+				require.Fail(s.t, "failed to clean up s3", err)
+			}
+		}
+	})
 }

-func (s *PeerFlowE2ETestSuiteS3) setupS3(mode string) error {
-	switchToGCS := false
-	if mode == "gcs" {
-		switchToGCS = true
-	}
-	helper, err := NewS3TestHelper(switchToGCS)
-	if err != nil {
-		return err
-	}
+func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) {
+	err := e2e.CreateTableForQRep(s.pool, s.suffix, tableName)
+	require.NoError(s.t, err)
+	err = e2e.PopulateSourceTable(s.pool, s.suffix, tableName, rowCount)
+	require.NoError(s.t, err)
+}

-	s.s3Helper = helper
-	return nil
+func setupS3(mode string) (*S3TestHelper, error) {
+	return NewS3TestHelper(mode == "gcs")
 }

-func (s *PeerFlowE2ETestSuiteS3) SetupSuite() {
+func setupSuite(t *testing.T) PeerFlowE2ETestSuiteS3 {
+	t.Helper()
+
 	err := godotenv.Load()
 	if err != nil {
 		// it's okay if the .env file is not present
@@ -58,43 +61,35 @@ func (s *PeerFlowE2ETestSuiteS3) SetupSuite() {
 		slog.Info("Unable to load .env file, using default values from env")
 	}

-	pool, err := e2e.SetupPostgres(s3Suffix)
+	suffix := "s3_" + strings.ToLower(shared.RandomString(8))
+	pool, err := e2e.SetupPostgres(suffix)
 	if err != nil || pool == nil {
-		s.Fail("failed to setup postgres", err)
-	}
-	s.pool = pool
-
-	err = s.setupS3("s3")
-	if err != nil {
-		s.Fail("failed to setup S3", err)
+		require.Fail(t, "failed to setup postgres", err)
 	}
-}
-// Implement TearDownAllSuite interface to tear down the test suite
-func (s *PeerFlowE2ETestSuiteS3) TearDownSuite() {
-	err := e2e.TearDownPostgres(s.pool, s3Suffix)
+	helper, err := setupS3("s3")
 	if err != nil {
-		s.Fail("failed to drop Postgres schema", err)
+		require.Fail(t, "failed to setup S3", err)
 	}

-	if s.s3Helper != nil {
-		err = s.s3Helper.CleanUp()
-		if err != nil {
-			s.Fail("failed to clean up s3", err)
-		}
+	return PeerFlowE2ETestSuiteS3{
+		t:        t,
+		pool:     pool,
+		s3Helper: helper,
+		suffix:   suffix,
 	}
 }

-func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
+func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
 	if s.s3Helper == nil {
-		s.T().Skip("Skipping S3 test")
+		s.t.Skip("Skipping S3 test")
 	}

-	env := s.NewTestWorkflowEnvironment()
-	e2e.RegisterWorkflowsAndActivities(s.T(), env)
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	jobName := "test_complete_flow_s3"
-	schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName)
+	schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName)

 	s.setupSourceTable(jobName, 10)
 	query := fmt.Sprintf("SELECT * FROM %s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}",
@@ -109,16 +104,16 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
 		false,
 		"",
 	)
-	s.NoError(err)
+	require.NoError(s.t, err)
 	qrepConfig.StagingPath = s.s3Helper.s3Config.Url

 	e2e.RunQrepFlowWorkflow(env, qrepConfig)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()
-	s.NoError(err)
+	require.NoError(s.t, err)

 	// Verify destination has 1 file
 	// make context with timeout
@@ -127,23 +122,23 @@

 	files, err := s.s3Helper.ListAllFiles(ctx, jobName)

-	require.NoError(s.T(), err)
+	require.NoError(s.t, err)

-	require.Equal(s.T(), 1, len(files))
+	require.Equal(s.t, 1, len(files))

-	env.AssertExpectations(s.T())
+	env.AssertExpectations(s.t)
 }

-func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
+func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
 	if s.s3Helper == nil {
-		s.T().Skip("Skipping S3 test")
+		s.t.Skip("Skipping S3 test")
 	}

-	env := s.NewTestWorkflowEnvironment()
-	e2e.RegisterWorkflowsAndActivities(s.T(), env)
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
+	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	jobName := "test_complete_flow_s3_ctid"
-	schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName)
+	schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s.suffix, jobName)

 	s.setupSourceTable(jobName, 20000)
 	query := fmt.Sprintf("SELECT * FROM %s WHERE ctid BETWEEN {{.start}} AND {{.end}}", schemaQualifiedName)
@@ -157,7 +152,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
 		false,
 		"",
 	)
-	s.NoError(err)
+	require.NoError(s.t, err)
 	qrepConfig.StagingPath = s.s3Helper.s3Config.Url
 	qrepConfig.NumRowsPerPartition = 2000
 	qrepConfig.InitialCopyOnly = true
@@ -166,10 +161,10 @@
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()
-	s.NoError(err)
+	require.NoError(s.t, err)

 	// Verify destination has 10 files (one per 2000-row partition)
 	// make context with timeout
@@ -178,9 +173,9 @@
 	files, err := s.s3Helper.ListAllFiles(ctx, jobName)

-	require.NoError(s.T(), err)
+	require.NoError(s.t, err)

-	require.Equal(s.T(), 10, len(files))
+	require.Equal(s.t, 10, len(files))

-	env.AssertExpectations(s.T())
+	env.AssertExpectations(s.t)
 }
diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go
index 314c92d2b5..0d1c3e1d20 100644
--- a/flow/e2e/snowflake/peer_flow_sf_test.go
+++ b/flow/e2e/snowflake/peer_flow_sf_test.go
@@ -9,8 +9,6 @@ import (
 	"testing"
 	"time"

-	"github.com/ysmood/got"
-
 	connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
 	"github.com/PeerDB-io/peer-flow/e2e"
 	"github.com/PeerDB-io/peer-flow/e2eshared"
@@ -24,7 +22,6 @@ import (
 )

 type PeerFlowE2ETestSuiteSF struct {
-	got.G
 	t *testing.T

 	pgSuffix string
@@ -34,25 +31,25 @@ type PeerFlowE2ETestSuiteSF struct {
 }

 func TestPeerFlowE2ETestSuiteSF(t *testing.T) {
-	e2eshared.GotSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) {
+	e2eshared.GotSuite(t, setupSuite, func(s PeerFlowE2ETestSuiteSF) {
 		err := e2e.TearDownPostgres(s.pool, s.pgSuffix)
 		if err != nil {
 			slog.Error("failed to tear down Postgres", slog.Any("error", err))
-			s.FailNow()
+			s.t.FailNow()
 		}

 		if s.sfHelper != nil {
 			err = s.sfHelper.Cleanup()
 			if err != nil {
 				slog.Error("failed to tear down Snowflake", slog.Any("error", err))
-				s.FailNow()
+				s.t.FailNow()
 			}
 		}

 		err = s.connector.Close()
 		if err != nil {
 			slog.Error("failed to close Snowflake connector", slog.Any("error", err))
-			s.FailNow()
+			s.t.FailNow()
 		}
 	})
 }
@@ -65,7 +62,7 @@ func (s PeerFlowE2ETestSuiteSF) attachSuffix(input string) string {
 	return fmt.Sprintf("%s_%s", input, s.pgSuffix)
 }

-func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF {
+func setupSuite(t *testing.T) PeerFlowE2ETestSuiteSF {
 	t.Helper()

 	err := godotenv.Load()
@@ -75,20 +72,20 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF {
 		slog.Info("Unable to load .env file, using default values from env")
 	}

-	suffix := shared.RandomString(8)
+	suffix := strings.ToLower(shared.RandomString(8))
 	tsSuffix := time.Now().Format("20060102150405")
 	pgSuffix := fmt.Sprintf("sf_%s_%s", strings.ToLower(suffix), tsSuffix)

 	pool, err := e2e.SetupPostgres(pgSuffix)
 	if err != nil || pool == nil {
 		slog.Error("failed to setup Postgres", slog.Any("error", err))
-		g.FailNow()
+		t.FailNow()
 	}

 	sfHelper, err := NewSnowflakeTestHelper()
 	if err != nil {
 		slog.Error("failed to setup Snowflake", slog.Any("error", err))
-		g.FailNow()
+		t.FailNow()
 	}

 	connector, err := connsnowflake.NewSnowflakeConnector(
@@ -98,7 +95,6 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF {
 	require.NoError(t, err)

 	suite := PeerFlowE2ETestSuiteSF{
-		G:        g,
 		t:        t,
 		pgSuffix: pgSuffix,
 		pool:     pool,
@@ -110,7 +106,7 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_simple_flow_sf")
@@ -159,7 +155,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()

 	// allow only continue as new error
@@ -167,7 +163,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
 	count, err := s.sfHelper.CountRows("test_simple_flow_sf")
 	require.NoError(s.t, err)
-	s.Equal(20, count)
+	require.Equal(s.t, 20, count)

 	// check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago
 	// it should match the count.
@@ -176,7 +172,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
 	`, dstTableName)
 	numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
 	require.NoError(s.t, err)
-	s.Equal(20, numNewRows)
+	require.Equal(s.t, 20, numNewRows)

 	// TODO: verify that the data is correctly synced to the destination table
 	// on the Snowflake side
@@ -185,7 +181,7 @@
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_replica_identity_no_pkey")
@@ -237,7 +233,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()

 	// allow only continue as new error
@@ -245,13 +241,13 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() {
 	count, err := s.sfHelper.CountRows("test_replica_identity_no_pkey")
 	require.NoError(s.t, err)
-	s.Equal(20, count)
+	require.Equal(s.t, 20, count)

 	env.AssertExpectations(s.t)
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_invalid_geo_sf_avro_cdc")
@@ -313,7 +309,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()

 	// allow only continue as new error
@@ -323,11 +319,11 @@
 	// They should have been filtered out as null on destination
 	lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line")
 	require.NoError(s.t, err)
-	s.Equal(6, lineCount)
+	require.Equal(s.t, 6, lineCount)

 	polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly")
 	require.NoError(s.t, err)
-	s.Equal(6, polyCount)
+	require.Equal(s.t, 6, polyCount)

 	// TODO: verify that the data is correctly synced to the destination table
 	// on the Snowflake side
@@ -336,7 +332,7 @@
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_toast_sf_1")
@@ -392,7 +388,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()

 	// allow only continue as new error
@@ -403,7 +399,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_toast_sf_2")
@@ -450,7 +446,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() {
 		if err != nil {
 			slog.Error("Error executing transaction", slog.Any("error", err))
-			s.FailNow()
 		}

 		wg.Done()
@@ -459,7 +454,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()

 	// allow only continue as new error
@@ -472,7 +467,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_toast_sf_3")
@@ -534,7 +529,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()

 	// allow only continue as new error
@@ -545,7 +540,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_toast_sf_4")
@@ -600,7 +595,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()

 	// allow only continue as new error
@@ -611,7 +606,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_toast_sf_5")
@@ -666,7 +661,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()

 	// allow only continue as new error
@@ -677,7 +672,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_types_sf")
@@ -734,7 +729,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()

 	// allow only continue as new error
@@ -750,13 +745,13 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
 		s.t.Log(err)
 	}
 	// Make sure that there are no nulls
-	s.Equal(noNulls, true)
+	require.Equal(s.t, noNulls, true)

 	env.AssertExpectations(s.t)
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTable1Name := s.attachSchemaSuffix("test1_sf")
@@ -800,7 +795,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()

 	count1, err := s.sfHelper.CountRows("test1_sf")
@@ -808,14 +803,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() {
 	count2, err := s.sfHelper.CountRows("test2_sf")
 	require.NoError(s.t, err)

-	s.Equal(1, count1)
-	s.Equal(1, count2)
+	require.Equal(s.t, 1, count1)
+	require.Equal(s.t, 1, count2)

 	env.AssertExpectations(s.t)
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_simple_schema_changes")
@@ -869,7 +864,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
 			TableIdentifiers: []string{dstTableName},
 		})
 		require.NoError(s.t, err)
-		s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+		require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
 		s.compareTableContentsSF("test_simple_schema_changes", "id,c1")

 		// alter source table, add column c2 and insert another row.
@@ -898,7 +893,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
 			TableIdentifiers: []string{dstTableName},
 		})
 		require.NoError(s.t, err)
-		s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+		require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
 		s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2")

 		// alter source table, add column c3, drop column c2 and insert another row.
@@ -928,7 +923,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
 			TableIdentifiers: []string{dstTableName},
 		})
 		require.NoError(s.t, err)
-		s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+		require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
 		s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3")

 		// alter source table, drop column c3 and insert another row.
@@ -958,14 +953,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
 			TableIdentifiers: []string{dstTableName},
 		})
 		require.NoError(s.t, err)
-		s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+		require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
 		s.compareTableContentsSF("test_simple_schema_changes", "id,c1")
 	}()

 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()

 	// allow only continue as new error
@@ -975,7 +970,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_simple_cpkey")
@@ -1035,7 +1030,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()

 	// allow only continue as new error
@@ -1048,7 +1043,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_cpkey_toast1")
@@ -1111,7 +1106,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()

 	// allow only continue as new error
@@ -1124,7 +1119,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_cpkey_toast2")
@@ -1183,7 +1178,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()

 	// allow only continue as new error
@@ -1196,7 +1191,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_exclude_sf")
@@ -1262,7 +1257,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
 	}()

 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")

@@ -1274,12 +1269,12 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
 	for _, field := range sfRows.Schema.Fields {
 		require.NotEqual(s.t, field.Name, "c2")
 	}
-	s.Equal(5, len(sfRows.Schema.Fields))
-	s.Equal(10, len(sfRows.Records))
+	require.Equal(s.t, 5, len(sfRows.Schema.Fields))
+	require.Equal(s.t, 10, len(sfRows.Records))
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	cmpTableName := s.attachSchemaSuffix("test_softdel")
@@ -1349,7 +1344,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() {
 	}()

 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")

@@ -1366,7 +1361,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	cmpTableName := s.attachSchemaSuffix("test_softdel_iud")
@@ -1434,7 +1429,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() {
 	}()

 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")

@@ -1445,11 +1440,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() {
 	SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
 	numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
 	require.NoError(s.t, err)
-	s.Equal(1, numNewRows)
+	require.Equal(s.t, 1, numNewRows)
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	cmpTableName := s.attachSchemaSuffix("test_softdel_ud")
@@ -1521,7 +1516,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() {
 	}()

 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")

@@ -1532,11 +1527,11 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() {
 	SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
 	numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
 	require.NoError(s.t, err)
-	s.Equal(1, numNewRows)
+	require.Equal(s.t, 1, numNewRows)
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	srcTableName := s.attachSchemaSuffix("test_softdel_iad")
@@ -1596,7 +1591,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() {
 	}()

 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")

@@ -1607,7 +1602,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() {
 	SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
 	numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
 	require.NoError(s.t, err)
-	s.Equal(0, numNewRows)
+	require.Equal(s.t, 0, numNewRows)
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() {
diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go
index f45a78d7c1..5b150a7bd3 100644
--- a/flow/e2e/snowflake/qrep_flow_sf_test.go
+++ b/flow/e2e/snowflake/qrep_flow_sf_test.go
@@ -52,7 +52,7 @@ func (s PeerFlowE2ETestSuiteSF) compareTableContentsWithDiffSelectorsSF(tableNam
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	numRows := 10
@@ -81,7 +81,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() {
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
@@ -93,7 +93,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	numRows := 10
@@ -126,7 +126,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple()
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
@@ -138,7 +138,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple()
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	numRows := 10
@@ -168,7 +168,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
@@ -180,7 +180,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	numRows := 10
@@ -214,7 +214,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
 	e2e.RunXminFlowWorkflow(env, qrepConfig)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
@@ -226,7 +226,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
 }

 func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	numRows := 10
@@ -260,7 +260,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration()
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
@@ -272,7 +272,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration()
 }

 func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() {
-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	numRows := 10
@@ -305,7 +305,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() {
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go
index 693062e39d..5b46e125dd 100644
--- a/flow/e2e/snowflake/snowflake_schema_delta_test.go
+++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go
@@ -10,30 +10,19 @@ import (
 	"github.com/PeerDB-io/peer-flow/e2eshared"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
-	"github.com/ysmood/got"
+	"github.com/stretchr/testify/require"
 )

 const schemaDeltaTestSchemaName = "PUBLIC"

 type SnowflakeSchemaDeltaTestSuite struct {
-	got.G
 	t *testing.T

 	connector    *connsnowflake.SnowflakeConnector
 	sfTestHelper *SnowflakeTestHelper
 }

-func (suite SnowflakeSchemaDeltaTestSuite) failTestError(err error) {
-	if err != nil {
-		slog.Error("Error in test", slog.Any("error", err))
-		suite.FailNow()
-	}
-}
-
-func setupSchemaDeltaSuite(
-	t *testing.T,
-	g got.G,
-) SnowflakeSchemaDeltaTestSuite {
+func setupSchemaDeltaSuite(t *testing.T) SnowflakeSchemaDeltaTestSuite {
 	t.Helper()

 	sfTestHelper, err := NewSnowflakeTestHelper()
@@ -52,7 +41,6 @@ func setupSchemaDeltaSuite(
 	}

 	return SnowflakeSchemaDeltaTestSuite{
-		G:            g,
 		t:            t,
 		connector:    connector,
 		sfTestHelper: sfTestHelper,
@@ -62,7 +50,7 @@ func setupSchemaDeltaSuite(
 func (suite SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() {
 	tableName := fmt.Sprintf("%s.SIMPLE_ADD_COLUMN", schemaDeltaTestSchemaName)
 	err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName))
-	suite.failTestError(err)
+	require.NoError(suite.t, err)

 	err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
 		SrcTableName: tableName,
@@ -72,13 +60,13 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() {
 			ColumnType: string(qvalue.QValueKindJSON),
 		}},
 	}})
-	suite.failTestError(err)
+	require.NoError(suite.t, err)

 	output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
 		TableIdentifiers: []string{tableName},
 	})
-	suite.failTestError(err)
-	suite.Equal(&protos.TableSchema{
+	require.NoError(suite.t, err)
+	require.Equal(suite.t, &protos.TableSchema{
 		TableIdentifier: tableName,
 		Columns: map[string]string{
 			"ID": string(qvalue.QValueKindString),
@@ -90,7 +78,7 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() {
 func (suite SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() {
 	tableName := fmt.Sprintf("%s.ADD_DROP_ALL_COLUMN_TYPES", schemaDeltaTestSchemaName)
 	err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName))
-	suite.failTestError(err)
+	require.NoError(suite.t, err)

 	expectedTableSchema := &protos.TableSchema{
 		TableIdentifier: tableName,
@@ -124,19 +112,19 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() {
 		DstTableName: tableName,
 		AddedColumns: addedColumns,
 	}})
-	suite.failTestError(err)
+	require.NoError(suite.t, err)

 	output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
 		TableIdentifiers: []string{tableName},
 	})
-	suite.failTestError(err)
-	suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
+	require.NoError(suite.t, err)
+	require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
 }

 func (suite SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
 	tableName := fmt.Sprintf("%s.ADD_DROP_TRICKY_COLUMN_NAMES", schemaDeltaTestSchemaName)
 	err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(id TEXT PRIMARY KEY)", tableName))
-	suite.failTestError(err)
+	require.NoError(suite.t, err)

 	expectedTableSchema := &protos.TableSchema{
 		TableIdentifier: tableName,
@@ -168,19 +156,19 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
 		DstTableName: tableName,
 		AddedColumns: addedColumns,
 	}})
-	suite.failTestError(err)
+	require.NoError(suite.t, err)

 	output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
 		TableIdentifiers: []string{tableName},
 	})
-	suite.failTestError(err)
-	suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
+	require.NoError(suite.t, err)
+	require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
 }

 func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() {
 	tableName := fmt.Sprintf("%s.ADD_DROP_WHITESPACE_COLUMN_NAMES", schemaDeltaTestSchemaName)
 	err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(\" \" TEXT PRIMARY KEY)", tableName))
-	suite.failTestError(err)
+	require.NoError(suite.t, err)

 	expectedTableSchema := &protos.TableSchema{
 		TableIdentifier: tableName,
@@ -206,18 +194,18 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() {
 		DstTableName: tableName,
 		AddedColumns: addedColumns,
 	}})
-	suite.failTestError(err)
+	require.NoError(suite.t, err)

 	output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
 		TableIdentifiers: []string{tableName},
 	})
-	suite.failTestError(err)
-	suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
+	require.NoError(suite.t, err)
+	require.Equal(suite.t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
 }

 func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) {
 	e2eshared.GotSuite(t, setupSchemaDeltaSuite, func(suite SnowflakeSchemaDeltaTestSuite) {
-		suite.failTestError(suite.sfTestHelper.Cleanup())
-		suite.failTestError(suite.connector.Close())
+		require.NoError(suite.t, suite.sfTestHelper.Cleanup())
+		require.NoError(suite.t, suite.connector.Close())
 	})
 }
diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
index aeb622d559..e21e7c5979 100644
--- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
+++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
@@ -19,11 +19,9 @@ import (
 	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/joho/godotenv"
 	"github.com/stretchr/testify/require"
-	"github.com/ysmood/got"
 )

 type PeerFlowE2ETestSuiteSQLServer struct {
-	got.G
 	t *testing.T

 	pool *pgxpool.Pool
@@ -131,7 +129,7 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append(
 		s.t.Skip("Skipping SQL Server test")
 	}

-	env := e2e.NewTemporalTestWorkflowEnvironment()
+	env := e2e.NewTemporalTestWorkflowEnvironment(s.t)
 	e2e.RegisterWorkflowsAndActivities(s.t, env)

 	numRows := 10
@@ -166,7 +164,7 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append(
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)

 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())

 	err := env.GetWorkflowError()
 	require.NoError(s.t, err)
@@ -177,5 +175,5 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append(
 	err = s.pool.QueryRow(context.Background(), countQuery).Scan(&numRowsInDest)
 	require.NoError(s.t, err)

-	s.Equal(numRows, int(numRowsInDest.Int64))
+	require.Equal(s.t, numRows, int(numRowsInDest.Int64))
 }
diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go
index 4674667b97..6668bfae7d 100644
--- a/flow/e2e/test_utils.go
+++ b/flow/e2e/test_utils.go
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"log/slog"
-	"os"
 	"strings"
 	"testing"
 	"time"
@@ -371,16 +370,26 @@ func GetOwnersSelectorStringsSF() [2]string {
 	return [2]string{strings.Join(pgFields, ","), strings.Join(sfFields, ",")}
 }

-func NewTemporalTestWorkflowEnvironment() *testsuite.TestWorkflowEnvironment {
-	testSuite := &testsuite.WorkflowTestSuite{}
+type tlogWriter struct {
+	t *testing.T
+}
+
+func (iw tlogWriter) Write(p []byte) (n int, err error) {
+	iw.t.Log(string(p))
+	return len(p), nil
+}
+
+func NewTemporalTestWorkflowEnvironment(t *testing.T) *testsuite.TestWorkflowEnvironment {
+	t.Helper()

 	logger := slog.New(logger.NewHandler(
 		slog.NewJSONHandler(
-			os.Stdout,
+			tlogWriter{t},
 			&slog.HandlerOptions{Level: slog.LevelWarn},
 		)))
 	tLogger := NewTStructuredLogger(*logger)

+	testSuite := &testsuite.WorkflowTestSuite{}
 	testSuite.SetLogger(tLogger)
 	return testSuite.NewTestWorkflowEnvironment()
 }
diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go
index e235536ade..0cf5445190 100644
--- a/flow/e2eshared/e2eshared.go
+++ b/flow/e2eshared/e2eshared.go
@@ -11,15 +11,14 @@ import (
 	"github.com/ysmood/got"
 )

-func GotSuite[T any](t *testing.T, setup func(t *testing.T, g got.G) T, teardown func(T)) {
+func GotSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) {
 	t.Helper()

 	got.Each(t, func(t *testing.T) T {
 		t.Helper()
-		g := got.New(t)
-		g.Parallel()
-		suite := setup(t, g)
-		g.Cleanup(func() {
+		t.Parallel()
+		suite := setup(t)
+		t.Cleanup(func() {
 			teardown(suite)
 		})
 		return suite
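Note for reviewers migrating other suites to this pattern: after this change, e2eshared.GotSuite takes a plain setup function and a teardown callback, and since it is built on got.Each, the setup runs once per exported test method, each subtest is marked parallel, and teardown is registered via t.Cleanup. A minimal sketch of a suite written against the new API follows; exampleSuite, setupExampleSuite, TestSomething, and the suffix value are hypothetical names for illustration, not part of this change.

package example

import (
	"testing"

	"github.com/PeerDB-io/peer-flow/e2eshared"
	"github.com/stretchr/testify/require"
)

// Hypothetical value-type suite: no embedded suite.Suite, just the
// *testing.T used for assertions plus whatever per-suite state is needed.
type exampleSuite struct {
	t      *testing.T
	suffix string
}

// Runs once per test method (got.Each builds a fresh suite value for
// each exported method), so every test can get its own isolated state.
func setupExampleSuite(t *testing.T) exampleSuite {
	t.Helper()
	// "example_suffix" is a placeholder; real suites randomize this.
	return exampleSuite{t: t, suffix: "example_suffix"}
}

// Exported methods on the suite value are discovered and run as
// parallel subtests by got.Each via e2eshared.GotSuite.
func (s exampleSuite) TestSomething() {
	require.NotEmpty(s.t, s.suffix)
}

// The per-suite entry point wires setup and teardown together; teardown
// runs in t.Cleanup after each subtest finishes.
func TestExampleSuite(t *testing.T) {
	e2eshared.GotSuite(t, setupExampleSuite, func(s exampleSuite) {
		// teardown: drop schemas, close connectors, etc.
	})
}

One consequence of this design worth noting in review: because setup and teardown now run per test method rather than once per suite (as the old SetupSuite/TearDownSuite did), randomized schema suffixes like the "pgtest_"/"s3_" prefixes above keep parallel tests from colliding.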