diff --git a/flow/.golangci.yml b/flow/.golangci.yml index ab612fb893..16b2a24619 100644 --- a/flow/.golangci.yml +++ b/flow/.golangci.yml @@ -4,18 +4,22 @@ run: linters: enable: - dogsled + - durationcheck + - errcheck - gofumpt - gosec - gosimple - misspell - nakedret + - nolintlint + - staticcheck - stylecheck + - sqlclosecheck - unconvert - unparam - whitespace - - errcheck - prealloc - - staticcheck + - thelper - ineffassign - unparam - unused diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 4c5693f292..05b5a910a7 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -510,7 +510,6 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl p.relationMessageMapping[msg.RelationID] = convertRelationMessageToProto(msg) } else { // RelationMessages don't contain an LSN, so we use current clientXlogPos instead. - //nolint:lll // https://github.com/postgres/postgres/blob/8b965c549dc8753be8a38c4a1b9fabdb535a4338/src/backend/replication/logical/proto.c#L670 return p.processRelationMessage(currentClientXlogPos, convertRelationMessageToProto(msg)) } diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index e48c71b29d..304e313f4c 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -19,7 +19,6 @@ import ( "golang.org/x/exp/maps" ) -//nolint:stylecheck const ( mirrorJobsTableIdentifier = "peerdb_mirror_jobs" createMirrorJobsTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(mirror_job_name TEXT PRIMARY KEY, diff --git a/flow/connectors/postgres/qrep_partition_test.go b/flow/connectors/postgres/qrep_partition_test.go index 164312010d..2339e93c71 100644 --- a/flow/connectors/postgres/qrep_partition_test.go +++ b/flow/connectors/postgres/qrep_partition_test.go @@ -211,7 +211,9 @@ func TestGetQRepPartitions(t *testing.T) { } // returns the number of rows inserted -func prepareTestData(test *testing.T, pool *pgxpool.Pool, schema string) int { +func prepareTestData(t *testing.T, pool *pgxpool.Pool, schema string) int { + t.Helper() + // Define the start and end times startTime := time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC) endTime := time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC) @@ -223,12 +225,12 @@ func prepareTestData(test *testing.T, pool *pgxpool.Pool, schema string) int { } // Insert the test data - for i, t := range times { + for i, time := range times { _, err := pool.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s.test (value, "from") VALUES ($1, $2) - `, schema), i+1, t) + `, schema), i+1, time) if err != nil { - test.Fatalf("Failed to insert test data: %v", err) + t.Fatalf("Failed to insert test data: %v", err) } } diff --git a/flow/connectors/postgres/qrep_query_executor_test.go b/flow/connectors/postgres/qrep_query_executor_test.go index 8b4ff0c116..db5a04d93f 100644 --- a/flow/connectors/postgres/qrep_query_executor_test.go +++ b/flow/connectors/postgres/qrep_query_executor_test.go @@ -13,6 +13,8 @@ import ( ) func setupDB(t *testing.T) (*pgxpool.Pool, string) { + t.Helper() + config, err := pgxpool.ParseConfig("postgres://postgres:postgres@localhost:7132/postgres") if err != nil { t.Fatalf("unable to parse config: %v", err) @@ -36,6 +38,8 @@ func setupDB(t *testing.T) (*pgxpool.Pool, string) { } func teardownDB(t *testing.T, pool *pgxpool.Pool, schemaName string) { + t.Helper() + _, err := pool.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA %s CASCADE;", schemaName)) if err != nil { t.Fatalf("error while dropping schema: %v", err) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 4b27f9c689..55a29362f8 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -15,7 +15,6 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/lib/pq/oid" - //nolint:all geom "github.com/twpayne/go-geos" ) diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index f08b66a6c8..0d0cdda6a1 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -17,6 +17,8 @@ import ( // createQValue creates a QValue of the appropriate kind for a given placeholder. func createQValue(t *testing.T, kind qvalue.QValueKind, placeHolder int) qvalue.QValue { + t.Helper() + var value interface{} switch kind { case qvalue.QValueKindInt16, qvalue.QValueKindInt32, qvalue.QValueKindInt64: @@ -55,13 +57,15 @@ func createQValue(t *testing.T, kind qvalue.QValueKind, placeHolder int) qvalue. } } -// nolint:unparam +//nolint:unparam func generateRecords( t *testing.T, nullable bool, numRows uint32, allnulls bool, ) (*model.QRecordStream, *model.QRecordSchema) { + t.Helper() + allQValueKinds := []qvalue.QValueKind{ qvalue.QValueKindFloat32, qvalue.QValueKindFloat64, diff --git a/flow/connectors/snowflake/client.go b/flow/connectors/snowflake/client.go index beb38a4a04..0e2be32717 100644 --- a/flow/connectors/snowflake/client.go +++ b/flow/connectors/snowflake/client.go @@ -74,7 +74,6 @@ func (c *SnowflakeConnector) getTableCounts(tables []string) (int64, error) { if err != nil { return 0, fmt.Errorf("failed to parse table name %s: %w", table, err) } - //nolint:gosec row := c.database.QueryRowContext(c.ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)) var count pgtype.Int8 err = row.Scan(&count) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 55dd4444df..d480aea7cc 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -25,7 +25,6 @@ import ( "golang.org/x/sync/errgroup" ) -//nolint:stylecheck const ( mirrorJobsTableIdentifier = "PEERDB_MIRROR_JOBS" createMirrorJobsTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(MIRROR_JOB_NAME STRING NOT NULL,OFFSET INT NOT NULL, @@ -326,6 +325,7 @@ func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) { if err != nil { return 0, fmt.Errorf("error querying Snowflake peer for last syncBatchId: %w", err) } + defer rows.Close() var result pgtype.Int8 if !rows.Next() { @@ -346,6 +346,7 @@ func (c *SnowflakeConnector) GetLastSyncAndNormalizeBatchID(jobName string) (mod return model.SyncAndNormalizeBatchID{}, fmt.Errorf("error querying Snowflake peer for last normalizeBatchId: %w", err) } + defer rows.Close() var syncResult, normResult pgtype.Int8 if !rows.Next() { @@ -372,6 +373,7 @@ func (c *SnowflakeConnector) getDistinctTableNamesInBatch(flowJobName string, sy if err != nil { return nil, fmt.Errorf("error while retrieving table names for normalization: %w", err) } + defer rows.Close() var result pgtype.Text destinationTableNames := make([]string, 0) @@ -395,6 +397,8 @@ func (c *SnowflakeConnector) getTableNametoUnchangedCols(flowJobName string, syn if err != nil { return nil, fmt.Errorf("error while retrieving table names for normalization: %w", err) } + defer rows.Close() + // Create a map to store the results resultMap := make(map[string][]string) // Process the rows and populate the map diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index b95eb511fd..af7d375d79 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -382,7 +382,6 @@ func toQValue(kind qvalue.QValueKind, val interface{}) (qvalue.QValue, error) { if v, ok := val.(*sql.NullString); ok { if v.Valid { numeric := new(big.Rat) - //nolint:gosec if _, ok := numeric.SetString(v.String); !ok { return qvalue.QValue{}, fmt.Errorf("failed to parse numeric: %v", v.String) } diff --git a/flow/connectors/sqlserver/qrep.go b/flow/connectors/sqlserver/qrep.go index cd91b1fc9a..6a75373597 100644 --- a/flow/connectors/sqlserver/qrep.go +++ b/flow/connectors/sqlserver/qrep.go @@ -41,7 +41,6 @@ func (c *SQLServerConnector) GetQRepPartitions( } // Query to get the total number of rows in the table - //nolint:gosec countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s %s", config.WatermarkTable, whereClause) var minVal interface{} = nil var totalRows pgtype.Int8 @@ -91,7 +90,6 @@ func (c *SQLServerConnector) GetQRepPartitions( var rows *sqlx.Rows if minVal != nil { // Query to get partitions using window functions - //nolint:gosec partitionsQuery := fmt.Sprintf( `SELECT bucket_v, MIN(v_from) AS start_v, MAX(v_from) AS end_v FROM ( @@ -112,7 +110,6 @@ func (c *SQLServerConnector) GetQRepPartitions( } rows, err = c.db.NamedQuery(partitionsQuery, params) } else { - //nolint:gosec partitionsQuery := fmt.Sprintf( `SELECT bucket_v, MIN(v_from) AS start_v, MAX(v_from) AS end_v FROM ( diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index 66722b69ed..a2e82bf559 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -1,4 +1,3 @@ -//nolint:stylecheck package cdc_records import ( diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go index b2dd84ea44..7d9b8e429c 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go @@ -1,4 +1,3 @@ -//nolint:stylecheck package cdc_records import ( diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 6cbac4c915..4d47d2ff96 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -31,9 +31,9 @@ type PeerFlowE2ETestSuiteBQ struct { func TestPeerFlowE2ETestSuiteBQ(t *testing.T) { got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteBQ { - g := got.New(t) + t.Helper() - // Concurrently run each test + g := got.New(t) g.Parallel() suite := setupSuite(t, g) @@ -100,6 +100,8 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDel // setupBigQuery sets up the bigquery connection. func setupBigQuery(t *testing.T) *BigQueryTestHelper { + t.Helper() + bqHelper, err := NewBigQueryTestHelper() if err != nil { slog.Error("Error in test", slog.Any("error", err)) @@ -117,6 +119,8 @@ func setupBigQuery(t *testing.T) *BigQueryTestHelper { // Implement SetupAllSuite interface to setup the test suite func setupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteBQ { + t.Helper() + err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -161,7 +165,7 @@ func (s PeerFlowE2ETestSuiteBQ) tearDownSuite() { func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) // TODO (kaushikiska): ensure flow name can only be alpha numeric and underscores. limits := peerflow.CDCFlowLimits{ @@ -183,7 +187,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() { func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_no_data") dstTableName := "test_no_data" @@ -227,7 +231,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_char_coltype") dstTableName := "test_char_coltype" @@ -274,7 +278,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { // correctly synced to the destination table after sync flow completes. func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_flow_bq") dstTableName := "test_simple_flow_bq" @@ -341,7 +345,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_1") dstTableName := "test_toast_bq_1" @@ -409,7 +413,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_2") dstTableName := "test_toast_bq_2" @@ -473,7 +477,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_3") dstTableName := "test_toast_bq_3" @@ -547,7 +551,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_4") dstTableName := "test_toast_bq_4" @@ -614,7 +618,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_bq_5") dstTableName := "test_toast_bq_5" @@ -681,7 +685,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_types_bq") dstTableName := "test_types_bq" @@ -760,7 +764,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTable1Name := s.attachSchemaSuffix("test1_bq") dstTable1Name := "test1_bq" @@ -822,7 +826,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { // TODO: not checking schema exactly, add later func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := "test_simple_schema_changes" @@ -922,7 +926,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := "test_simple_cpkey" @@ -995,7 +999,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := "test_cpkey_toast1" @@ -1072,7 +1076,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") dstTableName := "test_cpkey_toast2" @@ -1145,7 +1149,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_peerdb_cols") dstTableName := "test_peerdb_cols_dst" @@ -1208,7 +1212,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTable1Name := s.attachSchemaSuffix("test1_bq") dstTable1Name := "test1_bq" @@ -1276,7 +1280,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel") srcTableName := fmt.Sprintf("%s_src", cmpTableName) @@ -1364,7 +1368,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") srcTableName := fmt.Sprintf("%s_src", cmpTableName) @@ -1448,7 +1452,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_ud") srcTableName := fmt.Sprintf("%s_src", cmpTableName) @@ -1536,7 +1540,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_softdel_iad") dstTableName := "test_softdel_iad" diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index ca74a412c3..e6d3ac9f5b 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -48,7 +48,7 @@ func (s PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsStr func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -83,7 +83,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index da050ccf64..779beeea2a 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -42,7 +42,7 @@ func (s *PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, r func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + e2e.RegisterWorkflowsAndActivities(s.T(), env) srcTableName := s.attachSchemaSuffix("test_simple_flow") dstTableName := s.attachSchemaSuffix("test_simple_flow_dst") @@ -105,7 +105,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + e2e.RegisterWorkflowsAndActivities(s.T(), env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := s.attachSchemaSuffix("test_simple_schema_changes_dst") @@ -266,7 +266,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + e2e.RegisterWorkflowsAndActivities(s.T(), env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := s.attachSchemaSuffix("test_simple_cpkey_dst") @@ -341,7 +341,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + e2e.RegisterWorkflowsAndActivities(s.T(), env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := s.attachSchemaSuffix("test_cpkey_toast1_dst") @@ -422,7 +422,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + e2e.RegisterWorkflowsAndActivities(s.T(), env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") dstTableName := s.attachSchemaSuffix("test_cpkey_toast2_dst") @@ -499,7 +499,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + e2e.RegisterWorkflowsAndActivities(s.T(), env) srcTableName := s.attachSchemaSuffix("test_peerdb_cols") dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst") diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 0bb886f9a3..880e2d19e4 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -158,15 +158,13 @@ func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + e2e.RegisterWorkflowsAndActivities(s.T(), env) numRows := 10 - //nolint:gosec srcTable := "test_qrep_flow_avro_pg_1" s.setupSourceTable(srcTable, numRows) - //nolint:gosec dstTable := "test_qrep_flow_avro_pg_2" err := e2e.CreateTableForQRep(s.pool, postgresSuffix, dstTable) @@ -210,15 +208,13 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + e2e.RegisterWorkflowsAndActivities(s.T(), env) numRows := 10 - //nolint:gosec srcTable := "test_qrep_columns_pg_1" s.setupSourceTable(srcTable, numRows) - //nolint:gosec dstTable := "test_qrep_columns_pg_2" srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable) diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index a938f673b3..2f4bfd4404 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -20,7 +20,7 @@ func (s *PeerFlowE2ETestSuiteS3) attachSuffix(input string) string { func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + e2e.RegisterWorkflowsAndActivities(s.T(), env) setupErr := s.setupS3("s3") if setupErr != nil { @@ -93,7 +93,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { func (s *PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_GCS_Interop() { env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + e2e.RegisterWorkflowsAndActivities(s.T(), env) setupErr := s.setupS3("gcs") if setupErr != nil { s.Fail("failed to setup S3", setupErr) diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index fda57ced09..4fc4f7bf78 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -91,7 +91,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { } env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + e2e.RegisterWorkflowsAndActivities(s.T(), env) jobName := "test_complete_flow_s3" schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) @@ -140,7 +140,7 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { } env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + e2e.RegisterWorkflowsAndActivities(s.T(), env) jobName := "test_complete_flow_s3_ctid" schemaQualifiedName := fmt.Sprintf("e2e_test_%s.%s", s3Suffix, jobName) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 8d521dbb72..f979198635 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -34,9 +34,9 @@ type PeerFlowE2ETestSuiteSF struct { func TestPeerFlowE2ETestSuiteSF(t *testing.T) { got.Each(t, func(t *testing.T) PeerFlowE2ETestSuiteSF { - g := got.New(t) + t.Helper() - // Concurrently run each test + g := got.New(t) g.Parallel() suite := SetupSuite(t, g) @@ -58,6 +58,8 @@ func (s PeerFlowE2ETestSuiteSF) attachSuffix(input string) string { } func SetupSuite(t *testing.T, g got.G) PeerFlowE2ETestSuiteSF { + t.Helper() + err := godotenv.Load() if err != nil { // it's okay if the .env file is not present @@ -125,7 +127,7 @@ func (s PeerFlowE2ETestSuiteSF) tearDownSuite() { func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_flow_sf") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_flow_sf") @@ -200,7 +202,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_replica_identity_no_pkey") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_replica_identity_no_pkey") @@ -266,7 +268,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_invalid_geo_sf_avro_cdc") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_invalid_geo_sf_avro_cdc") @@ -351,7 +353,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_1") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_1") @@ -418,7 +420,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_2") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_2") @@ -487,7 +489,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_3") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_3") @@ -560,7 +562,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_4") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_4") @@ -626,7 +628,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_toast_sf_5") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_toast_sf_5") @@ -692,7 +694,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_types_sf") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf") @@ -771,7 +773,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTable1Name := s.attachSchemaSuffix("test1_sf") srcTable2Name := s.attachSchemaSuffix("test2_sf") @@ -830,7 +832,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_schema_changes") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_schema_changes") @@ -990,7 +992,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_simple_cpkey") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_simple_cpkey") @@ -1063,7 +1065,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast1") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast1") @@ -1139,7 +1141,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_cpkey_toast2") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast2") @@ -1211,7 +1213,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_exclude_sf") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_exclude_sf") @@ -1294,7 +1296,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel") srcTableName := fmt.Sprintf("%s_src", cmpTableName) @@ -1381,7 +1383,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") srcTableName := fmt.Sprintf("%s_src", cmpTableName) @@ -1464,7 +1466,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) cmpTableName := s.attachSchemaSuffix("test_softdel_ud") srcTableName := fmt.Sprintf("%s_src", cmpTableName) @@ -1551,7 +1553,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("test_softdel_iad") dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_iad") diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index b3cd9b9c2a..4499f7f437 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -12,7 +12,7 @@ import ( "github.com/stretchr/testify/require" ) -// nolint:unparam +//nolint:unparam func (s PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, numRows int) { err := e2e.CreateTableForQRep(s.pool, s.pgSuffix, tableName) require.NoError(s.t, err) @@ -58,7 +58,7 @@ func (s PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selecto func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -99,7 +99,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -144,7 +144,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -186,7 +186,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -232,7 +232,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 @@ -278,7 +278,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() { env := e2e.NewTemporalTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.t) + e2e.RegisterWorkflowsAndActivities(s.t, env) numRows := 10 diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index f8a06733b1..c69205dfd1 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -33,6 +33,8 @@ func setupSchemaDeltaSuite( t *testing.T, g got.G, ) SnowflakeSchemaDeltaTestSuite { + t.Helper() + sfTestHelper, err := NewSnowflakeTestHelper() if err != nil { slog.Error("Error in test", slog.Any("error", err)) @@ -221,8 +223,9 @@ func (suite SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { func TestSnowflakeSchemaDeltaTestSuite(t *testing.T) { got.Each(t, func(t *testing.T) SnowflakeSchemaDeltaTestSuite { - g := got.New(t) + t.Helper() + g := got.New(t) g.Parallel() suite := setupSchemaDeltaSuite(t, g) diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 2d327458af..5486583d86 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -97,7 +97,6 @@ func (s *PeerFlowE2ETestSuiteSQLServer) insertRowsIntoSQLServerTable(tableName s params["status"] = 1 _, err := s.sqlsHelper.E.NamedExec( - //nolint:lll "INSERT INTO "+schemaQualified+" (id, card_id, v_from, price, status) VALUES (:id, :card_id, :v_from, :price, :status)", params, ) @@ -136,7 +135,7 @@ func (s *PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append } env := s.NewTestWorkflowEnvironment() - e2e.RegisterWorkflowsAndActivities(env, s.T()) + e2e.RegisterWorkflowsAndActivities(s.T(), env) numRows := 10 tblName := "test_qrep_flow_avro_ss_append" diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 8bea8cf984..83f4760acd 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -43,7 +43,9 @@ func ReadFileToBytes(path string) ([]byte, error) { return ret, nil } -func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment, t *testing.T) { +func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnvironment) { + t.Helper() + conn, err := utils.GetCatalogConnectionPoolFromEnv() if err != nil { t.Fatalf("unable to create catalog connection pool: %v", err) diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index cdefa6a376..7f8c5615a8 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -72,8 +72,8 @@ func PeerDBSlotLagMBAlertThreshold() uint32 { // PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely func PeerDBAlertingGapMinutesAsDuration() time.Duration { - why := time.Duration(getEnvUint32("PEERDB_ALERTING_GAP_MINUTES", 15)) - return why * time.Minute + why := int64(getEnvUint32("PEERDB_ALERTING_GAP_MINUTES", 15)) + return time.Duration(why) * time.Minute } // PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely