Skip to content

Commit

Permalink
perfsprint cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Feb 15, 2024
1 parent 99de22d commit 18c552b
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 34 deletions.
5 changes: 3 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package connpostgres
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"log/slog"
"time"
Expand Down Expand Up @@ -169,7 +170,7 @@ func (p *PostgresCDCSource) replicationOptions() (*pglogrepl.StartReplicationOpt
pubOpt := fmt.Sprintf("publication_names '%s'", p.publication)
pluginArguments = append(pluginArguments, pubOpt)
} else {
return nil, fmt.Errorf("publication name is not set")
return nil, errors.New("publication name is not set")
}

return &pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments}, nil
Expand Down Expand Up @@ -259,7 +260,7 @@ func (p *PostgresCDCSource) consumeStream(

if time.Since(standByLastLogged) > 10*time.Second {
numRowsProcessedMessage := fmt.Sprintf("processed %d rows", cdcRecordsStorage.Len())
p.logger.Info(fmt.Sprintf("Sent Standby status message. %s", numRowsProcessedMessage))
p.logger.Info("Sent Standby status message. " + numRowsProcessedMessage)
standByLastLogged = time.Now()
}
}
Expand Down
10 changes: 4 additions & 6 deletions flow/connectors/postgres/normalize_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (n *normalizeStmtGenerator) generateMergeStatement() string {
flattenedCastsSQL := strings.Join(flattenedCastsSQLArray, ",")
insertValuesSQLArray := make([]string, 0, columnCount+2)
for _, quotedCol := range quotedColumnNames {
insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("src.%s", quotedCol))
insertValuesSQLArray = append(insertValuesSQLArray, "src."+quotedCol)
}

updateStatementsforToastCols := n.generateUpdateStatements(quotedColumnNames)
Expand Down Expand Up @@ -208,13 +208,11 @@ func (n *normalizeStmtGenerator) generateUpdateStatements(quotedCols []string) [
}
// set the synced at column to the current timestamp
if n.peerdbCols.SyncedAtColName != "" {
tmpArray = append(tmpArray, fmt.Sprintf(`%s=CURRENT_TIMESTAMP`,
QuoteIdentifier(n.peerdbCols.SyncedAtColName)))
tmpArray = append(tmpArray, QuoteIdentifier(n.peerdbCols.SyncedAtColName)+`=CURRENT_TIMESTAMP`)
}
// set soft-deleted to false, tackles insert after soft-delete
if handleSoftDelete {
tmpArray = append(tmpArray, fmt.Sprintf(`%s=FALSE`,
QuoteIdentifier(n.peerdbCols.SoftDeleteColName)))
tmpArray = append(tmpArray, QuoteIdentifier(n.peerdbCols.SoftDeleteColName)+`=FALSE`)
}

quotedCols := QuoteLiteral(cols)
Expand All @@ -228,7 +226,7 @@ func (n *normalizeStmtGenerator) generateUpdateStatements(quotedCols []string) [
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if handleSoftDelete {
tmpArray[len(tmpArray)-1] = fmt.Sprintf(`%s=TRUE`, QuoteIdentifier(n.peerdbCols.SoftDeleteColName))
tmpArray[len(tmpArray)-1] = QuoteIdentifier(n.peerdbCols.SoftDeleteColName) + `=TRUE`
ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
src._peerdb_record_type=2 AND _peerdb_unchanged_toast_columns=%s
Expand Down
19 changes: 10 additions & 9 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package connpostgres

import (
"context"
"errors"
"fmt"
"log/slog"
"regexp"
Expand Down Expand Up @@ -114,7 +115,7 @@ func (c *PostgresConnector) Conn() *pgx.Conn {
// ConnectionActive returns nil if the connection is active.
func (c *PostgresConnector) ConnectionActive(ctx context.Context) error {
if c.conn == nil {
return fmt.Errorf("connection is nil")
return errors.New("connection is nil")
}
pingErr := c.conn.Ping(ctx)
return pingErr
Expand Down Expand Up @@ -184,7 +185,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
}()

// Slotname would be the job name prefixed with "peerflow_slot_"
slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName)
slotName := "peerflow_slot_" + req.FlowJobName
if req.OverrideReplicationSlotName != "" {
slotName = req.OverrideReplicationSlotName
}
Expand Down Expand Up @@ -567,8 +568,8 @@ func (c *PostgresConnector) GetTableSchema(
return nil, err
}
res[tableName] = tableSchema
utils.RecordHeartbeat(ctx, fmt.Sprintf("fetched schema for table %s", tableName))
c.logger.Info(fmt.Sprintf("fetched schema for table %s", tableName))
utils.RecordHeartbeat(ctx, "fetched schema for table "+tableName)
c.logger.Info("fetched schema for table " + tableName)
}

return &protos.GetTableSchemaBatchOutput{
Expand Down Expand Up @@ -768,7 +769,7 @@ func (c *PostgresConnector) EnsurePullability(
}

if !req.CheckConstraints {
msg := fmt.Sprintf("[no-constraints] ensured pullability table %s", tableName)
msg := "[no-constraints] ensured pullability table " + tableName
utils.RecordHeartbeat(ctx, msg)
continue
}
Expand All @@ -789,7 +790,7 @@ func (c *PostgresConnector) EnsurePullability(
return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable)
}

utils.RecordHeartbeat(ctx, fmt.Sprintf("ensured pullability table %s", tableName))
utils.RecordHeartbeat(ctx, "ensured pullability table "+tableName)
}

return &protos.EnsurePullabilityBatchOutput{TableIdentifierMapping: tableIdentifierMapping}, nil
Expand All @@ -804,7 +805,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig
}

