Skip to content

Commit

Permalink
flow lints: enable gocritic & nonamedreturns (#1124)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Jan 22, 2024
1 parent f0763be commit 6ef3707
Show file tree
Hide file tree
Showing 11 changed files with 40 additions and 35 deletions.
5 changes: 5 additions & 0 deletions flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ linters:
- durationcheck
- errcheck
- forbidigo
- gocritic
- gofumpt
- gosec
- gosimple
Expand All @@ -15,6 +16,7 @@ linters:
- misspell
- nakedret
- nolintlint
- nonamedreturns
- prealloc
- staticcheck
- stylecheck
Expand All @@ -28,6 +30,9 @@ linters:
- wastedassign
- whitespace
linters-settings:
gocritic:
disabled-checks:
- ifElseChain
stylecheck:
checks:
- all
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func NewClickhouseConnector(ctx context.Context,
}

func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, error) {
dsn := fmt.Sprintf("tcp://%s:%d?username=%s&password=%s", //&database=%s"
config.Host, config.Port, config.User, config.Password) //, config.Database
dsn := fmt.Sprintf("tcp://%s:%d?username=%s&password=%s", // TODO &database=%s"
config.Host, config.Port, config.User, config.Password) // TODO , config.Database

conn, err := sql.Open("clickhouse", dsn)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/clickhouse/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *ClickhouseConnector) isPartitionSynced(partitionID string) (bool, error
}

