Skip to content

Commit

Permalink
golangci 1.56
Browse files Browse the repository at this point in the history
Also raise go version in go.mod to go 1.22
  • Loading branch information
serprex committed Feb 27, 2024
1 parent 702d050 commit 203c1ab
Show file tree
Hide file tree
Showing 44 changed files with 151 additions and 144 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golang-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.55
version: v1.56
working-directory: ./flow
args: --timeout=10m
32 changes: 16 additions & 16 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ func (a *FlowableActivity) CreateNormalizedTable(

numTablesSetup.Add(1)
if !existing {
logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
logger.Info("created table " + tableIdentifier)
} else {
logger.Info(fmt.Sprintf("table already exists %s", tableIdentifier))
logger.Info("table already exists " + tableIdentifier)
}
}

Expand Down Expand Up @@ -297,7 +297,7 @@ func (a *FlowableActivity) SyncFlow(
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("transferring records for job - %s", flowName)
return "transferring records for job - " + flowName
})
defer shutdown()

Expand Down Expand Up @@ -470,7 +470,7 @@ func (a *FlowableActivity) StartNormalize(
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
return "normalizing records from batch for job - " + input.FlowConnectionConfigs.FlowJobName
})
defer shutdown()

Expand Down Expand Up @@ -538,7 +538,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
defer connectors.CloseConnector(ctx, srcConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
return "getting partitions for job - " + config.FlowJobName
})
defer shutdown()

Expand Down Expand Up @@ -629,7 +629,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
defer connectors.CloseConnector(ctx, dstConn)

logger.Info(fmt.Sprintf("replicating partition %s", partition.PartitionId))
logger.Info("replicating partition " + partition.PartitionId)
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
Expand Down Expand Up @@ -689,7 +689,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

if rowsSynced == 0 {
logger.Info(fmt.Sprintf("no records to push for partition %s", partition.PartitionId))
logger.Info("no records to push for partition " + partition.PartitionId)
pullCancel()
} else {
wg.Wait()
Expand Down Expand Up @@ -722,7 +722,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName)
return "consolidating partitions for job - " + config.FlowJobName
})
defer shutdown()

Expand Down Expand Up @@ -886,7 +886,7 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
}
defer connectors.CloseConnector(ctx, srcConn)

slotName := fmt.Sprintf("peerflow_slot_%s", config.FlowJobName)
slotName := "peerflow_slot_" + config.FlowJobName
if config.ReplicationSlotName != "" {
slotName = config.ReplicationSlotName
}
Expand Down Expand Up @@ -977,26 +977,26 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("renaming tables for job - %s", config.FlowJobName)
return "renaming tables for job - " + config.FlowJobName
})
defer shutdown()

if config.Peer.Type == protos.DBType_SNOWFLAKE {
sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector)
if !ok {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to cast connector to snowflake connector")
return nil, errors.New("failed to cast connector to snowflake connector")
}
return sfConn.RenameTables(ctx, config)
} else if config.Peer.Type == protos.DBType_BIGQUERY {
bqConn, ok := dstConn.(*connbigquery.BigQueryConnector)
if !ok {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to cast connector to bigquery connector")
return nil, errors.New("failed to cast connector to bigquery connector")
}
return bqConn.RenameTables(ctx, config)
}
return nil, fmt.Errorf("rename tables is only supported on snowflake and bigquery")
return nil, errors.New("rename tables is only supported on snowflake and bigquery")
}

func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) (
Expand All @@ -1012,18 +1012,18 @@ func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *pr
if req.Peer.Type == protos.DBType_SNOWFLAKE {
sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector)
if !ok {
return nil, fmt.Errorf("failed to cast connector to snowflake connector")
return nil, errors.New("failed to cast connector to snowflake connector")
}
return sfConn.CreateTablesFromExisting(ctx, req)
} else if req.Peer.Type == protos.DBType_BIGQUERY {
bqConn, ok := dstConn.(*connbigquery.BigQueryConnector)
if !ok {
return nil, fmt.Errorf("failed to cast connector to bigquery connector")
return nil, errors.New("failed to cast connector to bigquery connector")
}
return bqConn.CreateTablesFromExisting(ctx, req)
}
a.Alerter.LogFlowError(ctx, req.FlowJobName, err)
return nil, fmt.Errorf("create tables from existing is only supported on snowflake and bigquery")
return nil, errors.New("create tables from existing is only supported on snowflake and bigquery")
}

// ReplicateXminPartition replicates a XminPartition from the source to the destination.
Expand Down
20 changes: 10 additions & 10 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func (h *FlowRequestHandler) MirrorStatus(
slog.Info("Mirror status endpoint called", slog.String(string(shared.FlowNameKey), req.FlowJobName))
cdcFlow, err := h.isCDCFlow(ctx, req.FlowJobName)
if err != nil {
slog.Error(fmt.Sprintf("unable to query flow: %s", err.Error()))
slog.Error("unable to query flow", slog.Any("error", err))
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()),
ErrorMessage: "unable to query flow: " + err.Error(),
}, nil
}

