chore: perfsprint cleanup #1303

Merged
merged 1 commit into from Feb 15, 2024
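For context: perfsprint is a Go linter that flags fmt.Sprintf and fmt.Errorf calls whose format strings do trivial work, since fmt's format-string parsing and interface boxing cost more than plain concatenation or errors.New. Below is a minimal illustrative sketch of the two rewrite patterns applied throughout this PR; the package and function names are hypothetical, not taken from the PeerDB codebase.

package demo

import (
	"errors"
	"fmt"
)

// fetchMessage is a hypothetical example showing the two perfsprint
// rewrites used throughout this PR.
func fetchMessage(table string) (string, error) {
	if table == "" {
		// Before: return "", fmt.Errorf("table name is not set")
		// errors.New is preferred when there is nothing to format.
		return "", errors.New("table name is not set")
	}
	// Before: msg := fmt.Sprintf("fetched schema for table %s", table)
	// A lone %s verb becomes plain concatenation.
	msg := "fetched schema for table " + table

	// fmt.Sprintf remains where real formatting is needed, e.g. %d verbs.
	_ = fmt.Sprintf("processed %d rows", 42)
	return msg, nil
}

The produced strings are identical either way; only the allocation and format-parsing overhead changes.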
5 changes: 3 additions & 2 deletions flow/connectors/postgres/cdc.go
@@ -3,6 +3,7 @@ package connpostgres
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"log/slog"
"time"
@@ -169,7 +170,7 @@ func (p *PostgresCDCSource) replicationOptions() (*pglogrepl.StartReplicationOpt
pubOpt := fmt.Sprintf("publication_names '%s'", p.publication)
pluginArguments = append(pluginArguments, pubOpt)
} else {
- return nil, fmt.Errorf("publication name is not set")
+ return nil, errors.New("publication name is not set")
}

return &pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments}, nil
Expand Down Expand Up @@ -259,7 +260,7 @@ func (p *PostgresCDCSource) consumeStream(

if time.Since(standByLastLogged) > 10*time.Second {
numRowsProcessedMessage := fmt.Sprintf("processed %d rows", cdcRecordsStorage.Len())
- p.logger.Info(fmt.Sprintf("Sent Standby status message. %s", numRowsProcessedMessage))
+ p.logger.Info("Sent Standby status message. " + numRowsProcessedMessage)
standByLastLogged = time.Now()
}
}
10 changes: 4 additions & 6 deletions flow/connectors/postgres/normalize_stmt_generator.go
@@ -137,7 +137,7 @@ func (n *normalizeStmtGenerator) generateMergeStatement() string {
flattenedCastsSQL := strings.Join(flattenedCastsSQLArray, ",")
insertValuesSQLArray := make([]string, 0, columnCount+2)
for _, quotedCol := range quotedColumnNames {
- insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("src.%s", quotedCol))
+ insertValuesSQLArray = append(insertValuesSQLArray, "src."+quotedCol)
}

updateStatementsforToastCols := n.generateUpdateStatements(quotedColumnNames)
@@ -208,13 +208,11 @@ func (n *normalizeStmtGenerator) generateUpdateStatements(quotedCols []string) [
}
// set the synced at column to the current timestamp
if n.peerdbCols.SyncedAtColName != "" {
- tmpArray = append(tmpArray, fmt.Sprintf(`%s=CURRENT_TIMESTAMP`,
-     QuoteIdentifier(n.peerdbCols.SyncedAtColName)))
+ tmpArray = append(tmpArray, QuoteIdentifier(n.peerdbCols.SyncedAtColName)+`=CURRENT_TIMESTAMP`)
}
// set soft-deleted to false, tackles insert after soft-delete
if handleSoftDelete {
- tmpArray = append(tmpArray, fmt.Sprintf(`%s=FALSE`,
-     QuoteIdentifier(n.peerdbCols.SoftDeleteColName)))
+ tmpArray = append(tmpArray, QuoteIdentifier(n.peerdbCols.SoftDeleteColName)+`=FALSE`)
}

quotedCols := QuoteLiteral(cols)
@@ -228,7 +226,7 @@ func (n *normalizeStmtGenerator) generateUpdateStatements(quotedCols []string) [
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if handleSoftDelete {
- tmpArray[len(tmpArray)-1] = fmt.Sprintf(`%s=TRUE`, QuoteIdentifier(n.peerdbCols.SoftDeleteColName))
+ tmpArray[len(tmpArray)-1] = QuoteIdentifier(n.peerdbCols.SoftDeleteColName) + `=TRUE`
ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
src._peerdb_record_type=2 AND _peerdb_unchanged_toast_columns=%s
19 changes: 10 additions & 9 deletions flow/connectors/postgres/postgres.go
@@ -2,6 +2,7 @@ package connpostgres

import (
"context"
"errors"
"fmt"
"log/slog"
"regexp"
@@ -114,7 +115,7 @@ func (c *PostgresConnector) Conn() *pgx.Conn {
// ConnectionActive returns nil if the connection is active.
func (c *PostgresConnector) ConnectionActive(ctx context.Context) error {
if c.conn == nil {
- return fmt.Errorf("connection is nil")
+ return errors.New("connection is nil")
}
pingErr := c.conn.Ping(ctx)
return pingErr
@@ -184,7 +185,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
}()

// Slotname would be the job name prefixed with "peerflow_slot_"
- slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName)
+ slotName := "peerflow_slot_" + req.FlowJobName
if req.OverrideReplicationSlotName != "" {
slotName = req.OverrideReplicationSlotName
}
@@ -567,8 +568,8 @@ func (c *PostgresConnector) GetTableSchema(
return nil, err
}
res[tableName] = tableSchema
- utils.RecordHeartbeat(ctx, fmt.Sprintf("fetched schema for table %s", tableName))
- c.logger.Info(fmt.Sprintf("fetched schema for table %s", tableName))
+ utils.RecordHeartbeat(ctx, "fetched schema for table "+tableName)
+ c.logger.Info("fetched schema for table " + tableName)
}

return &protos.GetTableSchemaBatchOutput{
@@ -768,7 +769,7 @@ func (c *PostgresConnector) EnsurePullability(
}

if !req.CheckConstraints {
- msg := fmt.Sprintf("[no-constraints] ensured pullability table %s", tableName)
+ msg := "[no-constraints] ensured pullability table " + tableName
utils.RecordHeartbeat(ctx, msg)
continue
}
@@ -789,7 +790,7 @@
return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable)
}

- utils.RecordHeartbeat(ctx, fmt.Sprintf("ensured pullability table %s", tableName))
+ utils.RecordHeartbeat(ctx, "ensured pullability table "+tableName)
}

return &protos.EnsurePullabilityBatchOutput{TableIdentifierMapping: tableIdentifierMapping}, nil
@@ -804,7 +805,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig
}

// Slotname would be the job name prefixed with "peerflow_slot_"
- slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName)
+ slotName := "peerflow_slot_" + req.ExistingReplicationSlotName
if req.ExistingReplicationSlotName != "" {
slotName = req.ExistingReplicationSlotName
}
@@ -839,7 +840,7 @@

func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) error {
// Slotname would be the job name prefixed with "peerflow_slot_"
- slotName := fmt.Sprintf("peerflow_slot_%s", jobName)
+ slotName := "peerflow_slot_" + jobName

publicationName := c.getDefaultPublicationName(jobName)

@@ -854,7 +855,7 @@ func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string)
}
}()

- _, err = pullFlowCleanupTx.Exec(ctx, fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", publicationName))
+ _, err = pullFlowCleanupTx.Exec(ctx, "DROP PUBLICATION IF EXISTS "+publicationName)
if err != nil {
return fmt.Errorf("error dropping publication: %w", err)
}
10 changes: 5 additions & 5 deletions flow/connectors/postgres/postgres_schema_delta_test.go
@@ -45,7 +45,7 @@ func SetupSuite(t *testing.T) PostgresSchemaDeltaTestSuite {
_, err = setupTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
schema))
require.NoError(t, err)
_, err = setupTx.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA %s", schema))
_, err = setupTx.Exec(context.Background(), "CREATE SCHEMA "+schema)
require.NoError(t, err)
err = setupTx.Commit(context.Background())
require.NoError(t, err)
@@ -58,7 +58,7 @@
}

func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
- tableName := fmt.Sprintf("%s.simple_add_column", s.schema)
+ tableName := s.schema + ".simple_add_column"
_, err := s.connector.conn.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
require.NoError(s.t, err)
@@ -96,7 +96,7 @@ func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
}

func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
- tableName := fmt.Sprintf("%s.add_drop_all_column_types", s.schema)
+ tableName := s.schema + ".add_drop_all_column_types"
_, err := s.connector.conn.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
require.NoError(s.t, err)
@@ -131,7 +131,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
}

func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
- tableName := fmt.Sprintf("%s.add_drop_tricky_column_names", s.schema)
+ tableName := s.schema + ".add_drop_tricky_column_names"
_, err := s.connector.conn.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
require.NoError(s.t, err)
@@ -166,7 +166,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
}

func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
- tableName := fmt.Sprintf("%s.add_drop_whitespace_column_names", s.schema)
+ tableName := s.schema + ".add_drop_whitespace_column_names"
_, err := s.connector.conn.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(\" \" INT PRIMARY KEY)", tableName))
require.NoError(s.t, err)
10 changes: 5 additions & 5 deletions flow/connectors/postgres/qrep.go
@@ -72,7 +72,7 @@ func (c *PostgresConnector) GetQRepPartitions(
func (c *PostgresConnector) setTransactionSnapshot(ctx context.Context, tx pgx.Tx) error {
snapshot := c.config.TransactionSnapshot
if snapshot != "" {
- if _, err := tx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT %s", QuoteLiteral(snapshot))); err != nil {
+ if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(snapshot)); err != nil {
return fmt.Errorf("failed to set transaction snapshot: %w", err)
}
}
@@ -151,7 +151,7 @@ func (c *PostgresConnector) getNumRowsPartitions(
quotedWatermarkColumn,
parsedWatermarkTable.String(),
)
- c.logger.Info(fmt.Sprintf("[row_based_next] partitions query: %s", partitionsQuery))
+ c.logger.Info("[row_based_next] partitions query: " + partitionsQuery)
rows, err = tx.Query(ctx, partitionsQuery, minVal)
} else {
partitionsQuery := fmt.Sprintf(
@@ -166,7 +166,7 @@
quotedWatermarkColumn,
parsedWatermarkTable.String(),
)
- c.logger.Info(fmt.Sprintf("[row_based] partitions query: %s", partitionsQuery))
+ c.logger.Info("[row_based] partitions query: " + partitionsQuery)
rows, err = tx.Query(ctx, partitionsQuery)
}
if err != nil {
@@ -500,7 +500,7 @@ func (c *PostgresConnector) SetupQRepMetadataTables(ctx context.Context, config
if config.WriteMode != nil &&
config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
_, err = c.conn.Exec(ctx,
fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
"TRUNCATE TABLE "+config.DestinationTableIdentifier)
if err != nil {
return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
}
@@ -569,7 +569,7 @@ func BuildQuery(logger log.Logger, query string, flowJobName string) (string, er
}
res := buf.String()

- logger.Info(fmt.Sprintf("templated query: %s", res))
+ logger.Info("templated query: " + res)
return res, nil
}

4 changes: 2 additions & 2 deletions flow/connectors/postgres/qrep_partition_test.go
@@ -25,7 +25,7 @@ type testCase struct {
}

func newTestCaseForNumRows(schema string, name string, rows uint32, expectedNum int) *testCase {
- schemaQualifiedTable := fmt.Sprintf("%s.test", schema)
+ schemaQualifiedTable := schema + ".test"
query := fmt.Sprintf(
`SELECT * FROM %s WHERE "from" >= {{.start}} AND "from" < {{.end}}`,
schemaQualifiedTable)
@@ -44,7 +44,7 @@ func newTestCaseForNumRows(schema string, name string, rows uint32, expectedNum
}

func newTestCaseForCTID(schema string, name string, rows uint32, expectedNum int) *testCase {
- schemaQualifiedTable := fmt.Sprintf("%s.test", schema)
+ schemaQualifiedTable := schema + ".test"
query := fmt.Sprintf(
`SELECT * FROM %s WHERE "from" >= {{.start}} AND "from" < {{.end}}`,
schemaQualifiedTable)
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_query_executor.go
@@ -362,7 +362,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
}()

if qe.snapshot != "" {
- _, err = tx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT %s", QuoteLiteral(qe.snapshot)))
+ _, err = tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
if err != nil {
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to set snapshot: %w", err),
5 changes: 2 additions & 3 deletions flow/connectors/postgres/qrep_sql_sync.go
@@ -109,7 +109,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
}
} else {
// Step 2.1: Create a temp staging table
- stagingTableName := fmt.Sprintf("_peerdb_staging_%s", shared.RandomString(8))
+ stagingTableName := "_peerdb_staging_" + shared.RandomString(8)
stagingTableIdentifier := pgx.Identifier{s.connector.metadataSchema, stagingTableName}
dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}

@@ -122,7 +122,6 @@
s.connector.logger.Info(fmt.Sprintf("Creating staging table %s - '%s'",
stagingTableName, createStagingTableStmt), syncLog)
_, err = tx.Exec(context.Background(), createStagingTableStmt)
-
if err != nil {
return -1, fmt.Errorf("failed to create staging table: %v", err)
}
Expand Down Expand Up @@ -156,7 +155,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
selectStrArray = append(selectStrArray, quotedCol)
}
setClauseArray = append(setClauseArray,
-     fmt.Sprintf(`%s = CURRENT_TIMESTAMP`, QuoteIdentifier(syncedAtCol)))
+     QuoteIdentifier(syncedAtCol)+` = CURRENT_TIMESTAMP`)
setClause := strings.Join(setClauseArray, ",")
selectSQL := strings.Join(selectStrArray, ",")

1 change: 0 additions & 1 deletion flow/connectors/postgres/ssh_wrapped_pool.go
@@ -119,7 +119,6 @@ func (tunnel *SSHTunnel) NewPostgresConnFromConfig(
}
return nil
}, 5, 5*time.Second)
-
if err != nil {
logger.Error("Failed to create pool", slog.Any("error", err), slog.String("host", host))
conn.Close(ctx)