func (c *ClickhouseConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error {
err := c.createQRepMetadataTable() //(createMetadataTablesTx)
err := c.createQRepMetadataTable()
if err != nil {
return err
}
Expand All @@ -122,7 +122,7 @@ func (c *ClickhouseConnector) SetupQRepMetadataTables(config *protos.QRepConfig)
return nil
}

func (c *ClickhouseConnector) createQRepMetadataTable() error { // createMetadataTableTx *sql.Tx
func (c *ClickhouseConnector) createQRepMetadataTable() error {
// Define the schema
schemaStatement := `
CREATE TABLE IF NOT EXISTS %s (
Expand Down
5 changes: 2 additions & 3 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package connpostgres
import (
"errors"
"fmt"
"log"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -557,12 +556,12 @@ func (c *PostgresConnector) getTableNametoUnchangedCols(flowJobName string, sync
for rows.Next() {
err := rows.Scan(&destinationTableName, &unchangedToastColumns)
if err != nil {
log.Fatalf("Failed to scan row: %v", err)
return nil, fmt.Errorf("failed to scan row: %w", err)
}
resultMap[destinationTableName.String] = unchangedToastColumns
}
if err := rows.Err(); err != nil {
log.Fatalf("Error iterating over rows: %v", err)
return nil, fmt.Errorf("error iterating over rows: %w", err)
}
return resultMap, nil
}
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/postgres/escape.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ func QuoteLiteral(literal string) string {
// https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/interfaces/libpq/fe-exec.c
//
// substitute any single-quotes (') with two single-quotes ('')
literal = strings.Replace(literal, `'`, `''`, -1)
literal = strings.ReplaceAll(literal, `'`, `''`)
// determine if the string has any backslashes (\) in it.
// if it does, replace any backslashes (\) with two backslashes (\\)
// then, we need to wrap the entire string with a PostgreSQL
// C-style escape. Per how "PQEscapeStringInternal" handles this case, we
// also add a space before the "E"
if strings.Contains(literal, `\`) {
literal = strings.Replace(literal, `\`, `\\`, -1)
literal = strings.ReplaceAll(literal, `\`, `\\`)
literal = ` E'` + literal + `'`
} else {
// otherwise, we can just wrap the literal with a pair of single quotes
Expand All @@ -53,5 +53,5 @@ func QuoteIdentifier(name string) string {
if end > -1 {
name = name[:end]
}
return `"` + strings.Replace(name, `"`, `""`, -1) + `"`
return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}
13 changes: 8 additions & 5 deletions flow/connectors/postgres/ssh_wrapped_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,21 @@ func (swpp *SSHWrappedPostgresPool) Close() {

type retryFunc func() error

func retryWithBackoff(fn retryFunc, maxRetries int, backoff time.Duration) (err error) {
for i := 0; i < maxRetries; i++ {
err = fn()
func retryWithBackoff(fn retryFunc, maxRetries int, backoff time.Duration) error {
i := 0
for {
err := fn()
if err == nil {
return nil
}
if i < maxRetries-1 {
i += 1
if i < maxRetries {
slog.Info(fmt.Sprintf("Attempt #%d failed, retrying in %s", i+1, backoff))
time.Sleep(backoff)
} else {
return err
}
}
return err
}

// see: https://github.com/jackc/pgx/issues/382#issuecomment-1496586216
Expand Down
5 changes: 2 additions & 3 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"log/slog"
"regexp"
"strings"
Expand Down Expand Up @@ -383,12 +382,12 @@ func (c *SnowflakeConnector) getTableNametoUnchangedCols(flowJobName string, syn
var r UnchangedToastColumnResult
err := rows.Scan(&r.TableName, &r.UnchangedToastColumns)
if err != nil {
log.Fatalf("Failed to scan row: %v", err)
return nil, fmt.Errorf("failed to scan row: %w", err)
}
resultMap[r.TableName] = r.UnchangedToastColumns
}
if err := rows.Err(); err != nil {
log.Fatalf("Error iterating over rows: %v", err)
return nil, fmt.Errorf("error iterating over rows: %w", err)
}
return resultMap, nil
}
Expand Down
12 changes: 5 additions & 7 deletions flow/connectors/utils/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ func compareValues(prevEnd interface{}, start interface{}) int {
return -1
} else if pe.BlockNumber > v.BlockNumber {
return 1
} else if pe.OffsetNumber < v.OffsetNumber {
return -1
} else if pe.OffsetNumber > v.OffsetNumber {
return 1
} else {
if pe.OffsetNumber < v.OffsetNumber {
return -1
} else if pe.OffsetNumber > v.OffsetNumber {
return 1
} else {
return 0
}
return 0
}
case uint32: // xmin
if prevEnd.(uint32) < v {
Expand Down
10 changes: 5 additions & 5 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,11 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor
}

// returns whether the function errors or there are nulls
func (b *BigQueryTestHelper) CheckNull(tableName string, ColName []string) (bool, error) {
if len(ColName) == 0 {
func (b *BigQueryTestHelper) CheckNull(tableName string, colName []string) (bool, error) {
if len(colName) == 0 {
return true, nil
}
joinedString := strings.Join(ColName, " is null or ") + " is null"
joinedString := strings.Join(colName, " is null or ") + " is null"
command := fmt.Sprintf("SELECT COUNT(*) FROM `%s.%s` WHERE %s",
b.Config.DatasetId, tableName, joinedString)
q := b.client.Query(command)
Expand Down Expand Up @@ -419,8 +419,8 @@ func (b *BigQueryTestHelper) CheckNull(tableName string, ColName []string) (bool
}

// check if NaN, Inf double values are null
func (b *BigQueryTestHelper) CheckDoubleValues(tableName string, ColName []string) (bool, error) {
csep := strings.Join(ColName, ",")
func (b *BigQueryTestHelper) CheckDoubleValues(tableName string, colName []string) (bool, error) {
csep := strings.Join(colName, ",")
command := fmt.Sprintf("SELECT %s FROM `%s.%s`",
csep, b.Config.DatasetId, tableName)
q := b.client.Query(command)
Expand Down
4 changes: 2 additions & 2 deletions flow/hstore/hstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func (p *hstoreParser) atEnd() bool {
}

// consume returns the next byte of the string, or end if the string is done.
func (p *hstoreParser) consume() (b byte, end bool) {
func (p *hstoreParser) consume() (byte, bool) {
if p.pos >= len(p.str) {
return 0, true
}
b = p.str[p.pos]
b := p.str[p.pos]
p.pos++
return b, false
}
Expand Down
7 changes: 4 additions & 3 deletions flow/shared/signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ import (
func FlowSignalHandler(activeSignal CDCFlowSignal,
v CDCFlowSignal, logger log.Logger,
) CDCFlowSignal {
if v == ShutdownSignal {
switch v {
case ShutdownSignal:
logger.Info("received shutdown signal")
return v
} else if v == PauseSignal {
case PauseSignal:
logger.Info("received pause signal")
if activeSignal == NoopSignal {
logger.Info("workflow was running, pausing it")
return v
}
} else if v == NoopSignal {
case NoopSignal:
logger.Info("received resume signal")
if activeSignal == PauseSignal {
logger.Info("workflow was paused, resuming it")
Expand Down

0 comments on commit 6ef3707

Please sign in to comment.