Expand All @@ -36,15 +36,15 @@ func (h *FlowRequestHandler) MirrorStatus(
currState, err := h.getWorkflowStatus(ctx, workflowID)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to get flow state: %s", err.Error()),
ErrorMessage: "unable to get flow state: " + err.Error(),
}, nil
}

if cdcFlow {
cdcStatus, err := h.CDCFlowStatus(ctx, req)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()),
ErrorMessage: "unable to query flow: " + err.Error(),
}, nil
}

Expand All @@ -59,7 +59,7 @@ func (h *FlowRequestHandler) MirrorStatus(
qrepStatus, err := h.QRepFlowStatus(ctx, req)
if err != nil {
return &protos.MirrorStatusResponse{
ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()),
ErrorMessage: "unable to query flow: " + err.Error(),
}, nil
}

Expand Down Expand Up @@ -144,7 +144,7 @@ func (h *FlowRequestHandler) cloneTableSummary(

rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%")
if err != nil {
slog.Error(fmt.Sprintf("unable to query initial load partition - %s: %s", flowJobName, err.Error()))
slog.Error("unable to query initial load partition - "+flowJobName, slog.Any("error", err))
return nil, fmt.Errorf("unable to query initial load partition - %s: %w", flowJobName, err)
}

Expand Down Expand Up @@ -192,7 +192,7 @@ func (h *FlowRequestHandler) cloneTableSummary(
if configBytes != nil {
var config protos.QRepConfig
if err := proto.Unmarshal(configBytes, &config); err != nil {
slog.Error(fmt.Sprintf("unable to unmarshal config: %s", err.Error()))
slog.Error("unable to unmarshal config", slog.Any("error", err))
return nil, fmt.Errorf("unable to unmarshal config: %w", err)
}
res.TableName = config.DestinationTableIdentifier
Expand Down Expand Up @@ -277,13 +277,13 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog(
err = h.pool.QueryRow(ctx,
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
slog.Error(fmt.Sprintf("unable to query flow config from catalog: %s", err.Error()))
slog.Error("unable to query flow config from catalog", slog.Any("error", err))
return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
}

err = proto.Unmarshal(configBytes, &config)
if err != nil {
slog.Error(fmt.Sprintf("unable to unmarshal flow config: %s", err.Error()))
slog.Error("unable to unmarshal flow config", slog.Any("error", err))
return nil, fmt.Errorf("unable to unmarshal flow config: %w", err)
}

Expand Down Expand Up @@ -335,7 +335,7 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string)
var query pgtype.Text
err := h.pool.QueryRow(ctx, "SELECT query_string FROM flows WHERE name = $1", flowJobName).Scan(&query)
if err != nil {
slog.Error(fmt.Sprintf("unable to query flow: %s", err.Error()))
slog.Error("unable to query flow", slog.Any("error", err))
return false, fmt.Errorf("unable to query flow: %w", err)
}

Expand Down
8 changes: 3 additions & 5 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ func (c *BigQueryConnector) SyncFlowCleanup(ctx context.Context, jobName string)
func (c *BigQueryConnector) getRawTableName(flowJobName string) string {
// replace all non-alphanumeric characters with _
flowJobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(flowJobName, "_")
return fmt.Sprintf("_peerdb_raw_%s", flowJobName)
return "_peerdb_raw_" + flowJobName
}

func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
Expand Down Expand Up @@ -892,11 +892,9 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
}
}

c.logger.Info(fmt.Sprintf("DROP TABLE IF EXISTS %s",
dstDatasetTable.string()))
c.logger.Info("DROP TABLE IF EXISTS " + dstDatasetTable.string())
// drop the dst table if exists
dropQuery := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s",
dstDatasetTable.string()))
dropQuery := c.client.Query("DROP TABLE IF EXISTS " + dstDatasetTable.string())
dropQuery.DefaultProjectID = c.projectID
dropQuery.DefaultDatasetID = c.datasetID
_, err = dropQuery.Read(ctx)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(

func (c *BigQueryConnector) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error {
if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
query := c.client.Query(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
query := c.client.Query("TRUNCATE TABLE " + config.DestinationTableIdentifier)
query.DefaultDatasetID = c.datasetID
query.DefaultProjectID = c.projectID
_, err := query.Read(ctx)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
flowLog)
}

s.connector.logger.Info(fmt.Sprintf("loaded stage into %s", dstTableName), flowLog)
s.connector.logger.Info("loaded stage into "+dstTableName, flowLog)
return numRecords, nil
}

