Skip to content

Commit

Permalink
go 1.22 features (#1385)
Browse files Browse the repository at this point in the history
1. reflect.TypeOf
2. no need to copy loop variable for closure
3. `range N` replaces basic count-to-N loop
  • Loading branch information
serprex authored Feb 27, 2024
1 parent 04c27fe commit 7df7735
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 49 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewBigQueryServiceAccount(bqConfig *protos.BigqueryConfig) (*BigQueryServic
// Validate validates a BigQueryServiceAccount, that none of the fields are empty.
func (bqsa *BigQueryServiceAccount) Validate() error {
v := reflect.ValueOf(*bqsa)
for i := 0; i < v.NumField(); i++ {
for i := range v.NumField() {
if v.Field(i).String() == "" {
return fmt.Errorf("field %s is empty", v.Type().Field(i).Name)
}
Expand Down
7 changes: 4 additions & 3 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"fmt"
"log/slog"
"slices"
"time"

"github.com/jackc/pglogrepl"
Expand Down Expand Up @@ -786,19 +787,19 @@ func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
rec model.Record,
) (*model.TableWithPkey, error) {
tableName := rec.GetDestinationTableName()
pkeyColsMerged := make([]byte, 0)
pkeyColsMerged := make([][]byte, 0, len(req.TableNameSchemaMapping[tableName].PrimaryKeyColumns))

for _, pkeyCol := range req.TableNameSchemaMapping[tableName].PrimaryKeyColumns {
pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
if err != nil {
return nil, fmt.Errorf("error getting pkey column value: %w", err)
}
pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprint(pkeyColVal.Value))...)
pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprint(pkeyColVal.Value)))
}

return &model.TableWithPkey{
TableName: tableName,
PkeyColVal: sha256.Sum256(pkeyColsMerged),
PkeyColVal: sha256.Sum256(slices.Concat(pkeyColsMerged...)),
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func BenchmarkQRepQueryExecutor(b *testing.B) {

// Run the benchmark
b.ResetTimer()
for i := 0; i < b.N; i++ {
for i := range b.N {
// log the iteration
b.Logf("iteration %d", i)

Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ func TestGetQRepPartitions(t *testing.T) {
expected := tc.want
assert.Equal(t, len(expected), len(got))

for i := 0; i < len(expected); i++ {
er := expected[i].Range.Range.(*protos.PartitionRange_TimestampRange).TimestampRange
for i, val := range expected {
er := val.Range.Range.(*protos.PartitionRange_TimestampRange).TimestampRange
gotr := got[i].Range.Range.(*protos.PartitionRange_TimestampRange).TimestampRange
assert.Equal(t, er.Start.AsTime(), gotr.Start.AsTime())
assert.Equal(t, er.End.AsTime(), gotr.End.AsTime())
Expand Down
4 changes: 1 addition & 3 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,7 @@ func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.No
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(8) // limit parallel merges to 8

for _, destinationTableName := range destinationTableNames {
tableName := destinationTableName // local variable for the closure

for _, tableName := range destinationTableNames {
g.Go(func() error {
mergeGen := &mergeStmtGenerator{
rawTableName: getRawTableIdentifier(req.FlowJobName),
Expand Down
12 changes: 6 additions & 6 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
for i := range 10 {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
Expand Down Expand Up @@ -724,7 +724,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 4 invalid shapes and 6 valid shapes into the source table
for i := 0; i < 4; i++ {
for range 4 {
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (line,"polyPoly") VALUES ($1,$2)
`, srcTableName), "010200000001000000000000000000F03F0000000000000040",
Expand All @@ -736,7 +736,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 4 invalid geography rows into the source table")
for i := 4; i < 10; i++ {
for range 6 {
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (line,"polyPoly") VALUES ($1,$2)
`, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040",
Expand Down Expand Up @@ -949,7 +949,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
for i := range 10 {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t) VALUES ($1,$2)
Expand Down Expand Up @@ -1013,7 +1013,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
e2e.EnvNoError(s.t, env, err)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
for i := range 10 {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
Expand Down Expand Up @@ -1073,7 +1073,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
for i := range 10 {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
Expand Down
12 changes: 6 additions & 6 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
for i := range 10 {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
Expand Down Expand Up @@ -502,7 +502,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
for i := range 10 {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t) VALUES ($1,$2)
Expand Down Expand Up @@ -570,7 +570,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
e2e.EnvNoError(s.t, env, err)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
for i := range 10 {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,%s(9000))
Expand Down Expand Up @@ -636,7 +636,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
for i := range 10 {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,%s(9000))
Expand Down Expand Up @@ -1084,7 +1084,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Supported_Mixed_Case_Table() {
go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 20 rows into the source table
for i := 0; i < 20; i++ {
for i := range 10 {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
Expand Down Expand Up @@ -1157,7 +1157,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
}

addRows := func(numRows int) {
for i := 0; i < numRows; i++ {
for range numRows {
_, err = s.Conn().Exec(context.Background(),
fmt.Sprintf(`INSERT INTO %s DEFAULT VALUES`, srcTable1Name))
e2e.EnvNoError(s.t, env, err)
Expand Down
18 changes: 9 additions & 9 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() {
go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 20 rows into the source table
for i := 0; i < 20; i++ {
for i := range 20 {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
Expand Down Expand Up @@ -218,7 +218,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() {
go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 20 rows into the source table
for i := 0; i < 20; i++ {
for i := range 20 {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
Expand Down Expand Up @@ -270,7 +270,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 4 invalid shapes and 6 valid shapes into the source table
for i := 0; i < 4; i++ {
for range 4 {
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (line,poly) VALUES ($1,$2)
`, srcTableName), "010200000001000000000000000000F03F0000000000000040",
Expand All @@ -282,7 +282,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() {
e2e.EnvNoError(s.t, env, err)
}
s.t.Log("Inserted 4 invalid geography rows into the source table")
for i := 4; i < 10; i++ {
for range 6 {
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (line,poly) VALUES ($1,$2)
`, srcTableName), "010200000002000000000000000000F03F000000000000004000000000000008400000000000001040",
Expand Down Expand Up @@ -939,7 +939,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() {
go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 10 rows into the source table
for i := 0; i < 10; i++ {
for i := range 10 {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t) VALUES ($1,$2)
Expand Down Expand Up @@ -1000,7 +1000,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
e2e.EnvNoError(s.t, env, err)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
for i := range 10 {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
Expand Down Expand Up @@ -1061,7 +1061,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
for i := range 10 {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
Expand Down Expand Up @@ -1130,7 +1130,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

// insert 10 rows into the source table
for i := 0; i < 10; i++ {
for i := range 10 {
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(100))
Expand Down Expand Up @@ -1510,7 +1510,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() {
go func() {
e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)
// insert 20 rows into the source table
for i := 0; i < 20; i++ {
for i := range 20 {
testKey := fmt.Sprintf("test_key_%d", i)
testValue := fmt.Sprintf("test_value_%d", i)
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
Expand Down
15 changes: 8 additions & 7 deletions flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,14 @@ func (s PeerFlowE2ETestSuiteSQLServer) setupSQLServerTable(tableName string) {

func (s PeerFlowE2ETestSuiteSQLServer) insertRowsIntoSQLServerTable(tableName string, numRows int) {
schemaQualified := fmt.Sprintf("%s.%s", s.sqlsHelper.SchemaName, tableName)
for i := 0; i < numRows; i++ {
params := make(map[string]interface{})
params["id"] = "test_id_" + strconv.Itoa(i)
params["card_id"] = "test_card_id_" + strconv.Itoa(i)
params["v_from"] = time.Now()
params["price"] = 100.00
params["status"] = 1
for i := range numRows {
params := map[string]interface{}{
"id": "test_id_" + strconv.Itoa(i),
"card_id": "test_card_id_" + strconv.Itoa(i),
"v_from": time.Now(),
"price": 100.00,
"status": 1,
}

_, err := s.sqlsHelper.E.NamedExec(
context.Background(),
Expand Down
18 changes: 10 additions & 8 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func CreateTableForQRep(conn *pgx.Conn, suffix string, tableName string) error {

func generate20MBJson() ([]byte, error) {
xn := make(map[string]interface{}, 215000)
for i := 0; i < 215000; i++ {
for range 215000 {
xn[uuid.New().String()] = uuid.New().String()
}

Expand All @@ -320,11 +320,13 @@ func generate20MBJson() ([]byte, error) {
}

func PopulateSourceTable(conn *pgx.Conn, suffix string, tableName string, rowCount int) error {
var ids []string
var rows []string
for i := 0; i < rowCount-1; i++ {
var id0 string
rows := make([]string, 0, rowCount)
for i := range rowCount - 1 {
id := uuid.New().String()
ids = append(ids, id)
if i == 0 {
id0 = id
}
row := fmt.Sprintf(`
(
'%s', '%s', CURRENT_TIMESTAMP, 3.86487206688919, CURRENT_TIMESTAMP,
Expand Down Expand Up @@ -390,22 +392,22 @@ func PopulateSourceTable(conn *pgx.Conn, suffix string, tableName string, rowCou
return err
}

// generate a 20 MB json and update id[0]'s col f5 to it
// generate a 20 MB json and update id0's col f5 to it
v, err := generate20MBJson()
if err != nil {
return err
}
_, err = conn.Exec(context.Background(), fmt.Sprintf(`
UPDATE e2e_test_%s.%s SET f5 = $1 WHERE id = $2;
`, suffix, tableName), v, ids[0])
`, suffix, tableName), v, id0)
if err != nil {
return err
}

// update my_date to a date before 1970
_, err = conn.Exec(context.Background(), fmt.Sprintf(`
UPDATE e2e_test_%s.%s SET old_date = '1950-01-01' WHERE id = $1;
`, suffix, tableName), ids[0])
`, suffix, tableName), id0)
if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions flow/e2eshared/e2eshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ func RunSuite[T any](t *testing.T, setup func(t *testing.T) T, teardown func(T))
t.Helper()
t.Parallel()

// can be replaced with reflect.TypeFor[T]() in go 1.22
typ := reflect.TypeOf((*T)(nil)).Elem()
typ := reflect.TypeFor[T]()
mcount := typ.NumMethod()
for i := 0; i < mcount; i++ {
for i := range mcount {
m := typ.Method(i)
if strings.HasPrefix(m.Name, "Test") {
if m.Type.NumIn() == 1 && m.Type.NumOut() == 0 {
Expand Down

0 comments on commit 7df7735

Please sign in to comment.