diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index b6294a42b0..5d3bf1e679 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -81,7 +81,7 @@ func NewBigQueryServiceAccount(bqConfig *protos.BigqueryConfig) (*BigQueryServic // Validate validates a BigQueryServiceAccount, that none of the fields are empty. func (bqsa *BigQueryServiceAccount) Validate() error { v := reflect.ValueOf(*bqsa) - for i := 0; i < v.NumField(); i++ { + for i := range v.NumField() { if v.Field(i).String() == "" { return fmt.Errorf("field %s is empty", v.Type().Field(i).Name) } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 6517ecd011..4a4ecd3b75 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "fmt" "log/slog" + "slices" "time" "github.com/jackc/pglogrepl" @@ -786,19 +787,19 @@ func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest, rec model.Record, ) (*model.TableWithPkey, error) { tableName := rec.GetDestinationTableName() - pkeyColsMerged := make([]byte, 0) + pkeyColsMerged := make([][]byte, 0, len(req.TableNameSchemaMapping[tableName].PrimaryKeyColumns)) for _, pkeyCol := range req.TableNameSchemaMapping[tableName].PrimaryKeyColumns { pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol) if err != nil { return nil, fmt.Errorf("error getting pkey column value: %w", err) } - pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprint(pkeyColVal.Value))...) + pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprint(pkeyColVal.Value))) } return &model.TableWithPkey{ TableName: tableName, - PkeyColVal: sha256.Sum256(pkeyColsMerged), + PkeyColVal: sha256.Sum256(slices.Concat(pkeyColsMerged...)), }, nil } diff --git a/flow/connectors/postgres/qrep_bench_test.go b/flow/connectors/postgres/qrep_bench_test.go index c36ee844ca..406d74a00b 100644 --- a/flow/connectors/postgres/qrep_bench_test.go +++ b/flow/connectors/postgres/qrep_bench_test.go @@ -29,7 +29,7 @@ func BenchmarkQRepQueryExecutor(b *testing.B) { // Run the benchmark b.ResetTimer() - for i := 0; i < b.N; i++ { + for i := range b.N { // log the iteration b.Logf("iteration %d", i) diff --git a/flow/connectors/postgres/qrep_partition_test.go b/flow/connectors/postgres/qrep_partition_test.go index b1aabfb60b..9ed69968a4 100644 --- a/flow/connectors/postgres/qrep_partition_test.go +++ b/flow/connectors/postgres/qrep_partition_test.go @@ -197,8 +197,8 @@ func TestGetQRepPartitions(t *testing.T) { expected := tc.want assert.Equal(t, len(expected), len(got)) - for i := 0; i < len(expected); i++ { - er := expected[i].Range.Range.(*protos.PartitionRange_TimestampRange).TimestampRange + for i, val := range expected { + er := val.Range.Range.(*protos.PartitionRange_TimestampRange).TimestampRange gotr := got[i].Range.Range.(*protos.PartitionRange_TimestampRange).TimestampRange assert.Equal(t, er.Start.AsTime(), gotr.Start.AsTime()) assert.Equal(t, er.End.AsTime(), gotr.End.AsTime()) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index cc830d289a..9382f59c54 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -515,9 +515,7 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No g, gCtx := errgroup.WithContext(ctx) g.SetLimit(8) // limit parallel merges to 8 - for _, destinationTableName := range destinationTableNames { - tableName := destinationTableName // local variable for the closure - + for _, tableName := range destinationTableNames { g.Go(func() error { mergeGen := &mergeStmtGenerator{ rawTableName: getRawTableIdentifier(req.FlowJobName), diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index a1be8d6a5a..5d4fa6cadd 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -310,7 +310,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := range 10 { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` @@ -724,7 +724,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 4 invalid shapes and 6 valid shapes into the source table - for i := 0; i < 4; i++ { + for range 4 { _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (line,"polyPoly") VALUES ($1,$2) `, srcTableName), "010200000001000000000000000000F03F0000000000000040", @@ -736,7 +736,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 4 invalid geography rows into the source table") - for i := 4; i < 10; i++ { + for range 6 { _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (line,"polyPoly") VALUES ($1,$2) `, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040", @@ -949,7 +949,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := range 10 { testValue := fmt.Sprintf("test_value_%d", i) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) @@ -1013,7 +1013,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { e2e.EnvNoError(s.t, env, err) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := range 10 { testValue := fmt.Sprintf("test_value_%d", i) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) @@ -1073,7 +1073,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := range 10 { testValue := fmt.Sprintf("test_value_%d", i) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index d99479a781..0f5cbfde13 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -110,7 +110,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := range 10 { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` @@ -502,7 +502,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := range 10 { testValue := fmt.Sprintf("test_value_%d", i) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) @@ -570,7 +570,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { e2e.EnvNoError(s.t, env, err) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := range 10 { testValue := fmt.Sprintf("test_value_%d", i) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,%s(9000)) @@ -636,7 +636,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := range 10 { testValue := fmt.Sprintf("test_value_%d", i) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,%s(9000)) @@ -1084,7 +1084,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Supported_Mixed_Case_Table() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 20 rows into the source table - for i := 0; i < 20; i++ { + for i := range 10 { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` @@ -1157,7 +1157,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { } addRows := func(numRows int) { - for i := 0; i < numRows; i++ { + for range numRows { _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable1Name)) e2e.EnvNoError(s.t, env, err) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index d9b2697980..777a588e93 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -157,7 +157,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 20 rows into the source table - for i := 0; i < 20; i++ { + for i := range 20 { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` @@ -218,7 +218,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 20 rows into the source table - for i := 0; i < 20; i++ { + for i := range 20 { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` @@ -270,7 +270,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 4 invalid shapes and 6 valid shapes into the source table - for i := 0; i < 4; i++ { + for range 4 { _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (line,poly) VALUES ($1,$2) `, srcTableName), "010200000001000000000000000000F03F0000000000000040", @@ -282,7 +282,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { e2e.EnvNoError(s.t, env, err) } s.t.Log("Inserted 4 invalid geography rows into the source table") - for i := 4; i < 10; i++ { + for range 6 { _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s (line,poly) VALUES ($1,$2) `, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040", @@ -939,7 +939,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := range 10 { testValue := fmt.Sprintf("test_value_%d", i) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t) VALUES ($1,$2) @@ -1000,7 +1000,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { e2e.EnvNoError(s.t, env, err) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := range 10 { testValue := fmt.Sprintf("test_value_%d", i) _, err = rowsTx.Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) @@ -1061,7 +1061,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := range 10 { testValue := fmt.Sprintf("test_value_%d", i) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000)) @@ -1130,7 +1130,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 10 rows into the source table - for i := 0; i < 10; i++ { + for i := range 10 { testValue := fmt.Sprintf("test_value_%d", i) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(100)) @@ -1510,7 +1510,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { go func() { e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen) // insert 20 rows into the source table - for i := 0; i < 20; i++ { + for i := range 20 { testKey := fmt.Sprintf("test_key_%d", i) testValue := fmt.Sprintf("test_value_%d", i) _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 74065b98ea..2ac659a1b5 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -99,13 +99,14 @@ func (s PeerFlowE2ETestSuiteSQLServer) setupSQLServerTable(tableName string) { func (s PeerFlowE2ETestSuiteSQLServer) insertRowsIntoSQLServerTable(tableName string, numRows int) { schemaQualified := fmt.Sprintf("%s.%s", s.sqlsHelper.SchemaName, tableName) - for i := 0; i < numRows; i++ { - params := make(map[string]interface{}) - params["id"] = "test_id_" + strconv.Itoa(i) - params["card_id"] = "test_card_id_" + strconv.Itoa(i) - params["v_from"] = time.Now() - params["price"] = 100.00 - params["status"] = 1 + for i := range numRows { + params := map[string]interface{}{ + "id": "test_id_" + strconv.Itoa(i), + "card_id": "test_card_id_" + strconv.Itoa(i), + "v_from": time.Now(), + "price": 100.00, + "status": 1, + } _, err := s.sqlsHelper.E.NamedExec( context.Background(), diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 13e90c22a6..93136809b2 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -307,7 +307,7 @@ func CreateTableForQRep(conn *pgx.Conn, suffix string, tableName string) error { func generate20MBJson() ([]byte, error) { xn := make(map[string]interface{}, 215000) - for i := 0; i < 215000; i++ { + for range 215000 { xn[uuid.New().String()] = uuid.New().String() } @@ -320,11 +320,13 @@ func generate20MBJson() ([]byte, error) { } func PopulateSourceTable(conn *pgx.Conn, suffix string, tableName string, rowCount int) error { - var ids []string - var rows []string - for i := 0; i < rowCount-1; i++ { + var id0 string + rows := make([]string, 0, rowCount) + for i := range rowCount - 1 { id := uuid.New().String() - ids = append(ids, id) + if i == 0 { + id0 = id + } row := fmt.Sprintf(` ( '%s', '%s', CURRENT_TIMESTAMP, 3.86487206688919, CURRENT_TIMESTAMP, @@ -390,14 +392,14 @@ func PopulateSourceTable(conn *pgx.Conn, suffix string, tableName string, rowCou return err } - // generate a 20 MB json and update id[0]'s col f5 to it + // generate a 20 MB json and update id0's col f5 to it v, err := generate20MBJson() if err != nil { return err } _, err = conn.Exec(context.Background(), fmt.Sprintf(` UPDATE e2e_test_%s.%s SET f5 = $1 WHERE id = $2; - `, suffix, tableName), v, ids[0]) + `, suffix, tableName), v, id0) if err != nil { return err } @@ -405,7 +407,7 @@ func PopulateSourceTable(conn *pgx.Conn, suffix string, tableName string, rowCou // update my_date to a date before 1970 _, err = conn.Exec(context.Background(), fmt.Sprintf(` UPDATE e2e_test_%s.%s SET old_date = '1950-01-01' WHERE id = $1; - `, suffix, tableName), ids[0]) + `, suffix, tableName), id0) if err != nil { return err } diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index 721302551e..b242ebb1ea 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -16,10 +16,9 @@ func RunSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T)) t.Helper() t.Parallel() - // can be replaced with reflect.TypeFor[T]() in go 1.22 - typ := reflect.TypeOf((*T)(nil)).Elem() + typ := reflect.TypeFor[T]() mcount := typ.NumMethod() - for i := 0; i < mcount; i++ { + for i := range mcount { m := typ.Method(i) if strings.HasPrefix(m.Name, "Test") { if m.Type.NumIn() == 1 && m.Type.NumOut() == 0 {