diff --git a/flow/.golangci.yml b/flow/.golangci.yml index 9897773d62..eee69b73a1 100644 --- a/flow/.golangci.yml +++ b/flow/.golangci.yml @@ -7,6 +7,7 @@ linters: - durationcheck - errcheck - forbidigo + - gocritic - gofumpt - gosec - gosimple @@ -15,6 +16,7 @@ linters: - misspell - nakedret - nolintlint + - nonamedreturns - prealloc - staticcheck - stylecheck @@ -28,6 +30,9 @@ linters: - wastedassign - whitespace linters-settings: + gocritic: + disabled-checks: + - ifElseChain stylecheck: checks: - all diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index de51b0feb9..59f58b1379 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -37,8 +37,8 @@ func NewClickhouseConnector(ctx context.Context, } func connect(ctx context.Context, config *protos.ClickhouseConfig) (*sql.DB, error) { - dsn := fmt.Sprintf("tcp://%s:%d?username=%s&password=%s", //&database=%s" - config.Host, config.Port, config.User, config.Password) //, config.Database + dsn := fmt.Sprintf("tcp://%s:%d?username=%s&password=%s", // TODO &database=%s" + config.Host, config.Port, config.User, config.Password) // TODO , config.Database conn, err := sql.Open("clickhouse", dsn) if err != nil { diff --git a/flow/connectors/clickhouse/qrep.go b/flow/connectors/clickhouse/qrep.go index 74ffe26524..77ca30035b 100644 --- a/flow/connectors/clickhouse/qrep.go +++ b/flow/connectors/clickhouse/qrep.go @@ -107,7 +107,7 @@ func (c *ClickhouseConnector) isPartitionSynced(partitionID string) (bool, error } func (c *ClickhouseConnector) SetupQRepMetadataTables(config *protos.QRepConfig) error { - err := c.createQRepMetadataTable() //(createMetadataTablesTx) + err := c.createQRepMetadataTable() if err != nil { return err } @@ -122,7 +122,7 @@ func (c *ClickhouseConnector) SetupQRepMetadataTables(config *protos.QRepConfig) return nil } -func (c *ClickhouseConnector) createQRepMetadataTable() error { // createMetadataTableTx *sql.Tx +func (c *ClickhouseConnector) createQRepMetadataTable() error { // Define the schema schemaStatement := ` CREATE TABLE IF NOT EXISTS %s ( diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index bf5bb98c46..51f5d025f5 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -3,7 +3,6 @@ package connpostgres import ( "errors" "fmt" - "log" "regexp" "strconv" "strings" @@ -557,12 +556,12 @@ func (c *PostgresConnector) getTableNametoUnchangedCols(flowJobName string, sync for rows.Next() { err := rows.Scan(&destinationTableName, &unchangedToastColumns) if err != nil { - log.Fatalf("Failed to scan row: %v", err) + return nil, fmt.Errorf("failed to scan row: %w", err) } resultMap[destinationTableName.String] = unchangedToastColumns } if err := rows.Err(); err != nil { - log.Fatalf("Error iterating over rows: %v", err) + return nil, fmt.Errorf("error iterating over rows: %w", err) } return resultMap, nil } diff --git a/flow/connectors/postgres/escape.go b/flow/connectors/postgres/escape.go index 280d108338..783c0cd7c7 100644 --- a/flow/connectors/postgres/escape.go +++ b/flow/connectors/postgres/escape.go @@ -21,14 +21,14 @@ func QuoteLiteral(literal string) string { // https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/interfaces/libpq/fe-exec.c // // substitute any single-quotes (') with two single-quotes ('') - literal = strings.Replace(literal, `'`, `''`, -1) + literal = strings.ReplaceAll(literal, `'`, `''`) // determine if the string has any backslashes (\) in it. // if it does, replace any backslashes (\) with two backslashes (\\) // then, we need to wrap the entire string with a PostgreSQL // C-style escape. Per how "PQEscapeStringInternal" handles this case, we // also add a space before the "E" if strings.Contains(literal, `\`) { - literal = strings.Replace(literal, `\`, `\\`, -1) + literal = strings.ReplaceAll(literal, `\`, `\\`) literal = ` E'` + literal + `'` } else { // otherwise, we can just wrap the literal with a pair of single quotes @@ -53,5 +53,5 @@ func QuoteIdentifier(name string) string { if end > -1 { name = name[:end] } - return `"` + strings.Replace(name, `"`, `""`, -1) + `"` + return `"` + strings.ReplaceAll(name, `"`, `""`) + `"` } diff --git a/flow/connectors/postgres/ssh_wrapped_pool.go b/flow/connectors/postgres/ssh_wrapped_pool.go index a82356a721..9922c41675 100644 --- a/flow/connectors/postgres/ssh_wrapped_pool.go +++ b/flow/connectors/postgres/ssh_wrapped_pool.go @@ -137,18 +137,21 @@ func (swpp *SSHWrappedPostgresPool) Close() { type retryFunc func() error -func retryWithBackoff(fn retryFunc, maxRetries int, backoff time.Duration) (err error) { - for i := 0; i < maxRetries; i++ { - err = fn() +func retryWithBackoff(fn retryFunc, maxRetries int, backoff time.Duration) error { + i := 0 + for { + err := fn() if err == nil { return nil } - if i < maxRetries-1 { + i += 1 + if i < maxRetries { slog.Info(fmt.Sprintf("Attempt #%d failed, retrying in %s", i+1, backoff)) time.Sleep(backoff) + } else { + return err } } - return err } // see: https://github.com/jackc/pgx/issues/382#issuecomment-1496586216 diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index d2fc3a477d..29f41d5995 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "log/slog" "regexp" "strings" @@ -383,12 +382,12 @@ func (c *SnowflakeConnector) getTableNametoUnchangedCols(flowJobName string, syn var r UnchangedToastColumnResult err := rows.Scan(&r.TableName, &r.UnchangedToastColumns) if err != nil { - log.Fatalf("Failed to scan row: %v", err) + return nil, fmt.Errorf("failed to scan row: %w", err) } resultMap[r.TableName] = r.UnchangedToastColumns } if err := rows.Err(); err != nil { - log.Fatalf("Error iterating over rows: %v", err) + return nil, fmt.Errorf("error iterating over rows: %w", err) } return resultMap, nil } diff --git a/flow/connectors/utils/partition/partition.go b/flow/connectors/utils/partition/partition.go index cb2f326a66..37ecd15f37 100644 --- a/flow/connectors/utils/partition/partition.go +++ b/flow/connectors/utils/partition/partition.go @@ -45,14 +45,12 @@ func compareValues(prevEnd interface{}, start interface{}) int { return -1 } else if pe.BlockNumber > v.BlockNumber { return 1 + } else if pe.OffsetNumber < v.OffsetNumber { + return -1 + } else if pe.OffsetNumber > v.OffsetNumber { + return 1 } else { - if pe.OffsetNumber < v.OffsetNumber { - return -1 - } else if pe.OffsetNumber > v.OffsetNumber { - return 1 - } else { - return 0 - } + return 0 } case uint32: // xmin if prevEnd.(uint32) < v { diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 445488266e..7d2c5bce6b 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -382,11 +382,11 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor } // returns whether the function errors or there are nulls -func (b *BigQueryTestHelper) CheckNull(tableName string, ColName []string) (bool, error) { - if len(ColName) == 0 { +func (b *BigQueryTestHelper) CheckNull(tableName string, colName []string) (bool, error) { + if len(colName) == 0 { return true, nil } - joinedString := strings.Join(ColName, " is null or ") + " is null" + joinedString := strings.Join(colName, " is null or ") + " is null" command := fmt.Sprintf("SELECT COUNT(*) FROM `%s.%s` WHERE %s", b.Config.DatasetId, tableName, joinedString) q := b.client.Query(command) @@ -419,8 +419,8 @@ func (b *BigQueryTestHelper) CheckNull(tableName string, ColName []string) (bool } // check if NaN, Inf double values are null -func (b *BigQueryTestHelper) CheckDoubleValues(tableName string, ColName []string) (bool, error) { - csep := strings.Join(ColName, ",") +func (b *BigQueryTestHelper) CheckDoubleValues(tableName string, colName []string) (bool, error) { + csep := strings.Join(colName, ",") command := fmt.Sprintf("SELECT %s FROM `%s.%s`", csep, b.Config.DatasetId, tableName) q := b.client.Query(command) diff --git a/flow/hstore/hstore.go b/flow/hstore/hstore.go index 0253fef2b1..cbb7d60c8a 100644 --- a/flow/hstore/hstore.go +++ b/flow/hstore/hstore.go @@ -40,11 +40,11 @@ func (p *hstoreParser) atEnd() bool { } // consume returns the next byte of the string, or end if the string is done. -func (p *hstoreParser) consume() (b byte, end bool) { +func (p *hstoreParser) consume() (byte, bool) { if p.pos >= len(p.str) { return 0, true } - b = p.str[p.pos] + b := p.str[p.pos] p.pos++ return b, false } diff --git a/flow/shared/signals.go b/flow/shared/signals.go index 2097ba95c5..5e441d0110 100644 --- a/flow/shared/signals.go +++ b/flow/shared/signals.go @@ -7,16 +7,17 @@ import ( func FlowSignalHandler(activeSignal CDCFlowSignal, v CDCFlowSignal, logger log.Logger, ) CDCFlowSignal { - if v == ShutdownSignal { + switch v { + case ShutdownSignal: logger.Info("received shutdown signal") return v - } else if v == PauseSignal { + case PauseSignal: logger.Info("received pause signal") if activeSignal == NoopSignal { logger.Info("workflow was running, pausing it") return v } - } else if v == NoopSignal { + case NoopSignal: logger.Info("received resume signal") if activeSignal == PauseSignal { logger.Info("workflow was paused, resuming it")