// Slotname would be the job name prefixed with "peerflow_slot_"
slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName)
slotName := "peerflow_slot_" + req.FlowJobName
if req.ExistingReplicationSlotName != "" {
slotName = req.ExistingReplicationSlotName
}
Expand Down Expand Up @@ -839,7 +840,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig

func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) error {
// Slotname would be the job name prefixed with "peerflow_slot_"
slotName := fmt.Sprintf("peerflow_slot_%s", jobName)
slotName := "peerflow_slot_%s" + jobName

publicationName := c.getDefaultPublicationName(jobName)

Expand All @@ -854,7 +855,7 @@ func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string)
}
}()

_, err = pullFlowCleanupTx.Exec(ctx, fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", publicationName))
_, err = pullFlowCleanupTx.Exec(ctx, "DROP PUBLICATION IF EXISTS "+publicationName)
if err != nil {
return fmt.Errorf("error dropping publication: %w", err)
}
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func SetupSuite(t *testing.T) PostgresSchemaDeltaTestSuite {
_, err = setupTx.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE",
schema))
require.NoError(t, err)
_, err = setupTx.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA %s", schema))
_, err = setupTx.Exec(context.Background(), "CREATE SCHEMA "+schema)
require.NoError(t, err)
err = setupTx.Commit(context.Background())
require.NoError(t, err)
Expand All @@ -58,7 +58,7 @@ func SetupSuite(t *testing.T) PostgresSchemaDeltaTestSuite {
}

func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
tableName := fmt.Sprintf("%s.simple_add_column", s.schema)
tableName := s.schema + ".simple_add_column"
_, err := s.connector.conn.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
require.NoError(s.t, err)
Expand Down Expand Up @@ -96,7 +96,7 @@ func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
}

func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
tableName := fmt.Sprintf("%s.add_drop_all_column_types", s.schema)
tableName := s.schema + ".add_drop_all_column_types"
_, err := s.connector.conn.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
require.NoError(s.t, err)
Expand Down Expand Up @@ -131,7 +131,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
}

func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
tableName := fmt.Sprintf("%s.add_drop_tricky_column_names", s.schema)
tableName := s.schema + ".add_drop_tricky_column_names"
_, err := s.connector.conn.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
require.NoError(s.t, err)
Expand Down Expand Up @@ -166,7 +166,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
}

