diff --git a/.github/workflows/golang-lint.yml b/.github/workflows/golang-lint.yml index d2b6caae8f..eadc52ed7d 100644 --- a/.github/workflows/golang-lint.yml +++ b/.github/workflows/golang-lint.yml @@ -29,6 +29,6 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v3 with: - version: v1.55 + version: v1.56 working-directory: ./flow args: --timeout=10m diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 1409536bf0..dcf1fb3ec4 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -190,9 +190,9 @@ func (a *FlowableActivity) CreateNormalizedTable( numTablesSetup.Add(1) if !existing { - logger.Info(fmt.Sprintf("created table %s", tableIdentifier)) + logger.Info("created table " + tableIdentifier) } else { - logger.Info(fmt.Sprintf("table already exists %s", tableIdentifier)) + logger.Info("table already exists " + tableIdentifier) } } @@ -297,7 +297,7 @@ func (a *FlowableActivity) SyncFlow( } shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("transferring records for job - %s", flowName) + return "transferring records for job - " + flowName }) defer shutdown() @@ -470,7 +470,7 @@ func (a *FlowableActivity) StartNormalize( defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName) + return "normalizing records from batch for job - " + input.FlowConnectionConfigs.FlowJobName }) defer shutdown() @@ -538,7 +538,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, defer connectors.CloseConnector(ctx, srcConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName) + return "getting partitions for job - " + config.FlowJobName }) defer shutdown() @@ -629,7 +629,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } defer connectors.CloseConnector(ctx, dstConn) - logger.Info(fmt.Sprintf("replicating partition %s", partition.PartitionId)) + logger.Info("replicating partition " + partition.PartitionId) shutdown := utils.HeartbeatRoutine(ctx, func() string { return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total) }) @@ -689,7 +689,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } if rowsSynced == 0 { - logger.Info(fmt.Sprintf("no records to push for partition %s", partition.PartitionId)) + logger.Info("no records to push for partition " + partition.PartitionId) pullCancel() } else { wg.Wait() @@ -722,7 +722,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName) + return "consolidating partitions for job - " + config.FlowJobName }) defer shutdown() @@ -886,7 +886,7 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { } defer connectors.CloseConnector(ctx, srcConn) - slotName := fmt.Sprintf("peerflow_slot_%s", config.FlowJobName) + slotName := "peerflow_slot_" + config.FlowJobName if config.ReplicationSlotName != "" { slotName = config.ReplicationSlotName } @@ -977,7 +977,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("renaming tables for job - %s", config.FlowJobName) + return "renaming tables for job - " + config.FlowJobName }) defer shutdown() @@ -985,18 +985,18 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector) if !ok { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return nil, fmt.Errorf("failed to cast connector to snowflake connector") + return nil, errors.New("failed to cast connector to snowflake connector") } return sfConn.RenameTables(ctx, config) } else if config.Peer.Type == protos.DBType_BIGQUERY { bqConn, ok := dstConn.(*connbigquery.BigQueryConnector) if !ok { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return nil, fmt.Errorf("failed to cast connector to bigquery connector") + return nil, errors.New("failed to cast connector to bigquery connector") } return bqConn.RenameTables(ctx, config) } - return nil, fmt.Errorf("rename tables is only supported on snowflake and bigquery") + return nil, errors.New("rename tables is only supported on snowflake and bigquery") } func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) ( @@ -1012,18 +1012,18 @@ func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *pr if req.Peer.Type == protos.DBType_SNOWFLAKE { sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector) if !ok { - return nil, fmt.Errorf("failed to cast connector to snowflake connector") + return nil, errors.New("failed to cast connector to snowflake connector") } return sfConn.CreateTablesFromExisting(ctx, req) } else if req.Peer.Type == protos.DBType_BIGQUERY { bqConn, ok := dstConn.(*connbigquery.BigQueryConnector) if !ok { - return nil, fmt.Errorf("failed to cast connector to bigquery connector") + return nil, errors.New("failed to cast connector to bigquery connector") } return bqConn.CreateTablesFromExisting(ctx, req) } a.Alerter.LogFlowError(ctx, req.FlowJobName, err) - return nil, fmt.Errorf("create tables from existing is only supported on snowflake and bigquery") + return nil, errors.New("create tables from existing is only supported on snowflake and bigquery") } // ReplicateXminPartition replicates a XminPartition from the source to the destination. diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index cda1e496de..70dee3c6eb 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -22,9 +22,9 @@ func (h *FlowRequestHandler) MirrorStatus( slog.Info("Mirror status endpoint called", slog.String(string(shared.FlowNameKey), req.FlowJobName)) cdcFlow, err := h.isCDCFlow(ctx, req.FlowJobName) if err != nil { - slog.Error(fmt.Sprintf("unable to query flow: %s", err.Error())) + slog.Error("unable to query flow", slog.Any("error", err)) return &protos.MirrorStatusResponse{ - ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()), + ErrorMessage: "unable to query flow: " + err.Error(), }, nil } @@ -36,7 +36,7 @@ func (h *FlowRequestHandler) MirrorStatus( currState, err := h.getWorkflowStatus(ctx, workflowID) if err != nil { return &protos.MirrorStatusResponse{ - ErrorMessage: fmt.Sprintf("unable to get flow state: %s", err.Error()), + ErrorMessage: "unable to get flow state: " + err.Error(), }, nil } @@ -44,7 +44,7 @@ func (h *FlowRequestHandler) MirrorStatus( cdcStatus, err := h.CDCFlowStatus(ctx, req) if err != nil { return &protos.MirrorStatusResponse{ - ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()), + ErrorMessage: "unable to query flow: " + err.Error(), }, nil } @@ -59,7 +59,7 @@ func (h *FlowRequestHandler) MirrorStatus( qrepStatus, err := h.QRepFlowStatus(ctx, req) if err != nil { return &protos.MirrorStatusResponse{ - ErrorMessage: fmt.Sprintf("unable to query flow: %s", err.Error()), + ErrorMessage: "unable to query flow: " + err.Error(), }, nil } @@ -144,7 +144,7 @@ func (h *FlowRequestHandler) cloneTableSummary( rows, err := h.pool.Query(ctx, q, "clone_"+flowJobName+"_%") if err != nil { - slog.Error(fmt.Sprintf("unable to query initial load partition - %s: %s", flowJobName, err.Error())) + slog.Error("unable to query initial load partition - "+flowJobName, slog.Any("error", err)) return nil, fmt.Errorf("unable to query initial load partition - %s: %w", flowJobName, err) } @@ -192,7 +192,7 @@ func (h *FlowRequestHandler) cloneTableSummary( if configBytes != nil { var config protos.QRepConfig if err := proto.Unmarshal(configBytes, &config); err != nil { - slog.Error(fmt.Sprintf("unable to unmarshal config: %s", err.Error())) + slog.Error("unable to unmarshal config", slog.Any("error", err)) return nil, fmt.Errorf("unable to unmarshal config: %w", err) } res.TableName = config.DestinationTableIdentifier @@ -277,13 +277,13 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog( err = h.pool.QueryRow(ctx, "SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes) if err != nil { - slog.Error(fmt.Sprintf("unable to query flow config from catalog: %s", err.Error())) + slog.Error("unable to query flow config from catalog", slog.Any("error", err)) return nil, fmt.Errorf("unable to query flow config from catalog: %w", err) } err = proto.Unmarshal(configBytes, &config) if err != nil { - slog.Error(fmt.Sprintf("unable to unmarshal flow config: %s", err.Error())) + slog.Error("unable to unmarshal flow config", slog.Any("error", err)) return nil, fmt.Errorf("unable to unmarshal flow config: %w", err) } @@ -335,7 +335,7 @@ func (h *FlowRequestHandler) isCDCFlow(ctx context.Context, flowJobName string) var query pgtype.Text err := h.pool.QueryRow(ctx, "SELECT query_string FROM flows WHERE name = $1", flowJobName).Scan(&query) if err != nil { - slog.Error(fmt.Sprintf("unable to query flow: %s", err.Error())) + slog.Error("unable to query flow", slog.Any("error", err)) return false, fmt.Errorf("unable to query flow: %w", err) } diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 749bbcf8c4..b6294a42b0 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -788,7 +788,7 @@ func (c *BigQueryConnector) SyncFlowCleanup(ctx context.Context, jobName string) func (c *BigQueryConnector) getRawTableName(flowJobName string) string { // replace all non-alphanumeric characters with _ flowJobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(flowJobName, "_") - return fmt.Sprintf("_peerdb_raw_%s", flowJobName) + return "_peerdb_raw_" + flowJobName } func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { @@ -892,11 +892,9 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename } } - c.logger.Info(fmt.Sprintf("DROP TABLE IF EXISTS %s", - dstDatasetTable.string())) + c.logger.Info("DROP TABLE IF EXISTS " + dstDatasetTable.string()) // drop the dst table if exists - dropQuery := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s", - dstDatasetTable.string())) + dropQuery := c.client.Query("DROP TABLE IF EXISTS " + dstDatasetTable.string()) dropQuery.DefaultProjectID = c.projectID dropQuery.DefaultDatasetID = c.datasetID _, err = dropQuery.Read(ctx) diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index 4bc76986fc..744a20cdec 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -100,7 +100,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep( func (c *BigQueryConnector) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error { if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { - query := c.client.Query(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier)) + query := c.client.Query("TRUNCATE TABLE " + config.DestinationTableIdentifier) query.DefaultDatasetID = c.datasetID query.DefaultProjectID = c.projectID _, err := query.Read(ctx) diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 0237f7aed0..2e522f7543 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -221,7 +221,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( flowLog) } - s.connector.logger.Info(fmt.Sprintf("loaded stage into %s", dstTableName), flowLog) + s.connector.logger.Info("loaded stage into "+dstTableName, flowLog) return numRecords, nil } diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 2dcd6a9824..12d1402d87 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -3,6 +3,7 @@ package connclickhouse import ( "context" "database/sql" + "errors" "fmt" "log/slog" "regexp" @@ -25,7 +26,7 @@ const ( func (c *ClickhouseConnector) getRawTableName(flowJobName string) string { // replace all non-alphanumeric characters with _ flowJobName = regexp.MustCompile("[^a-zA-Z0-9_]+").ReplaceAllString(flowJobName, "_") - return fmt.Sprintf("_peerdb_raw_%s", flowJobName) + return "_peerdb_raw_" + flowJobName } func (c *ClickhouseConnector) checkIfTableExists(ctx context.Context, databaseName string, tableIdentifier string) (bool, error) { @@ -36,7 +37,7 @@ func (c *ClickhouseConnector) checkIfTableExists(ctx context.Context, databaseNa } if !result.Valid { - return false, fmt.Errorf("[clickhouse] checkIfTableExists: result is not valid") + return false, errors.New("[clickhouse] checkIfTableExists: result is not valid") } return result.Int32 == 1, nil @@ -118,7 +119,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro( func (c *ClickhouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) { rawTableName := c.getRawTableName(req.FlowJobName) - c.logger.Info(fmt.Sprintf("pushing records to Clickhouse table %s", rawTableName)) + c.logger.Info("pushing records to Clickhouse table " + rawTableName) res, err := c.syncRecordsViaAvro(ctx, req, rawTableName, req.SyncBatchID) if err != nil { diff --git a/flow/connectors/clickhouse/qrep.go b/flow/connectors/clickhouse/qrep.go index 0abff100ef..5efcfaefb8 100644 --- a/flow/connectors/clickhouse/qrep.go +++ b/flow/connectors/clickhouse/qrep.go @@ -116,7 +116,7 @@ func (c *ClickhouseConnector) SetupQRepMetadataTables(ctx context.Context, confi } if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { - _, err = c.database.ExecContext(ctx, fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier)) + _, err = c.database.ExecContext(ctx, "TRUNCATE TABLE "+config.DestinationTableIdentifier) if err != nil { return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err) } @@ -140,12 +140,12 @@ func (c *ClickhouseConnector) createQRepMetadataTable(ctx context.Context) error queryString := fmt.Sprintf(schemaStatement, qRepMetadataTableName) _, err := c.database.ExecContext(ctx, queryString) if err != nil { - c.logger.Error(fmt.Sprintf("failed to create table %s", qRepMetadataTableName), + c.logger.Error("failed to create table "+qRepMetadataTableName, slog.Any("error", err)) return fmt.Errorf("failed to create table %s: %w", qRepMetadataTableName, err) } - c.logger.Info(fmt.Sprintf("Created table %s", qRepMetadataTableName)) + c.logger.Info("Created table " + qRepMetadataTableName) return nil } @@ -206,6 +206,6 @@ func (c *ClickhouseConnector) dropStage(ctx context.Context, stagingPath string, c.logger.Info(fmt.Sprintf("Deleted contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job)) } - c.logger.Info(fmt.Sprintf("Dropped stage %s", stagingPath)) + c.logger.Info("Dropped stage " + stagingPath) return nil } diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index e68fe3c118..bf119143cc 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -142,7 +142,6 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords( s.connector.creds.AccessKeyID, s.connector.creds.SecretAccessKey) _, err = s.connector.database.ExecContext(ctx, query) - if err != nil { return 0, err } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 67112e2d8e..47621ea2d0 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -3,7 +3,6 @@ package connectors import ( "context" "errors" - "fmt" "log/slog" "github.com/jackc/pgx/v5/pgxpool" @@ -175,7 +174,7 @@ func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) { case *protos.Peer_SnowflakeConfig: return connsnowflake.NewSnowflakeConnector(ctx, inner.SnowflakeConfig) case *protos.Peer_EventhubConfig: - return nil, fmt.Errorf("use eventhub group config instead") + return nil, errors.New("use eventhub group config instead") case *protos.Peer_EventhubGroupConfig: return conneventhub.NewEventHubConnector(ctx, inner.EventhubGroupConfig) case *protos.Peer_S3Config: diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go index 4ae8c5d044..7b791e52ca 100644 --- a/flow/connectors/eventhub/hubmanager.go +++ b/flow/connectors/eventhub/hubmanager.go @@ -56,7 +56,7 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE // if the namespace isn't fully qualified, add the `.servicebus.windows.net` // check by counting the number of '.' in the namespace if strings.Count(namespace, ".") < 2 { - namespace = fmt.Sprintf("%s.servicebus.windows.net", namespace) + namespace += ".servicebus.windows.net" } var hubConnectOK bool diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 5be019f2d5..dd3f24b5a9 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -203,7 +203,7 @@ func (c *PostgresConnector) replicationOptions(publicationName string) (*pglogre } if publicationName != "" { - pubOpt := fmt.Sprintf("publication_names %s", QuoteLiteral(publicationName)) + pubOpt := "publication_names " + QuoteLiteral(publicationName) pluginArguments = append(pluginArguments, pubOpt) } else { return nil, errors.New("publication name is not set") diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index e3526d9488..f5c82e82c1 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -173,7 +173,7 @@ func (c *S3Connector) SyncRecords(ctx context.Context, req *model.SyncRecordsReq recordStream := streamRes.Stream qrepConfig := &protos.QRepConfig{ FlowJobName: req.FlowJobName, - DestinationTableIdentifier: fmt.Sprintf("raw_table_%s", req.FlowJobName), + DestinationTableIdentifier: "raw_table_" + req.FlowJobName, } partition := &protos.QRepPartition{ PartitionId: strconv.FormatInt(req.SyncBatchID, 10), diff --git a/flow/connectors/snowflake/client.go b/flow/connectors/snowflake/client.go index 3ee20362c7..4609540157 100644 --- a/flow/connectors/snowflake/client.go +++ b/flow/connectors/snowflake/client.go @@ -73,7 +73,8 @@ func (c *SnowflakeConnector) getTableCounts(ctx context.Context, tables []string if err != nil { return 0, fmt.Errorf("failed to parse table name %s: %w", table, err) } - row := c.database.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)) + //nolint:gosec + row := c.database.QueryRowContext(ctx, "SELECT COUNT(*) FROM "+table) var count pgtype.Int8 err = row.Scan(&count) if err != nil { diff --git a/flow/connectors/snowflake/get_schema_for_tests.go b/flow/connectors/snowflake/get_schema_for_tests.go index 5a4ed9fd4d..05631e635f 100644 --- a/flow/connectors/snowflake/get_schema_for_tests.go +++ b/flow/connectors/snowflake/get_schema_for_tests.go @@ -2,7 +2,6 @@ package connsnowflake import ( "context" - "fmt" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -48,7 +47,7 @@ func (c *SnowflakeConnector) GetTableSchema( return nil, err } res[tableName] = tableSchema - utils.RecordHeartbeat(ctx, fmt.Sprintf("fetched schema for table %s", tableName)) + utils.RecordHeartbeat(ctx, "fetched schema for table "+tableName) } return &protos.GetTableSchemaBatchOutput{ diff --git a/flow/connectors/snowflake/merge_stmt_generator.go b/flow/connectors/snowflake/merge_stmt_generator.go index 1ab579069c..dd166e708f 100644 --- a/flow/connectors/snowflake/merge_stmt_generator.go +++ b/flow/connectors/snowflake/merge_stmt_generator.go @@ -94,7 +94,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() (string, error) { insertValuesSQLArray := make([]string, 0, len(columns)) for _, column := range columns { normalizedColName := SnowflakeIdentifierNormalize(column.Name) - insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("SOURCE.%s", normalizedColName)) + insertValuesSQLArray = append(insertValuesSQLArray, "SOURCE."+normalizedColName) } // fill in synced_at column insertValuesSQLArray = append(insertValuesSQLArray, "CURRENT_TIMESTAMP") diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index 0f3de892b2..d5ef2e32db 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -86,7 +86,7 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(ctx context.Context, config } if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { - _, err = c.database.ExecContext(ctx, fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier)) + _, err = c.database.ExecContext(ctx, "TRUNCATE TABLE "+config.DestinationTableIdentifier) if err != nil { return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err) } @@ -114,11 +114,11 @@ func (c *SnowflakeConnector) createStage(ctx context.Context, stageName string, // Execute the query _, err := c.database.ExecContext(ctx, createStageStmt) if err != nil { - c.logger.Error(fmt.Sprintf("failed to create stage %s", stageName), slog.Any("error", err)) + c.logger.Error("failed to create stage "+stageName, slog.Any("error", err)) return fmt.Errorf("failed to create stage %s: %w", stageName, err) } - c.logger.Info(fmt.Sprintf("Created stage %s", stageName)) + c.logger.Info("Created stage " + stageName) return nil } @@ -224,7 +224,7 @@ func (c *SnowflakeConnector) getColsFromTable(ctx context.Context, tableName str // dropStage drops the stage for the given job. func (c *SnowflakeConnector) dropStage(ctx context.Context, stagingPath string, job string) error { stageName := c.getStageNameForJob(job) - stmt := fmt.Sprintf("DROP STAGE IF EXISTS %s", stageName) + stmt := "DROP STAGE IF EXISTS " + stageName _, err := c.database.ExecContext(ctx, stmt) if err != nil { @@ -275,7 +275,7 @@ func (c *SnowflakeConnector) dropStage(ctx context.Context, stagingPath string, c.logger.Info(fmt.Sprintf("Deleted contents of bucket %s with prefix %s/%s", s3o.Bucket, s3o.Prefix, job)) } - c.logger.Info(fmt.Sprintf("Dropped stage %s", stageName)) + c.logger.Info("Dropped stage " + stageName) return nil } diff --git a/flow/connectors/snowflake/qrep_avro_consolidate.go b/flow/connectors/snowflake/qrep_avro_consolidate.go index 26ca452bd2..707afd878c 100644 --- a/flow/connectors/snowflake/qrep_avro_consolidate.go +++ b/flow/connectors/snowflake/qrep_avro_consolidate.go @@ -77,7 +77,7 @@ func getTransformSQL(colNames []string, colTypes []string, syncedAtCol string) ( normalizedColName := SnowflakeIdentifierNormalize(avroColName) columnOrder = append(columnOrder, normalizedColName) if avroColName == syncedAtCol { - transformations = append(transformations, fmt.Sprintf("CURRENT_TIMESTAMP AS %s", normalizedColName)) + transformations = append(transformations, "CURRENT_TIMESTAMP AS "+normalizedColName) continue } @@ -172,7 +172,7 @@ func (s *SnowflakeAvroConsolidateHandler) generateUpsertMergeCommand( quotedColumn := utils.QuoteIdentifier(column) updateSetClauses = append(updateSetClauses, fmt.Sprintf("%s = src.%s", quotedColumn, quotedColumn)) insertColumnsClauses = append(insertColumnsClauses, quotedColumn) - insertValuesClauses = append(insertValuesClauses, fmt.Sprintf("src.%s", quotedColumn)) + insertValuesClauses = append(insertValuesClauses, "src."+quotedColumn) } updateSetClause := strings.Join(updateSetClauses, ", ") insertColumnsClause := strings.Join(insertColumnsClauses, ", ") diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index d8b7921289..8babfad0b4 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -69,7 +69,7 @@ func (s *SnowflakeAvroSyncHandler) SyncRecords( if err != nil { return 0, err } - s.connector.logger.Info(fmt.Sprintf("Created stage %s", stage)) + s.connector.logger.Info("Created stage " + stage) err = s.putFileToStage(ctx, avroFile, stage) if err != nil { @@ -270,7 +270,7 @@ func (s *SnowflakeAvroSyncHandler) putFileToStage(ctx context.Context, avroFile putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("putting file to stage %s", stage) + return "putting file to stage " + stage }) defer shutdown() diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index e57e95791d..cc830d289a 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -421,7 +421,7 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas( func (c *SnowflakeConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) { rawTableIdentifier := getRawTableIdentifier(req.FlowJobName) - c.logger.Info(fmt.Sprintf("pushing records to Snowflake table %s", rawTableIdentifier)) + c.logger.Info("pushing records to Snowflake table " + rawTableIdentifier) res, err := c.syncRecordsViaAvro(ctx, req, rawTableIdentifier, req.SyncBatchID) if err != nil { @@ -684,16 +684,14 @@ func generateCreateTableSQLForNormalizedTable( // add a _peerdb_is_deleted column to the normalized table // this is boolean default false, and is used to mark records as deleted if softDeleteColName != "" { - createTableSQLArray = append(createTableSQLArray, - fmt.Sprintf(`%s BOOLEAN DEFAULT FALSE`, softDeleteColName)) + createTableSQLArray = append(createTableSQLArray, softDeleteColName+" BOOLEAN DEFAULT FALSE") } // add a _peerdb_synced column to the normalized table // this is a timestamp column that is used to mark records as synced // default value is the current timestamp (snowflake) if syncedAtColName != "" { - createTableSQLArray = append(createTableSQLArray, - fmt.Sprintf(`%s TIMESTAMP DEFAULT CURRENT_TIMESTAMP`, syncedAtColName)) + createTableSQLArray = append(createTableSQLArray, syncedAtColName+" TIMESTAMP DEFAULT CURRENT_TIMESTAMP") } // add composite primary key to the table @@ -782,7 +780,7 @@ func (c *SnowflakeConnector) RenameTables(ctx context.Context, req *protos.Renam activity.RecordHeartbeat(ctx, fmt.Sprintf("renaming table '%s' to '%s'...", src, dst)) // drop the dst table if exists - _, err = renameTablesTx.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", dst)) + _, err = renameTablesTx.ExecContext(ctx, "DROP TABLE IF EXISTS "+dst) if err != nil { return nil, fmt.Errorf("unable to drop table %s: %w", dst, err) } diff --git a/flow/connectors/sqlserver/qrep.go b/flow/connectors/sqlserver/qrep.go index e4ad60104d..1b93a0ebbd 100644 --- a/flow/connectors/sqlserver/qrep.go +++ b/flow/connectors/sqlserver/qrep.go @@ -3,6 +3,7 @@ package connsqlserver import ( "bytes" "context" + "errors" "fmt" "text/template" @@ -30,7 +31,7 @@ func (c *SQLServerConnector) GetQRepPartitions( } if config.NumRowsPerPartition <= 0 { - return nil, fmt.Errorf("num rows per partition must be greater than 0 for sql server") + return nil, errors.New("num rows per partition must be greater than 0 for sql server") } var err error @@ -58,17 +59,23 @@ func (c *SQLServerConnector) GetQRepPartitions( "minVal": minVal, } - rows, err := c.db.NamedQuery(countQuery, params) - if err != nil { - return nil, fmt.Errorf("failed to query for total rows: %w", err) - } - - defer rows.Close() + err := func() error { + //nolint:sqlclosecheck + rows, err := c.db.NamedQuery(countQuery, params) + if err != nil { + return err + } + defer rows.Close() - if rows.Next() { - if err = rows.Scan(&totalRows); err != nil { - return nil, fmt.Errorf("failed to query for total rows: %w", err) + if rows.Next() { + if err := rows.Scan(&totalRows); err != nil { + return err + } } + return rows.Err() + }() + if err != nil { + return nil, fmt.Errorf("failed to query for total rows: %w", err) } } else { row := c.db.QueryRowContext(ctx, countQuery) diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 806d8a0b7d..150cdb0a9e 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -218,7 +218,6 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(ctx context.Context, bucketName, key Key: aws.String(key), Body: r, }) - if err != nil { s3Path := "s3://" + bucketName + "/" + key logger.Error("failed to upload file: ", slog.Any("error", err), slog.Any("s3_path", s3Path)) diff --git a/flow/connectors/utils/aws.go b/flow/connectors/utils/aws.go index b62522b28c..a72b5073ad 100644 --- a/flow/connectors/utils/aws.go +++ b/flow/connectors/utils/aws.go @@ -1,6 +1,7 @@ package utils import ( + "errors" "fmt" "net/http" "os" @@ -42,7 +43,7 @@ func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) { awsRegion = os.Getenv("AWS_REGION") } if awsRegion == "" { - return nil, fmt.Errorf("AWS_REGION must be set") + return nil, errors.New("AWS_REGION must be set") } awsEndpoint := creds.Endpoint @@ -67,7 +68,7 @@ func GetAWSSecrets(creds S3PeerCredentials) (*AWSSecrets, error) { // one of (awsKey and awsSecret) or awsRoleArn must be set if awsKey == "" && awsSecret == "" && awsRoleArn == "" { - return nil, fmt.Errorf("one of (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) or AWS_ROLE_ARN must be set") + return nil, errors.New("one of (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) or AWS_ROLE_ARN must be set") } return &AWSSecrets{ diff --git a/flow/connectors/utils/azure.go b/flow/connectors/utils/azure.go index cded09815e..df612b47d3 100644 --- a/flow/connectors/utils/azure.go +++ b/flow/connectors/utils/azure.go @@ -1,7 +1,7 @@ package utils import ( - "fmt" + "errors" "os" ) @@ -9,7 +9,7 @@ func GetAzureSubscriptionID() (string, error) { // get this from env id := os.Getenv("AZURE_SUBSCRIPTION_ID") if id == "" { - return "", fmt.Errorf("AZURE_SUBSCRIPTION_ID is not set") + return "", errors.New("AZURE_SUBSCRIPTION_ID is not set") } return id, nil } diff --git a/flow/connectors/utils/ssh.go b/flow/connectors/utils/ssh.go index c4580e870f..f9f83102ff 100644 --- a/flow/connectors/utils/ssh.go +++ b/flow/connectors/utils/ssh.go @@ -2,6 +2,7 @@ package utils import ( "encoding/base64" + "errors" "fmt" "golang.org/x/crypto/ssh" @@ -39,7 +40,7 @@ func GetSSHClientConfig(config *protos.SSHConfig) (*ssh.ClientConfig, error) { } if len(authMethods) == 0 { - return nil, fmt.Errorf("no authentication methods provided") + return nil, errors.New("no authentication methods provided") } var hostKeyCallback ssh.HostKeyCallback diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 6c94124c73..76c93bd5e4 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -3,6 +3,7 @@ package e2e_bigquery import ( "context" "encoding/json" + "errors" "fmt" "math/big" "os" @@ -42,7 +43,7 @@ func NewBigQueryTestHelper() (*BigQueryTestHelper, error) { jsonPath := os.Getenv("TEST_BQ_CREDS") if jsonPath == "" { - return nil, fmt.Errorf("TEST_BQ_CREDS env var not set") + return nil, errors.New("TEST_BQ_CREDS env var not set") } content, err := e2eshared.ReadFileToBytes(jsonPath) @@ -175,8 +176,8 @@ func (b *BigQueryTestHelper) countRows(tableName string) (int, error) { func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string, nonNullCol string) (int, error) { command := fmt.Sprintf("SELECT COUNT(*) FROM `%s.%s`", dataset, tableName) if nonNullCol != "" { - command = fmt.Sprintf("SELECT COUNT(CASE WHEN " + nonNullCol + - " IS NOT NULL THEN 1 END) AS non_null_count FROM `" + dataset + "." + tableName + "`;") + command = "SELECT COUNT(CASE WHEN " + nonNullCol + + " IS NOT NULL THEN 1 END) AS non_null_count FROM `" + dataset + "." + tableName + "`;" } q := b.client.Query(command) q.DisableQueryCache = true @@ -198,7 +199,7 @@ func (b *BigQueryTestHelper) countRowsWithDataset(dataset, tableName string, non cntI64, ok := row[0].(int64) if !ok { - return 0, fmt.Errorf("failed to convert row count to int64") + return 0, errors.New("failed to convert row count to int64") } return int(cntI64), nil @@ -406,7 +407,7 @@ func (b *BigQueryTestHelper) CheckNull(tableName string, colName []string) (bool cntI64, ok := row[0].(int64) if !ok { - return false, fmt.Errorf("failed to convert row count to int64") + return false, errors.New("failed to convert row count to int64") } if cntI64 > 0 { return false, nil diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 8ba591400d..a1be8d6a5a 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -124,7 +124,7 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDel if entry.Kind == qvalue.QValueKindBoolean { isDeleteVal, ok := entry.Value.(bool) if !(ok && isDeleteVal) { - return fmt.Errorf("peerdb column failed: _PEERDB_IS_DELETED is not true") + return errors.New("peerdb column failed: _PEERDB_IS_DELETED is not true") } recordCount += 1 } @@ -132,7 +132,7 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDel if entry.Kind == qvalue.QValueKindTimestamp { _, ok := entry.Value.(time.Time) if !ok { - return fmt.Errorf("peerdb column failed: _PEERDB_SYNCED_AT is not valid") + return errors.New("peerdb column failed: _PEERDB_SYNCED_AT is not valid") } recordCount += 1 @@ -141,7 +141,7 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDel } if recordCount == 0 { - return fmt.Errorf("peerdb column check failed: no records found") + return errors.New("peerdb column check failed: no records found") } return nil @@ -1153,7 +1153,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { srcTable1Name := s.attachSchemaSuffix("test1_bq") dstTable1Name := "test1_bq" - secondDataset := fmt.Sprintf("%s_2", s.bqHelper.Config.DatasetId) + secondDataset := s.bqHelper.Config.DatasetId + "_2" srcTable2Name := s.attachSchemaSuffix("test2_bq") dstTable2Name := "test2_bq" @@ -1294,7 +1294,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment(s.t) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") - srcTableName := fmt.Sprintf("%s_src", cmpTableName) + srcTableName := cmpTableName + "_src" dstTableName := "test_softdel_iud" _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index a1b1a0601c..a00d687172 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -2,6 +2,7 @@ package e2e import ( "context" + "errors" "fmt" "testing" "time" @@ -42,7 +43,7 @@ func cleanPostgres(conn *pgx.Conn, suffix string) error { _, err = conn.Exec( context.Background(), "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE $1", - fmt.Sprintf("%%_%s", suffix), + "%_"+suffix, ) if err != nil { return fmt.Errorf("failed to drop replication slots: %w", err) @@ -51,7 +52,7 @@ func cleanPostgres(conn *pgx.Conn, suffix string) error { // list all publications from pg_publication table rows, err := conn.Query(context.Background(), "SELECT pubname FROM pg_publication WHERE pubname LIKE $1", - fmt.Sprintf("%%_%s", suffix), + "%_"+suffix, ) if err != nil { return fmt.Errorf("failed to list publications: %w", err) @@ -62,7 +63,7 @@ func cleanPostgres(conn *pgx.Conn, suffix string) error { } for _, pubName := range publications { - _, err = conn.Exec(context.Background(), fmt.Sprintf("DROP PUBLICATION %s", pubName)) + _, err = conn.Exec(context.Background(), "DROP PUBLICATION "+pubName) if err != nil { return fmt.Errorf("failed to drop publication %s: %w", pubName, err) } @@ -76,7 +77,7 @@ func setupPostgresSchema(t *testing.T, conn *pgx.Conn, suffix string) error { setupTx, err := conn.Begin(context.Background()) if err != nil { - return fmt.Errorf("failed to start setup transaction") + return errors.New("failed to start setup transaction") } // create an e2e_test schema @@ -92,7 +93,7 @@ func setupPostgresSchema(t *testing.T, conn *pgx.Conn, suffix string) error { }() // create an e2e_test schema - _, err = setupTx.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA e2e_test_%s", suffix)) + _, err = setupTx.Exec(context.Background(), "CREATE SCHEMA e2e_test_"+suffix) if err != nil { return fmt.Errorf("failed to create e2e_test schema: %w", err) } diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 67f36f92ff..d99479a781 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -43,11 +43,11 @@ func (s PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, ro } if !isDeleted.Bool { - return fmt.Errorf("isDeleted is not true") + return errors.New("isDeleted is not true") } if !syncedAt.Valid { - return fmt.Errorf("syncedAt is not valid") + return errors.New("syncedAt is not valid") } return nil @@ -722,7 +722,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { env := e2e.NewTemporalTestWorkflowEnvironment(s.t) cmpTableName := s.attachSchemaSuffix("test_softdel") - srcTableName := fmt.Sprintf("%s_src", cmpTableName) + srcTableName := cmpTableName + "_src" dstTableName := s.attachSchemaSuffix("test_softdel_dst") _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` @@ -808,7 +808,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment(s.t) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") - srcTableName := fmt.Sprintf("%s_src", cmpTableName) + srcTableName := cmpTableName + "_src" dstTableName := s.attachSchemaSuffix("test_softdel_iud_dst") _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` @@ -887,7 +887,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment(s.t) cmpTableName := s.attachSchemaSuffix("test_softdel_ud") - srcTableName := fmt.Sprintf("%s_src", cmpTableName) + srcTableName := cmpTableName + "_src" dstTableName := s.attachSchemaSuffix("test_softdel_ud_dst") _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 65a043df78..c13e02b6e0 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -2,6 +2,7 @@ package e2e_postgres import ( "context" + "errors" "fmt" "strings" "testing" @@ -104,7 +105,7 @@ func (s PeerFlowE2ETestSuitePG) checkEnums(srcSchemaQualified, dstSchemaQualifie } if exists.Bool { - return fmt.Errorf("enum comparison failed: rows are not equal") + return errors.New("enum comparison failed: rows are not equal") } return nil } @@ -144,7 +145,7 @@ func (s PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQualif } func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { - query := fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s`, dstSchemaQualified) + query := `SELECT "_PEERDB_SYNCED_AT" FROM ` + dstSchemaQualified rows, _ := s.Conn().Query(context.Background(), query) @@ -157,7 +158,7 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { } if !syncedAt.Valid { - return fmt.Errorf("synced_at is not valid") + return errors.New("synced_at is not valid") } } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 48fc321ae4..d9b2697980 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -1243,7 +1243,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { env := e2e.NewTemporalTestWorkflowEnvironment(s.t) cmpTableName := s.attachSchemaSuffix("test_softdel_iud") - srcTableName := fmt.Sprintf("%s_src", cmpTableName) + srcTableName := cmpTableName + "_src" dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_softdel_iud") _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` diff --git a/flow/e2e/snowflake/snowflake_helper.go b/flow/e2e/snowflake/snowflake_helper.go index e73b1aee5d..c26209c379 100644 --- a/flow/e2e/snowflake/snowflake_helper.go +++ b/flow/e2e/snowflake/snowflake_helper.go @@ -3,6 +3,7 @@ package e2e_snowflake import ( "context" "encoding/json" + "errors" "fmt" "math/big" "os" @@ -34,7 +35,7 @@ type SnowflakeTestHelper struct { func NewSnowflakeTestHelper() (*SnowflakeTestHelper, error) { jsonPath := os.Getenv("TEST_SF_CREDS") if jsonPath == "" { - return nil, fmt.Errorf("TEST_SF_CREDS env var not set") + return nil, errors.New("TEST_SF_CREDS env var not set") } content, err := e2eshared.ReadFileToBytes(jsonPath) @@ -102,7 +103,7 @@ func (s *SnowflakeTestHelper) Cleanup() error { if err != nil { return err } - err = s.adminClient.ExecuteQuery(context.Background(), fmt.Sprintf("DROP DATABASE %s", s.testDatabaseName)) + err = s.adminClient.ExecuteQuery(context.Background(), "DROP DATABASE "+s.testDatabaseName) if err != nil { return err } @@ -190,11 +191,11 @@ func (s *SnowflakeTestHelper) checkSyncedAt(query string) error { for _, record := range recordBatch.Records { for _, entry := range record { if entry.Kind != qvalue.QValueKindTimestamp { - return fmt.Errorf("synced_at column check failed: _PEERDB_SYNCED_AT is not timestamp") + return errors.New("synced_at column check failed: _PEERDB_SYNCED_AT is not timestamp") } _, ok := entry.Value.(time.Time) if !ok { - return fmt.Errorf("synced_at column failed: _PEERDB_SYNCED_AT is not valid") + return errors.New("synced_at column failed: _PEERDB_SYNCED_AT is not valid") } } } diff --git a/flow/e2e/snowflake/snowflake_schema_delta_test.go b/flow/e2e/snowflake/snowflake_schema_delta_test.go index db3459404d..68c70e56aa 100644 --- a/flow/e2e/snowflake/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake/snowflake_schema_delta_test.go @@ -46,7 +46,7 @@ func setupSchemaDeltaSuite(t *testing.T) SnowflakeSchemaDeltaTestSuite { } func (s SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { - tableName := fmt.Sprintf("%s.SIMPLE_ADD_COLUMN", schemaDeltaTestSchemaName) + tableName := schemaDeltaTestSchemaName + ".SIMPLE_ADD_COLUMN" err := s.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName)) require.NoError(s.t, err) @@ -82,7 +82,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { } func (s SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() { - tableName := fmt.Sprintf("%s.ADD_DROP_ALL_COLUMN_TYPES", schemaDeltaTestSchemaName) + tableName := schemaDeltaTestSchemaName + ".ADD_DROP_ALL_COLUMN_TYPES" err := s.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(ID TEXT PRIMARY KEY)", tableName)) require.NoError(s.t, err) @@ -172,7 +172,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() { } func (s SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() { - tableName := fmt.Sprintf("%s.ADD_DROP_TRICKY_COLUMN_NAMES", schemaDeltaTestSchemaName) + tableName := schemaDeltaTestSchemaName + ".ADD_DROP_TRICKY_COLUMN_NAMES" err := s.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(id TEXT PRIMARY KEY)", tableName)) require.NoError(s.t, err) @@ -251,7 +251,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() { } func (s SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { - tableName := fmt.Sprintf("%s.ADD_DROP_WHITESPACE_COLUMN_NAMES", schemaDeltaTestSchemaName) + tableName := schemaDeltaTestSchemaName + ".ADD_DROP_WHITESPACE_COLUMN_NAMES" err := s.sfTestHelper.RunCommand(fmt.Sprintf("CREATE TABLE %s(\" \" TEXT PRIMARY KEY)", tableName)) require.NoError(s.t, err) diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 87d4285085..74065b98ea 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -187,7 +187,7 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append( // Verify that the destination table has the same number of rows as the source table var numRowsInDest pgtype.Int8 - countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s", dstTableName) + countQuery := "SELECT COUNT(*) FROM " + dstTableName err = s.Conn().QueryRow(context.Background(), countQuery).Scan(&numRowsInDest) require.NoError(s.t, err) diff --git a/flow/e2e/sqlserver/sqlserver_helper.go b/flow/e2e/sqlserver/sqlserver_helper.go index 15ade1796f..fe8438dcdb 100644 --- a/flow/e2e/sqlserver/sqlserver_helper.go +++ b/flow/e2e/sqlserver/sqlserver_helper.go @@ -94,7 +94,7 @@ func (h *SQLServerHelper) CleanUp() error { } if h.SchemaName != "" { - return h.E.ExecuteQuery(context.Background(), fmt.Sprintf("DROP SCHEMA %s", h.SchemaName)) + return h.E.ExecuteQuery(context.Background(), "DROP SCHEMA "+h.SchemaName) } return nil diff --git a/flow/geo/geo.go b/flow/geo/geo.go index 9640173973..6602a26d53 100644 --- a/flow/geo/geo.go +++ b/flow/geo/geo.go @@ -14,7 +14,7 @@ func GeoValidate(hexWkb string) (string, error) { // Decode the WKB hex string into binary wkb, hexErr := hex.DecodeString(hexWkb) if hexErr != nil { - slog.Warn(fmt.Sprintf("Ignoring invalid WKB: %s", hexWkb)) + slog.Warn("Ignoring invalid WKB: " + hexWkb) return "", hexErr } diff --git a/flow/go.mod b/flow/go.mod index 24ce7d9f7a..a6ad81e4f7 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -1,6 +1,6 @@ module github.com/PeerDB-io/peer-flow -go 1.21 +go 1.22 require ( cloud.google.com/go v0.112.0 diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index 4455101ad8..77700b7a89 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -1,6 +1,7 @@ package model import ( + "errors" "fmt" "log/slog" "math/big" @@ -100,7 +101,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindFloat32: v, ok := qValue.Value.(float32) if !ok { - src.err = fmt.Errorf("invalid float32 value") + src.err = errors.New("invalid float32 value") return nil, src.err } values[i] = v @@ -108,7 +109,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindFloat64: v, ok := qValue.Value.(float64) if !ok { - src.err = fmt.Errorf("invalid float64 value") + src.err = errors.New("invalid float64 value") return nil, src.err } values[i] = v @@ -116,7 +117,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindInt16, qvalue.QValueKindInt32: v, ok := qValue.Value.(int32) if !ok { - src.err = fmt.Errorf("invalid int32 value") + src.err = errors.New("invalid int32 value") return nil, src.err } values[i] = v @@ -124,7 +125,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindInt64: v, ok := qValue.Value.(int64) if !ok { - src.err = fmt.Errorf("invalid int64 value") + src.err = errors.New("invalid int64 value") return nil, src.err } values[i] = v @@ -132,7 +133,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindBoolean: v, ok := qValue.Value.(bool) if !ok { - src.err = fmt.Errorf("invalid boolean value") + src.err = errors.New("invalid boolean value") return nil, src.err } values[i] = v @@ -140,7 +141,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindQChar: v, ok := qValue.Value.(uint8) if !ok { - src.err = fmt.Errorf("invalid \"char\" value") + src.err = errors.New("invalid \"char\" value") return nil, src.err } values[i] = rune(v) @@ -148,7 +149,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindString: v, ok := qValue.Value.(string) if !ok { - src.err = fmt.Errorf("invalid string value") + src.err = errors.New("invalid string value") return nil, src.err } values[i] = v @@ -156,7 +157,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindTime: t, ok := qValue.Value.(time.Time) if !ok { - src.err = fmt.Errorf("invalid Time value") + src.err = errors.New("invalid Time value") return nil, src.err } time := pgtype.Time{Microseconds: t.UnixMicro(), Valid: true} @@ -165,7 +166,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindTimestamp: t, ok := qValue.Value.(time.Time) if !ok { - src.err = fmt.Errorf("invalid ExtendedTime value") + src.err = errors.New("invalid ExtendedTime value") return nil, src.err } timestamp := pgtype.Timestamp{Time: t, Valid: true} @@ -174,7 +175,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindTimestampTZ: t, ok := qValue.Value.(time.Time) if !ok { - src.err = fmt.Errorf("invalid ExtendedTime value") + src.err = errors.New("invalid ExtendedTime value") return nil, src.err } timestampTZ := pgtype.Timestamptz{Time: t, Valid: true} @@ -211,7 +212,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindBytes, qvalue.QValueKindBit: v, ok := qValue.Value.([]byte) if !ok { - src.err = fmt.Errorf("invalid Bytes value") + src.err = errors.New("invalid Bytes value") return nil, src.err } values[i] = v @@ -219,7 +220,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindDate: t, ok := qValue.Value.(time.Time) if !ok { - src.err = fmt.Errorf("invalid Date value") + src.err = errors.New("invalid Date value") return nil, src.err } date := pgtype.Date{Time: t, Valid: true} @@ -228,7 +229,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindHStore: v, ok := qValue.Value.(string) if !ok { - src.err = fmt.Errorf("invalid HStore value") + src.err = errors.New("invalid HStore value") return nil, src.err } @@ -236,13 +237,13 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint: v, ok := qValue.Value.(string) if !ok { - src.err = fmt.Errorf("invalid Geospatial value") + src.err = errors.New("invalid Geospatial value") return nil, src.err } wkb, err := geo.GeoToWKB(v) if err != nil { - src.err = fmt.Errorf("failed to convert Geospatial value to wkb") + src.err = errors.New("failed to convert Geospatial value to wkb") return nil, src.err } @@ -312,7 +313,7 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { case qvalue.QValueKindJSON: v, ok := qValue.Value.(string) if !ok { - src.err = fmt.Errorf("invalid JSON value") + src.err = errors.New("invalid JSON value") return nil, src.err } values[i] = v diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index 83ac85032d..e3fa4ee74e 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -1,7 +1,7 @@ package model import ( - "fmt" + "errors" "github.com/PeerDB-io/peer-flow/model/qvalue" ) @@ -70,7 +70,7 @@ func (s *QRecordStream) Schema() (*QRecordSchema, error) { func (s *QRecordStream) SetSchema(schema *QRecordSchema) error { if s.schemaSet { - return fmt.Errorf("Schema already set") + return errors.New("Schema already set") } s.schema <- QRecordSchemaOrError{ diff --git a/flow/shared/crypto.go b/flow/shared/crypto.go index 7aeb58c2f8..83c67a5760 100644 --- a/flow/shared/crypto.go +++ b/flow/shared/crypto.go @@ -3,6 +3,7 @@ package shared import ( "crypto/rsa" "encoding/pem" + "errors" "fmt" "github.com/youmark/pkcs8" @@ -11,7 +12,7 @@ import ( func DecodePKCS8PrivateKey(rawKey []byte, password *string) (*rsa.PrivateKey, error) { PEMBlock, _ := pem.Decode(rawKey) if PEMBlock == nil { - return nil, fmt.Errorf("failed to decode private key PEM block") + return nil, errors.New("failed to decode private key PEM block") } var privateKey *rsa.PrivateKey diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 7f33c33e52..398a09d47c 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -213,7 +213,7 @@ func CDCFlowWorkflow( state *CDCFlowWorkflowState, ) (*CDCFlowWorkflowResult, error) { if cfg == nil { - return nil, fmt.Errorf("invalid connection configs") + return nil, errors.New("invalid connection configs") } if state == nil { diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index 5263d7b97d..652705c142 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -2,7 +2,6 @@ package peerflow import ( "errors" - "fmt" "log/slog" "time" @@ -23,7 +22,7 @@ func DropFlowWorkflow(ctx workflow.Context, req *protos.ShutdownRequest) error { var sourceError, destinationError error var sourceOk, destinationOk, canceled bool - selector := workflow.NewNamedSelector(ctx, fmt.Sprintf("%s-drop", req.FlowJobName)) + selector := workflow.NewNamedSelector(ctx, req.FlowJobName+"-drop") selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { canceled = true }) diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 64aa42277d..cece0106cb 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -308,7 +308,7 @@ func (q *QRepFlowExecution) waitForNewRows(ctx workflow.Context, lastPartition * func (q *QRepFlowExecution) handleTableCreationForResync(ctx workflow.Context, state *protos.QRepFlowState) error { if state.NeedsResync && q.config.DstTableFullResync { - renamedTableIdentifier := fmt.Sprintf("%s_peerdb_resync", q.config.DestinationTableIdentifier) + renamedTableIdentifier := q.config.DestinationTableIdentifier + "_peerdb_resync" createTablesFromExistingCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 10 * time.Minute, HeartbeatTimeout: time.Minute, diff --git a/flow/workflows/scheduled_flows.go b/flow/workflows/scheduled_flows.go index 4cf66d3ff4..61db3d0262 100644 --- a/flow/workflows/scheduled_flows.go +++ b/flow/workflows/scheduled_flows.go @@ -1,7 +1,6 @@ package peerflow import ( - "fmt" "time" "go.temporal.io/api/enums/v1" @@ -53,7 +52,7 @@ func GlobalScheduleManagerWorkflow(ctx workflow.Context) error { }) if walHeartbeatEnabled { heartbeatCtx := withCronOptions(ctx, - fmt.Sprintf("wal-heartbeat-%s", info.OriginalRunID), + "wal-heartbeat-"+info.OriginalRunID, "*/12 * * * *") workflow.ExecuteChildWorkflow( heartbeatCtx, @@ -62,7 +61,7 @@ func GlobalScheduleManagerWorkflow(ctx workflow.Context) error { } slotSizeCtx := withCronOptions(ctx, - fmt.Sprintf("record-slot-size-%s", info.OriginalRunID), + "record-slot-size-"+info.OriginalRunID, "*/5 * * * *") workflow.ExecuteChildWorkflow(slotSizeCtx, RecordSlotSizeWorkflow)