Expand Down
7 changes: 4 additions & 3 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connclickhouse
import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"regexp"
Expand All @@ -25,7 +26,7 @@ const (
func (c *ClickhouseConnector) getRawTableName(flowJobName string) string {
// replace all non-alphanumeric characters with _
flowJobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(flowJobName, "_")
return fmt.Sprintf("_peerdb_raw_%s", flowJobName)
return "_peerdb_raw_" + flowJobName
}

func (c *ClickhouseConnector) checkIfTableExists(ctx context.Context, databaseName string, tableIdentifier string) (bool, error) {
Expand All @@ -36,7 +37,7 @@ func (c *ClickhouseConnector) checkIfTableExists(ctx context.Context, databaseNa
}

if !result.Valid {
return false, fmt.Errorf("[clickhouse] checkIfTableExists: result is not valid")
return false, errors.New("[clickhouse] checkIfTableExists: result is not valid")
}

return result.Int32 == 1, nil
Expand Down Expand Up @@ -118,7 +119,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(

func (c *ClickhouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
rawTableName := c.getRawTableName(req.FlowJobName)
c.logger.Info(fmt.Sprintf("pushing records to Clickhouse table %s", rawTableName))
c.logger.Info("pushing records to Clickhouse table " + rawTableName)

res, err := c.syncRecordsViaAvro(ctx, req, rawTableName, req.SyncBatchID)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/clickhouse/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (c *ClickhouseConnector) SetupQRepMetadataTables(ctx context.Context, confi
}

if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
_, err = c.database.ExecContext(ctx, fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
_, err = c.database.ExecContext(ctx, "TRUNCATE TABLE "+config.DestinationTableIdentifier)
if err != nil {
return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
}
Expand All @@ -140,12 +140,12 @@ func (c *ClickhouseConnector) createQRepMetadataTable(ctx context.Context) error
queryString := fmt.Sprintf(schemaStatement, qRepMetadataTableName)
_, err := c.database.ExecContext(ctx, queryString)
if err != nil {
c.logger.Error(fmt.Sprintf("failed to create table %s", qRepMetadataTableName),
c.logger.Error("failed to create table "+qRepMetadataTableName,
slog.Any("error", err))

return fmt.Errorf("failed to create table %s: %w", qRepMetadataTableName, err)
}
c.logger.Info(fmt.Sprintf("Created table %s", qRepMetadataTableName))
c.logger.Info("Created table " + qRepMetadataTableName)
return nil
}

Expand Down Expand Up @@ -206,6 +206,6 @@ func (c *ClickhouseConnector) dropStage(ctx context.Context, stagingPath string,
c.logger.Info(fmt.Sprintf("Deleted contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job))
}

c.logger.Info(fmt.Sprintf("Dropped stage %s", stagingPath))
c.logger.Info("Dropped stage " + stagingPath)
return nil
}
1 change: 0 additions & 1 deletion flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey)

_, err = s.connector.database.ExecContext(ctx, query)

if err != nil {
return 0, err
}
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package connectors
import (
"context"
"errors"
"fmt"
"log/slog"

"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -175,7 +174,7 @@ func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) {
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, inner.SnowflakeConfig)
case *protos.Peer_EventhubConfig:
return nil, fmt.Errorf("use eventhub group config instead")
return nil, errors.New("use eventhub group config instead")
case *protos.Peer_EventhubGroupConfig:
return conneventhub.NewEventHubConnector(ctx, inner.EventhubGroupConfig)
case *protos.Peer_S3Config:
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE
// if the namespace isn't fully qualified, add the `.servicebus.windows.net`
// check by counting the number of '.' in the namespace
if strings.Count(namespace, ".") < 2 {
namespace = fmt.Sprintf("%s.servicebus.windows.net", namespace)
namespace += ".servicebus.windows.net"
}

var hubConnectOK bool
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (c *PostgresConnector) replicationOptions(publicationName string) (*pglogre
}

if publicationName != "" {
pubOpt := fmt.Sprintf("publication_names %s", QuoteLiteral(publicationName))
pubOpt := "publication_names " + QuoteLiteral(publicationName)
pluginArguments = append(pluginArguments, pubOpt)
} else {
return nil, errors.New("publication name is not set")
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (c *S3Connector) SyncRecords(ctx context.Context, req *model.SyncRecordsReq
recordStream := streamRes.Stream
qrepConfig := &protos.QRepConfig{
FlowJobName: req.FlowJobName,
DestinationTableIdentifier: fmt.Sprintf("raw_table_%s", req.FlowJobName),
DestinationTableIdentifier: "raw_table_" + req.FlowJobName,
}
partition := &protos.QRepPartition{
PartitionId: strconv.FormatInt(req.SyncBatchID, 10),
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/snowflake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func (c *SnowflakeConnector) getTableCounts(ctx context.Context, tables []string
if err != nil {
return 0, fmt.Errorf("failed to parse table name %s: %w", table, err)
}
row := c.database.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", table))
//nolint:gosec
row := c.database.QueryRowContext(ctx, "SELECT COUNT(*) FROM "+table)
var count pgtype.Int8
err = row.Scan(&count)
if err != nil {
Expand Down
Loading

0 comments on commit 203c1ab

Please sign in to comment.