func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
tableName := fmt.Sprintf("%s.add_drop_whitespace_column_names", s.schema)
tableName := s.schema + ".add_drop_whitespace_column_names"
_, err := s.connector.conn.Exec(context.Background(),
fmt.Sprintf("CREATE TABLE %s(\" \" INT PRIMARY KEY)", tableName))
require.NoError(s.t, err)
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *PostgresConnector) GetQRepPartitions(
func (c *PostgresConnector) setTransactionSnapshot(ctx context.Context, tx pgx.Tx) error {
snapshot := c.config.TransactionSnapshot
if snapshot != "" {
if _, err := tx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT %s", QuoteLiteral(snapshot))); err != nil {
if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(snapshot)); err != nil {
return fmt.Errorf("failed to set transaction snapshot: %w", err)
}
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func (c *PostgresConnector) getNumRowsPartitions(
quotedWatermarkColumn,
parsedWatermarkTable.String(),
)
c.logger.Info(fmt.Sprintf("[row_based_next] partitions query: %s", partitionsQuery))
c.logger.Info("[row_based_next] partitions query: " + partitionsQuery)
rows, err = tx.Query(ctx, partitionsQuery, minVal)
} else {
partitionsQuery := fmt.Sprintf(
Expand All @@ -166,7 +166,7 @@ func (c *PostgresConnector) getNumRowsPartitions(
quotedWatermarkColumn,
parsedWatermarkTable.String(),
)
c.logger.Info(fmt.Sprintf("[row_based] partitions query: %s", partitionsQuery))
c.logger.Info("[row_based] partitions query: " + partitionsQuery)
rows, err = tx.Query(ctx, partitionsQuery)
}
if err != nil {
Expand Down Expand Up @@ -500,7 +500,7 @@ func (c *PostgresConnector) SetupQRepMetadataTables(ctx context.Context, config
if config.WriteMode != nil &&
config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
_, err = c.conn.Exec(ctx,
fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
"TRUNCATE TABLE "+config.DestinationTableIdentifier)
if err != nil {
return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
}
Expand Down Expand Up @@ -569,7 +569,7 @@ func BuildQuery(logger log.Logger, query string, flowJobName string) (string, er
}
res := buf.String()

logger.Info(fmt.Sprintf("templated query: %s", res))
logger.Info("templated query: " + res)
return res, nil
}

Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type testCase struct {
}

func newTestCaseForNumRows(schema string, name string, rows uint32, expectedNum int) *testCase {
schemaQualifiedTable := fmt.Sprintf("%s.test", schema)
schemaQualifiedTable := schema + ".test"
query := fmt.Sprintf(
`SELECT * FROM %s WHERE "from" >= {{.start}} AND "from" < {{.end}}`,
schemaQualifiedTable)
Expand All @@ -44,7 +44,7 @@ func newTestCaseForNumRows(schema string, name string, rows uint32, expectedNum
}

func newTestCaseForCTID(schema string, name string, rows uint32, expectedNum int) *testCase {
schemaQualifiedTable := fmt.Sprintf("%s.test", schema)
schemaQualifiedTable := schema + ".test"
query := fmt.Sprintf(
`SELECT * FROM %s WHERE "from" >= {{.start}} AND "from" < {{.end}}`,
schemaQualifiedTable)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
}()

if qe.snapshot != "" {
_, err = tx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT %s", QuoteLiteral(qe.snapshot)))
_, err = tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
if err != nil {
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to set snapshot: %w", err),
Expand Down
5 changes: 2 additions & 3 deletions flow/connectors/postgres/qrep_sql_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
}
} else {
// Step 2.1: Create a temp staging table
stagingTableName := fmt.Sprintf("_peerdb_staging_%s", shared.RandomString(8))
stagingTableName := "_peerdb_staging_" + shared.RandomString(8)
stagingTableIdentifier := pgx.Identifier{s.connector.metadataSchema, stagingTableName}
dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}

Expand All @@ -122,7 +122,6 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
s.connector.logger.Info(fmt.Sprintf("Creating staging table %s - '%s'",
stagingTableName, createStagingTableStmt), syncLog)
_, err = tx.Exec(context.Background(), createStagingTableStmt)

if err != nil {
return -1, fmt.Errorf("failed to create staging table: %v", err)
}
Expand Down Expand Up @@ -156,7 +155,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
selectStrArray = append(selectStrArray, quotedCol)
}
setClauseArray = append(setClauseArray,
fmt.Sprintf(`%s = CURRENT_TIMESTAMP`, QuoteIdentifier(syncedAtCol)))
QuoteIdentifier(syncedAtCol)+`= CURRENT_TIMESTAMP`)
setClause := strings.Join(setClauseArray, ",")
selectSQL := strings.Join(selectStrArray, ",")

Expand Down
1 change: 0 additions & 1 deletion flow/connectors/postgres/ssh_wrapped_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func (tunnel *SSHTunnel) NewPostgresConnFromConfig(
}
return nil
}, 5, 5*time.Second)

if err != nil {
logger.Error("Failed to create pool", slog.Any("error", err), slog.String("host", host))
conn.Close(ctx)
Expand Down

0 comments on commit 18c552b

Please sign in to comment.