diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go
index 0f0e52f41a..4fb16a26f7 100644
--- a/flow/connectors/postgres/postgres_schema_delta_test.go
+++ b/flow/connectors/postgres/postgres_schema_delta_test.go
@@ -13,17 +13,15 @@ import (
 	"github.com/PeerDB-io/peer-flow/shared"
 	"github.com/jackc/pgx/v5"
 	"github.com/stretchr/testify/require"
-	"github.com/ysmood/got"
 )
 
 type PostgresSchemaDeltaTestSuite struct {
-	got.G
 	t         *testing.T
 	connector *PostgresConnector
 	schema    string
 }
 
-func SetupSuite(t *testing.T, g got.G) PostgresSchemaDeltaTestSuite {
+func SetupSuite(t *testing.T) PostgresSchemaDeltaTestSuite {
 	t.Helper()
 
 	connector, err := NewPostgresConnector(context.Background(), &protos.PostgresConfig{
@@ -53,7 +51,6 @@ func SetupSuite(t *testing.T, g got.G) PostgresSchemaDeltaTestSuite {
 	require.NoError(t, err)
 
 	return PostgresSchemaDeltaTestSuite{
-		G:         g,
 		t:         t,
 		connector: connector,
 		schema:    schema,
@@ -80,7 +77,7 @@ func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
 		TableIdentifiers: []string{tableName},
 	})
 	require.NoError(s.t, err)
-	s.Equal(&protos.TableSchema{
+	require.Equal(s.t, &protos.TableSchema{
 		TableIdentifier: tableName,
 		ColumnNames:     []string{"id", "hi"},
 		ColumnTypes:     []string{string(qvalue.QValueKindInt32), string(qvalue.QValueKindInt64)},
@@ -143,7 +140,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
 		TableIdentifiers: []string{tableName},
 	})
 	require.NoError(s.t, err)
-	s.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
+	require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
 }
 
 func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
@@ -193,7 +190,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
 		TableIdentifiers: []string{tableName},
 	})
 	require.NoError(s.t, err)
-	s.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
+	require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
 }
 
 func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
@@ -234,7 +231,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
 		TableIdentifiers: []string{tableName},
 	})
 	require.NoError(s.t, err)
-	s.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
+	require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
 }
 
 func TestPostgresSchemaDeltaTestSuite(t *testing.T) {
diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go
index cc0352fae3..af6b50e9be 100644
--- a/flow/e2e/bigquery/peer_flow_bq_test.go
+++ b/flow/e2e/bigquery/peer_flow_bq_test.go
@@ -18,11 +18,9 @@ import (
 	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/joho/godotenv"
 	"github.com/stretchr/testify/require"
-	"github.com/ysmood/got"
 )
 
 type PeerFlowE2ETestSuiteBQ struct {
-	got.G
 	t *testing.T
 
 	bqSuffix string
@@ -35,13 +33,13 @@ func TestPeerFlowE2ETestSuiteBQ(t *testing.T) {
 		err := e2e.TearDownPostgres(s.pool, s.bqSuffix)
 		if err != nil {
 			slog.Error("failed to tear down postgres", slog.Any("error", err))
-			s.FailNow()
+			s.t.FailNow()
 		}
 
 		err = s.bqHelper.DropDataset(s.bqHelper.datasetName)
 		if err != nil {
 			slog.Error("failed to tear down bigquery", slog.Any("error", err))
-			s.FailNow()
+			s.t.FailNow()
 		}
 	})
 }
@@ -118,7 +116,7 @@ func setupBigQuery(t *testing.T) *BigQueryTestHelper {
 }
 
 // Implement SetupAllSuite interface to setup the test suite
-func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ {
+func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {
 	t.Helper()
 
 	err := godotenv.Load()
@@ -140,7 +138,6 @@ func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ {
 	bq := setupBigQuery(t)
 
 	return PeerFlowE2ETestSuiteBQ{
-		G:        g,
 		t:        t,
 		bqSuffix: bqSuffix,
 		pool:     pool,
@@ -161,7 +158,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, &limits, nil)
 
 	// Verify workflow completes
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err := env.GetWorkflowError()
 
 	// assert that error contains "invalid connection configs"
@@ -205,7 +202,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -249,7 +246,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -312,7 +309,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -320,7 +317,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
 	count, err := s.bqHelper.countRows(dstTableName)
 	require.NoError(s.t, err)
-	s.Equal(10, count)
+	require.Equal(s.t, 10, count)
 
 	// TODO: verify that the data is correctly synced to the destination table
 	// on the bigquery side
@@ -386,7 +383,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -449,7 +446,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -524,7 +521,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -591,7 +588,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -658,7 +655,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -726,7 +723,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -742,7 +739,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
 		s.t.Log(err)
 	}
 	// Make sure that there are no nulls
-	s.True(noNulls)
+	require.True(s.t, noNulls)
 
 	env.AssertExpectations(s.t)
 }
@@ -802,8 +799,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
 	count2, err := s.bqHelper.countRows(dstTable2Name)
 	require.NoError(s.t, err)
 
-	s.Equal(1, count1)
-	s.Equal(1, count2)
+	require.Equal(s.t, 1, count1)
+	require.Equal(s.t, 1, count2)
 
 	env.AssertExpectations(s.t)
 }
@@ -900,7 +897,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -971,7 +968,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -1047,7 +1044,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -1120,7 +1117,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -1183,7 +1180,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -1254,8 +1251,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
 	count2, err := s.bqHelper.countRowsWithDataset(secondDataset, dstTable2Name)
 	require.NoError(s.t, err)
 
-	s.Equal(1, count1)
-	s.Equal(1, count2)
+	require.Equal(s.t, 1, count1)
+	require.Equal(s.t, 1, count2)
 
 	err = s.bqHelper.DropDataset(secondDataset)
 	require.NoError(s.t, err)
@@ -1334,7 +1331,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")
 
@@ -1348,7 +1345,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
 		s.bqHelper.datasetName, dstTableName)
 	numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
 	require.NoError(s.t, err)
-	s.Eq(1, numNewRows)
+	require.Equal(s.t, int64(1), numNewRows)
 }
 
 func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
@@ -1420,7 +1417,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")
 
@@ -1432,7 +1429,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
 		s.bqHelper.datasetName, dstTableName)
 	numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
 	require.NoError(s.t, err)
-	s.Eq(1, numNewRows)
+	require.Equal(s.t, int64(1), numNewRows)
 }
 
 func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
@@ -1508,7 +1505,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")
 
@@ -1520,7 +1517,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
 		s.bqHelper.datasetName, dstTableName)
 	numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
 	require.NoError(s.t, err)
-	s.Eq(1, numNewRows)
+	require.Equal(s.t, int64(1), numNewRows)
 }
 
 func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
@@ -1584,7 +1581,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")
 
@@ -1596,5 +1593,5 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
 		s.bqHelper.datasetName, dstTableName)
 	numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
 	require.NoError(s.t, err)
-	s.Eq(0, numNewRows)
+	require.Equal(s.t, int64(0), numNewRows)
 }
diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go
index def151071c..e9ecb245ff 100644
--- a/flow/e2e/bigquery/qrep_flow_bq_test.go
+++ b/flow/e2e/bigquery/qrep_flow_bq_test.go
@@ -71,7 +71,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
 
@@ -105,7 +105,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() {
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
 
diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go
index bc81f0497d..8c34bc7c81 100644
--- a/flow/e2e/postgres/peer_flow_pg_test.go
+++ b/flow/e2e/postgres/peer_flow_pg_test.go
@@ -91,7 +91,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -155,7 +155,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
 			TableIdentifiers: []string{dstTableName},
 		})
 		require.NoError(s.t, err)
-		s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+		require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
 		err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
 		require.NoError(s.t, err)
 
@@ -185,7 +185,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
 			TableIdentifiers: []string{dstTableName},
 		})
 		require.NoError(s.t, err)
-		s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+		require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
 		err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2")
 		require.NoError(s.t, err)
 
@@ -216,7 +216,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
 			TableIdentifiers: []string{dstTableName},
 		})
 		require.NoError(s.t, err)
-		s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+		require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
 		err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c3")
 		require.NoError(s.t, err)
 
@@ -247,7 +247,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
 			TableIdentifiers: []string{dstTableName},
 		})
 		require.NoError(s.t, err)
-		s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+		require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
 		err = s.comparePGTables(srcTableName, dstTableName, "id,c1")
 		require.NoError(s.t, err)
 	}()
@@ -255,7 +255,7 @@
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -326,7 +326,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -406,7 +406,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -483,7 +483,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -549,7 +549,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go
index c7e1b0eb43..af6431fcd7 100644
--- a/flow/e2e/postgres/qrep_flow_pg_test.go
+++ b/flow/e2e/postgres/qrep_flow_pg_test.go
@@ -17,11 +17,9 @@ import (
 	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/joho/godotenv"
 	"github.com/stretchr/testify/require"
-	"github.com/ysmood/got"
 )
 
 type PeerFlowE2ETestSuitePG struct {
-	got.G
 	t *testing.T
 
 	pool *pgxpool.Pool
@@ -39,7 +37,7 @@ func TestPeerFlowE2ETestSuitePG(t *testing.T) {
 	})
 }
 
-func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuitePG {
+func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG {
 	t.Helper()
 
 	err := godotenv.Load()
@@ -67,7 +65,6 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuitePG {
 	require.NoError(t, err)
 
 	return PeerFlowE2ETestSuitePG{
-		G:    g,
 		t:    t,
 		pool: pool,
 		peer: generatePGPeer(e2e.GetTestPostgresConf()),
@@ -248,7 +245,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
 
@@ -293,7 +290,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_P
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
 
diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go
index adf84c7b9b..4a870a06aa 100644
--- a/flow/e2e/s3/cdc_s3_test.go
+++ b/flow/e2e/s3/cdc_s3_test.go
@@ -67,7 +67,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go
index 56f5962232..38d9add502 100644
--- a/flow/e2e/s3/qrep_flow_s3_test.go
+++ b/flow/e2e/s3/qrep_flow_s3_test.go
@@ -14,11 +14,9 @@ import (
 	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/joho/godotenv"
 	"github.com/stretchr/testify/require"
-	"github.com/ysmood/got"
 )
 
 type PeerFlowE2ETestSuiteS3 struct {
-	got.G
 	t *testing.T
 
 	pool *pgxpool.Pool
@@ -53,7 +51,7 @@ func (s PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int)
 	require.NoError(s.t, err)
 }
 
-func setupSuite(t *testing.T, g got.G, gcs bool) PeerFlowE2ETestSuiteS3 {
+func setupSuite(t *testing.T, gcs bool) PeerFlowE2ETestSuiteS3 {
 	t.Helper()
 
 	err := godotenv.Load()
@@ -75,7 +73,6 @@ func setupSuite(t *testing.T, g got.G, gcs bool) PeerFlowE2ETestSuiteS3 {
 	}
 
 	return PeerFlowE2ETestSuiteS3{
-		G:        g,
 		t:        t,
 		pool:     pool,
 		s3Helper: helper,
@@ -83,14 +80,14 @@
 	}
 }
 
-func SetupSuiteS3(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 {
+func SetupSuiteS3(t *testing.T) PeerFlowE2ETestSuiteS3 {
 	t.Helper()
-	return setupSuite(t, g, false)
+	return setupSuite(t, false)
 }
 
-func SetupSuiteGCS(t *testing.T, g got.G) PeerFlowE2ETestSuiteS3 {
+func SetupSuiteGCS(t *testing.T) PeerFlowE2ETestSuiteS3 {
 	t.Helper()
-	return setupSuite(t, g, true)
+	return setupSuite(t, true)
 }
 
 func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
@@ -123,7 +120,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
 
@@ -174,7 +171,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
 
diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go
index 4520f929eb..161b36acc7 100644
--- a/flow/e2e/snowflake/peer_flow_sf_test.go
+++ b/flow/e2e/snowflake/peer_flow_sf_test.go
@@ -9,8 +9,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/ysmood/got"
-
 	connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
 	"github.com/PeerDB-io/peer-flow/e2e"
 	"github.com/PeerDB-io/peer-flow/e2eshared"
@@ -24,7 +22,6 @@ import (
 )
 
 type PeerFlowE2ETestSuiteSF struct {
-	got.G
 	t *testing.T
 
 	pgSuffix string
@@ -38,21 +35,21 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) {
 		err := e2e.TearDownPostgres(s.pool, s.pgSuffix)
 		if err != nil {
 			slog.Error("failed to tear down Postgres", slog.Any("error", err))
-			s.FailNow()
+			s.t.FailNow()
 		}
 
 		if s.sfHelper != nil {
 			err = s.sfHelper.Cleanup()
 			if err != nil {
 				slog.Error("failed to tear down Snowflake", slog.Any("error", err))
-				s.FailNow()
+				s.t.FailNow()
 			}
 		}
 
 		err = s.connector.Close()
 		if err != nil {
 			slog.Error("failed to close Snowflake connector", slog.Any("error", err))
-			s.FailNow()
+			s.t.FailNow()
 		}
 	})
 }
@@ -65,7 +62,7 @@ func (s PeerFlowE2ETestSuiteSF) attachSuffix(input string) string {
 	return fmt.Sprintf("%s_%s", input, s.pgSuffix)
 }
 
-func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF {
+func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSF {
 	t.Helper()
 
 	err := godotenv.Load()
@@ -82,13 +79,13 @@
 	pool, err := e2e.SetupPostgres(pgSuffix)
 	if err != nil || pool == nil {
 		slog.Error("failed to setup Postgres", slog.Any("error", err))
-		g.FailNow()
+		t.FailNow()
 	}
 
 	sfHelper, err := NewSnowflakeTestHelper()
 	if err != nil {
 		slog.Error("failed to setup Snowflake", slog.Any("error", err))
-		g.FailNow()
+		t.FailNow()
 	}
 
 	connector, err := connsnowflake.NewSnowflakeConnector(
@@ -98,7 +95,6 @@
 	require.NoError(t, err)
 
 	suite := PeerFlowE2ETestSuiteSF{
-		G:        g,
 		t:        t,
 		pgSuffix: pgSuffix,
 		pool:     pool,
@@ -159,7 +155,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -167,7 +163,7 @@
 	count, err := s.sfHelper.CountRows("test_simple_flow_sf")
 	require.NoError(s.t, err)
-	s.Equal(20, count)
+	require.Equal(s.t, 20, count)
 
 	// check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago
 	// it should match the count.
@@ -176,7 +172,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
 	`, dstTableName)
 	numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
 	require.NoError(s.t, err)
-	s.Equal(20, numNewRows)
+	require.Equal(s.t, 20, numNewRows)
 
 	// TODO: verify that the data is correctly synced to the destination table
 	// on the Snowflake side
@@ -237,7 +233,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -245,7 +241,7 @@
 	count, err := s.sfHelper.CountRows("test_replica_identity_no_pkey")
 	require.NoError(s.t, err)
-	s.Equal(20, count)
+	require.Equal(s.t, 20, count)
 
 	env.AssertExpectations(s.t)
 }
@@ -313,7 +309,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -323,11 +319,11 @@
 	// They should have been filtered out as null on destination
 	lineCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "line")
 	require.NoError(s.t, err)
-	s.Equal(6, lineCount)
+	require.Equal(s.t, 6, lineCount)
 
 	polyCount, err := s.sfHelper.CountNonNullRows("test_invalid_geo_sf_avro_cdc", "poly")
 	require.NoError(s.t, err)
-	s.Equal(6, polyCount)
+	require.Equal(s.t, 6, polyCount)
 
 	// TODO: verify that the data is correctly synced to the destination table
 	// on the bigquery side
@@ -392,7 +388,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -450,7 +446,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() {
 		if err != nil {
 			slog.Error("Error executing transaction", slog.Any("error", err))
-			s.FailNow()
+			s.t.FailNow()
 		}
 
 		wg.Done()
@@ -459,7 +455,7 @@
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -534,7 +530,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -600,7 +596,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -666,7 +662,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -734,7 +730,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -755,7 +751,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
 	require.NoError(s.t, err)
 
 	// Make sure that there are no nulls
-	s.Equal(noNulls, true)
+	require.Equal(s.t, noNulls, true)
 
 	env.AssertExpectations(s.t)
 }
@@ -805,7 +801,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	count1, err := s.sfHelper.CountRows("test1_sf")
@@ -813,8 +809,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() {
 	count2, err := s.sfHelper.CountRows("test2_sf")
 	require.NoError(s.t, err)
 
-	s.Equal(1, count1)
-	s.Equal(1, count2)
+	require.Equal(s.t, 1, count1)
+	require.Equal(s.t, 1, count2)
 
 	env.AssertExpectations(s.t)
 }
@@ -880,7 +876,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
 			TableIdentifiers: []string{dstTableName},
 		})
 		require.NoError(s.t, err)
-		s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+		require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
 		s.compareTableContentsSF("test_simple_schema_changes", "id,c1")
 
 		// alter source table, add column c2 and insert another row.
@@ -916,7 +912,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
 			TableIdentifiers: []string{dstTableName},
 		})
 		require.NoError(s.t, err)
-		s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+		require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
 		s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2")
 
 		// alter source table, add column c3, drop column c2 and insert another row.
@@ -954,7 +950,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
 			TableIdentifiers: []string{dstTableName},
 		})
 		require.NoError(s.t, err)
-		s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+		require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
 		s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3")
 
 		// alter source table, drop column c3 and insert another row.
@@ -992,14 +988,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() {
 			TableIdentifiers: []string{dstTableName},
 		})
 		require.NoError(s.t, err)
-		s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
+		require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[dstTableName])
 		s.compareTableContentsSF("test_simple_schema_changes", "id,c1")
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -1069,7 +1065,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -1145,7 +1141,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -1217,7 +1213,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
@@ -1296,7 +1292,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")
 
@@ -1308,8 +1304,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
 	for _, field := range sfRows.Schema.Fields {
 		require.NotEqual(s.t, field.Name, "c2")
 	}
-	s.Equal(5, len(sfRows.Schema.Fields))
-	s.Equal(10, len(sfRows.Records))
+	require.Equal(s.t, 5, len(sfRows.Schema.Fields))
+	require.Equal(s.t, 10, len(sfRows.Records))
 }
 
 func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() {
@@ -1383,7 +1379,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() {
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")
 
@@ -1468,7 +1464,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() {
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")
 
@@ -1479,7 +1475,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() {
 		SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
 	numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
 	require.NoError(s.t, err)
-	s.Equal(1, numNewRows)
+	require.Equal(s.t, 1, numNewRows)
 }
 
 func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() {
@@ -1555,7 +1551,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() {
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")
 
@@ -1566,7 +1562,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() {
 		SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
 	numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
 	require.NoError(s.t, err)
-	s.Equal(1, numNewRows)
+	require.Equal(s.t, 1, numNewRows)
 }
 
 func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() {
@@ -1630,7 +1626,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() {
 	}()
 
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.Contains(s.t, err.Error(), "continue as new")
 
@@ -1641,7 +1637,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() {
 		SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName)
 	numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery)
 	require.NoError(s.t, err)
-	s.Equal(0, numNewRows)
+	require.Equal(s.t, 0, numNewRows)
 }
 
 func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() {
@@ -1695,7 +1691,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() {
 	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 
 	// allow only continue as new error
diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go
index d592906f0a..c9303ec2d6 100644
--- a/flow/e2e/snowflake/qrep_flow_sf_test.go
+++ b/flow/e2e/snowflake/qrep_flow_sf_test.go
@@ -96,7 +96,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() {
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
 
@@ -144,7 +144,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple()
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
 
@@ -186,7 +186,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
 
@@ -232,7 +232,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
 	e2e.RunXminFlowWorkflow(env, qrepConfig)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
 
@@ -277,7 +277,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration()
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
 
@@ -322,7 +322,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() {
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err = env.GetWorkflowError()
 	require.NoError(s.t, err)
 
diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go
index d61cbf1bb4..60d8d99574 100644
--- a/flow/e2e/snowflake/snowflake_schema_delta_test.go
+++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go
@@ -11,30 +11,19 @@ import (
 	"github.com/PeerDB-io/peer-flow/e2eshared"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
-	"github.com/ysmood/got"
+	"github.com/stretchr/testify/require"
 )
 
 const schemaDeltaTestSchemaName = "PUBLIC"
 
 type SnowflakeSchemaDeltaTestSuite struct {
-	got.G
 	t *testing.T
 
 	connector    *connsnowflake.SnowflakeConnector
 	sfTestHelper *SnowflakeTestHelper
 }
 
-func (suite SnowflakeSchemaDeltaTestSuite) failTestError(err error) {
-	if err != nil {
-		slog.Error("Error in test", slog.Any("error", err))
-		suite.FailNow()
-	}
-}
-
-func setupSchemaDeltaSuite(
-	t *testing.T,
-	g got.G,
-) SnowflakeSchemaDeltaTestSuite {
+func setupSchemaDeltaSuite(t *testing.T) SnowflakeSchemaDeltaTestSuite {
 	t.Helper()
 
 	sfTestHelper, err := NewSnowflakeTestHelper()
@@ -53,19 +42,18 @@
 	}
 
 	return SnowflakeSchemaDeltaTestSuite{
-		G:            g,
 		t:            t,
 		connector:    connector,
 		sfTestHelper: sfTestHelper,
 	}
 }
 
-func (suite SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() {
+func (s SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() {
 	tableName := fmt.Sprintf("%s.SIMPLE_ADD_COLUMN", schemaDeltaTestSchemaName)
-	err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName))
-	suite.failTestError(err)
+	err := s.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName))
+	require.NoError(s.t, err)
 
-	err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
+	err = s.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
 		SrcTableName: tableName,
 		DstTableName: tableName,
 		AddedColumns: []*protos.DeltaAddedColumn{{
@@ -73,23 +61,23 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() {
 			ColumnType: string(qvalue.QValueKindJSON),
 		}},
 	}})
-	suite.failTestError(err)
+	require.NoError(s.t, err)
 
-	output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
+	output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
 		TableIdentifiers: []string{tableName},
 	})
-	suite.failTestError(err)
-	suite.Equal(&protos.TableSchema{
+	require.NoError(s.t, err)
+	require.Equal(s.t, &protos.TableSchema{
 		TableIdentifier: tableName,
 		ColumnNames:     []string{"ID", "HI"},
 		ColumnTypes:     []string{string(qvalue.QValueKindString), string(qvalue.QValueKindJSON)},
 	}, output.TableNameSchemaMapping[tableName])
 }
 
-func (suite SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() {
+func (s SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() {
 	tableName := fmt.Sprintf("%s.ADD_DROP_ALL_COLUMN_TYPES", schemaDeltaTestSchemaName)
-	err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName))
-	suite.failTestError(err)
+	err := s.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName))
+	require.NoError(s.t, err)
 
 	expectedTableSchema := &protos.TableSchema{
 		TableIdentifier: tableName,
@@ -119,24 +107,24 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() {
 		}
 	})
 
-	err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
+	err = s.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
 		SrcTableName: tableName,
 		DstTableName: tableName,
 		AddedColumns: addedColumns,
 	}})
-	suite.failTestError(err)
+	require.NoError(s.t, err)
 
-	output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
+	output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
 		TableIdentifiers: []string{tableName},
 	})
-	suite.failTestError(err)
-	suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
+	require.NoError(s.t, err)
+	require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
 }
 
-func (suite SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
+func (s SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
 	tableName := fmt.Sprintf("%s.ADD_DROP_TRICKY_COLUMN_NAMES", schemaDeltaTestSchemaName)
-	err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(id TEXT PRIMARY KEY)", tableName))
-	suite.failTestError(err)
+	err := s.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(id TEXT PRIMARY KEY)", tableName))
+	require.NoError(s.t, err)
 
 	expectedTableSchema := &protos.TableSchema{
 		TableIdentifier: tableName,
@@ -174,24 +162,24 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
 		}
 	})
 
-	err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
+	err = s.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
 		SrcTableName: tableName,
 		DstTableName: tableName,
 		AddedColumns: addedColumns,
 	}})
-	suite.failTestError(err)
+	require.NoError(s.t, err)
 
-	output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
+	output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
 		TableIdentifiers: []string{tableName},
 	})
-	suite.failTestError(err)
-	suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
+	require.NoError(s.t, err)
+	require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
 }
 
-func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() {
+func (s SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() {
 	tableName := fmt.Sprintf("%s.ADD_DROP_WHITESPACE_COLUMN_NAMES", schemaDeltaTestSchemaName)
-	err := suite.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(\" \" TEXT PRIMARY KEY)", tableName))
-	suite.failTestError(err)
+	err := s.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(\" \" TEXT PRIMARY KEY)", tableName))
+	require.NoError(s.t, err)
 
 	expectedTableSchema := &protos.TableSchema{
 		TableIdentifier: tableName,
@@ -213,23 +201,23 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() {
 		}
 	})
 
-	err = suite.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
+	err = s.connector.ReplayTableSchemaDeltas("schema_delta_flow", []*protos.TableSchemaDelta{{
 		SrcTableName: tableName,
 		DstTableName: tableName,
 		AddedColumns: addedColumns,
 	}})
-	suite.failTestError(err)
+	require.NoError(s.t, err)
 
-	output, err := suite.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
+	output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
 		TableIdentifiers: []string{tableName},
 	})
-	suite.failTestError(err)
-	suite.Equal(expectedTableSchema, output.TableNameSchemaMapping[tableName])
+	require.NoError(s.t, err)
+	require.Equal(s.t, expectedTableSchema, output.TableNameSchemaMapping[tableName])
 }
 
 func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) {
-	e2eshared.GotSuite(t, setupSchemaDeltaSuite, func(suite SnowflakeSchemaDeltaTestSuite) {
-		suite.failTestError(suite.sfTestHelper.Cleanup())
-		suite.failTestError(suite.connector.Close())
+	e2eshared.GotSuite(t, setupSchemaDeltaSuite, func(s SnowflakeSchemaDeltaTestSuite) {
+		require.NoError(s.t, s.sfTestHelper.Cleanup())
+		require.NoError(s.t, s.connector.Close())
 	})
 }
diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
index aeb622d559..32df9e2c6c 100644
--- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
+++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
@@ -19,11 +19,9 @@ import (
 	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/joho/godotenv"
 	"github.com/stretchr/testify/require"
-	"github.com/ysmood/got"
 )
 
 type PeerFlowE2ETestSuiteSQLServer struct {
-	got.G
 	t *testing.T
 
 	pool *pgxpool.Pool
@@ -43,7 +41,7 @@ func TestCDCFlowE2ETestSuiteSQLServer(t *testing.T) {
 	})
 }
 
-func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSQLServer {
+func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteSQLServer {
 	t.Helper()
 
 	err := godotenv.Load()
@@ -69,7 +67,6 @@ func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSQLServer {
 	}
 
 	return PeerFlowE2ETestSuiteSQLServer{
-		G:          g,
 		t:          t,
 		pool:       pool,
 		sqlsHelper: sqlsHelper,
@@ -166,7 +163,7 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append(
 	e2e.RunQrepFlowWorkflow(env, qrepConfig)
 
 	// Verify workflow completes without error
-	s.True(env.IsWorkflowCompleted())
+	require.True(s.t, env.IsWorkflowCompleted())
 	err := env.GetWorkflowError()
 	require.NoError(s.t, err)
 
@@ -177,5 +174,5 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append(
 	err = s.pool.QueryRow(context.Background(), countQuery).Scan(&numRowsInDest)
 	require.NoError(s.t, err)
 
-	s.Equal(numRows, int(numRowsInDest.Int64))
+	require.Equal(s.t, numRows, int(numRowsInDest.Int64))
 }
diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go
index 150ef30459..0324751a93 100644
--- a/flow/e2eshared/e2eshared.go
+++ b/flow/e2eshared/e2eshared.go
@@ -11,7 +11,7 @@ import (
 	"github.com/ysmood/got"
 )
 
-func GotSuite[T any](t *testing.T, setup func(t *testing.T, g got.G) T, teardown func(T)) {
+func GotSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) {
 	t.Helper()
 	t.Parallel()
 
@@ -19,7 +19,7 @@
 		t.Helper()
 		g := got.New(t)
 		g.Parallel()
-		suite := setup(t, g)
+		suite := setup(t)
 		g.Cleanup(func() { teardown(suite) })
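
For reference, a minimal sketch of the post-migration suite shape, assuming the e2eshared.GotSuite signature from the final hunk above; everything named example* is hypothetical and exists only for illustration, while e2eshared.GotSuite and testify's require come from the patch itself:

package example

import (
	"testing"

	"github.com/PeerDB-io/peer-flow/e2eshared"
	"github.com/stretchr/testify/require"
)

// exampleSuite is a hypothetical suite: no embedded got.G, just the
// *testing.T captured at setup time (real suites also carry pools and helpers).
type exampleSuite struct {
	t *testing.T
}

// Setup now takes only *testing.T, matching the new GotSuite signature;
// setup failures report through t (t.FailNow) rather than got.G.
func setupExampleSuite(t *testing.T) exampleSuite {
	t.Helper()
	return exampleSuite{t: t}
}

// Test methods assert via testify, passing the stored t explicitly
// (require.Equal(s.t, ...)) instead of calling embedded got.G helpers.
func (s exampleSuite) TestArithmetic() {
	require.Equal(s.t, 2, 1+1)
}

func TestExampleSuite(t *testing.T) {
	e2eshared.GotSuite(t, setupExampleSuite, func(s exampleSuite) {
		// Teardown is registered via g.Cleanup inside GotSuite and runs
		// once the suite finishes.
	})
}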