diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 76de074bbd..a9cc01787e 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -3,6 +3,7 @@ package connpostgres
 import (
 	"context"
 	"crypto/sha256"
+	"errors"
 	"fmt"
 	"log/slog"
 	"time"
@@ -169,7 +170,7 @@ func (p *PostgresCDCSource) replicationOptions() (*pglogrepl.StartReplicationOpt
 		pubOpt := fmt.Sprintf("publication_names '%s'", p.publication)
 		pluginArguments = append(pluginArguments, pubOpt)
 	} else {
-		return nil, fmt.Errorf("publication name is not set")
+		return nil, errors.New("publication name is not set")
 	}
 
 	return &pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments}, nil
@@ -259,7 +260,7 @@ func (p *PostgresCDCSource) consumeStream(
 
 			if time.Since(standByLastLogged) > 10*time.Second {
 				numRowsProcessedMessage := fmt.Sprintf("processed %d rows", cdcRecordsStorage.Len())
-				p.logger.Info(fmt.Sprintf("Sent Standby status message. %s", numRowsProcessedMessage))
+				p.logger.Info("Sent Standby status message. " + numRowsProcessedMessage)
				standByLastLogged = time.Now()
 			}
 		}
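
The pattern in cdc.go (errors.New for constant messages, plain concatenation where only one value is joined) recurs through the rest of this diff; it is the substitution linters such as perfsprint suggest. A minimal standalone sketch of the rule of thumb, with illustrative names that are not from this repo:

	package sketch

	import (
		"errors"
		"fmt"
	)

	func validatePublication(name string) error {
		if name == "" {
			// constant message, no format verbs: errors.New skips fmt's formatting machinery
			return errors.New("publication name is not set")
		}
		// interpolation or error wrapping still calls for fmt.Errorf
		return fmt.Errorf("publication %s is not registered", name)
	}
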
diff --git a/flow/connectors/postgres/normalize_stmt_generator.go b/flow/connectors/postgres/normalize_stmt_generator.go
index 5801dfb6dd..01fe11273d 100644
--- a/flow/connectors/postgres/normalize_stmt_generator.go
+++ b/flow/connectors/postgres/normalize_stmt_generator.go
@@ -137,7 +137,7 @@ func (n *normalizeStmtGenerator) generateMergeStatement() string {
 	flattenedCastsSQL := strings.Join(flattenedCastsSQLArray, ",")
 	insertValuesSQLArray := make([]string, 0, columnCount+2)
 	for _, quotedCol := range quotedColumnNames {
-		insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("src.%s", quotedCol))
+		insertValuesSQLArray = append(insertValuesSQLArray, "src."+quotedCol)
 	}
 
 	updateStatementsforToastCols := n.generateUpdateStatements(quotedColumnNames)
@@ -208,13 +208,11 @@ func (n *normalizeStmtGenerator) generateUpdateStatements(quotedCols []string) [
 		}
 		// set the synced at column to the current timestamp
 		if n.peerdbCols.SyncedAtColName != "" {
-			tmpArray = append(tmpArray, fmt.Sprintf(`%s=CURRENT_TIMESTAMP`,
-				QuoteIdentifier(n.peerdbCols.SyncedAtColName)))
+			tmpArray = append(tmpArray, QuoteIdentifier(n.peerdbCols.SyncedAtColName)+`=CURRENT_TIMESTAMP`)
 		}
 		// set soft-deleted to false, tackles insert after soft-delete
 		if handleSoftDelete {
-			tmpArray = append(tmpArray, fmt.Sprintf(`%s=FALSE`,
-				QuoteIdentifier(n.peerdbCols.SoftDeleteColName)))
+			tmpArray = append(tmpArray, QuoteIdentifier(n.peerdbCols.SoftDeleteColName)+`=FALSE`)
 		}
 
 		quotedCols := QuoteLiteral(cols)
@@ -228,7 +226,7 @@ func (n *normalizeStmtGenerator) generateUpdateStatements(quotedCols []string) [
 		// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
 		// and then set soft-delete to true.
 		if handleSoftDelete {
-			tmpArray[len(tmpArray)-1] = fmt.Sprintf(`%s=TRUE`, QuoteIdentifier(n.peerdbCols.SoftDeleteColName))
+			tmpArray[len(tmpArray)-1] = QuoteIdentifier(n.peerdbCols.SoftDeleteColName) + `=TRUE`
 			ssep := strings.Join(tmpArray, ", ")
 			updateStmt := fmt.Sprintf(`WHEN MATCHED AND src._peerdb_record_type=2 AND _peerdb_unchanged_toast_columns=%s
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index b8d3d27f20..e63802a7fd 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -2,6 +2,7 @@ package connpostgres
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"log/slog"
 	"regexp"
@@ -114,7 +115,7 @@ func (c *PostgresConnector) Conn() *pgx.Conn {
 // ConnectionActive returns nil if the connection is active.
 func (c *PostgresConnector) ConnectionActive(ctx context.Context) error {
 	if c.conn == nil {
-		return fmt.Errorf("connection is nil")
+		return errors.New("connection is nil")
 	}
 	pingErr := c.conn.Ping(ctx)
 	return pingErr
@@ -184,7 +185,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
 	}()
 
 	// Slotname would be the job name prefixed with "peerflow_slot_"
-	slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName)
+	slotName := "peerflow_slot_" + req.FlowJobName
 	if req.OverrideReplicationSlotName != "" {
 		slotName = req.OverrideReplicationSlotName
 	}
@@ -567,8 +568,8 @@ func (c *PostgresConnector) GetTableSchema(
 			return nil, err
 		}
 		res[tableName] = tableSchema
-		utils.RecordHeartbeat(ctx, fmt.Sprintf("fetched schema for table %s", tableName))
-		c.logger.Info(fmt.Sprintf("fetched schema for table %s", tableName))
+		utils.RecordHeartbeat(ctx, "fetched schema for table "+tableName)
+		c.logger.Info("fetched schema for table " + tableName)
 	}
 
 	return &protos.GetTableSchemaBatchOutput{
@@ -768,7 +769,7 @@ func (c *PostgresConnector) EnsurePullability(
 		}
 
 		if !req.CheckConstraints {
-			msg := fmt.Sprintf("[no-constraints] ensured pullability table %s", tableName)
+			msg := "[no-constraints] ensured pullability table " + tableName
 			utils.RecordHeartbeat(ctx, msg)
 			continue
 		}
@@ -789,7 +790,7 @@ func (c *PostgresConnector) EnsurePullability(
 			return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable)
 		}
 
-		utils.RecordHeartbeat(ctx, fmt.Sprintf("ensured pullability table %s", tableName))
+		utils.RecordHeartbeat(ctx, "ensured pullability table "+tableName)
 	}
 
 	return &protos.EnsurePullabilityBatchOutput{TableIdentifierMapping: tableIdentifierMapping}, nil
@@ -804,7 +805,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig
 	}
 
 	// Slotname would be the job name prefixed with "peerflow_slot_"
-	slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName)
+	slotName := "peerflow_slot_" + req.FlowJobName
 	if req.ExistingReplicationSlotName != "" {
 		slotName = req.ExistingReplicationSlotName
 	}
@@ -839,7 +840,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig
 
 func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) error {
 	// Slotname would be the job name prefixed with "peerflow_slot_"
-	slotName := fmt.Sprintf("peerflow_slot_%s", jobName)
+	slotName := "peerflow_slot_" + jobName
 
 	publicationName := c.getDefaultPublicationName(jobName)
@@ -854,7 +855,7 @@ func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string)
 		}
 	}()
 
-	_, err = pullFlowCleanupTx.Exec(ctx, fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", publicationName))
+	_, err = pullFlowCleanupTx.Exec(ctx, "DROP PUBLICATION IF EXISTS "+publicationName)
 	if err != nil {
 		return fmt.Errorf("error dropping publication: %w", err)
 	}
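
postgres.go now splices publicationName straight into DROP PUBLICATION IF EXISTS, which is fine as long as getDefaultPublicationName only ever yields internally generated names. For identifiers that could carry arbitrary characters, pgx's Identifier type yields a safely quoted form. A hedged sketch of that pattern (dropPublication is illustrative, not part of this connector):

	package sketch

	import (
		"context"

		"github.com/jackc/pgx/v5"
	)

	func dropPublication(ctx context.Context, conn *pgx.Conn, name string) error {
		// Identifier.Sanitize double-quotes the name and escapes embedded quotes,
		// so the value cannot break out of the identifier position.
		_, err := conn.Exec(ctx, "DROP PUBLICATION IF EXISTS "+pgx.Identifier{name}.Sanitize())
		return err
	}
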
diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go
index 6f40501d74..5e3166a195 100644
--- a/flow/connectors/postgres/postgres_schema_delta_test.go
+++ b/flow/connectors/postgres/postgres_schema_delta_test.go
@@ -45,7 +45,7 @@ func SetupSuite(t *testing.T) PostgresSchemaDeltaTestSuite {
 	_, err = setupTx.Exec(context.Background(),
 		fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schema))
 	require.NoError(t, err)
-	_, err = setupTx.Exec(context.Background(), fmt.Sprintf("CREATE SCHEMA %s", schema))
+	_, err = setupTx.Exec(context.Background(), "CREATE SCHEMA "+schema)
 	require.NoError(t, err)
 	err = setupTx.Commit(context.Background())
 	require.NoError(t, err)
@@ -58,7 +58,7 @@ func SetupSuite(t *testing.T) PostgresSchemaDeltaTestSuite {
 }
 
 func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
-	tableName := fmt.Sprintf("%s.simple_add_column", s.schema)
+	tableName := s.schema + ".simple_add_column"
 	_, err := s.connector.conn.Exec(context.Background(),
 		fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
 	require.NoError(s.t, err)
@@ -96,7 +96,7 @@ func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
 }
 
 func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
-	tableName := fmt.Sprintf("%s.add_drop_all_column_types", s.schema)
+	tableName := s.schema + ".add_drop_all_column_types"
 	_, err := s.connector.conn.Exec(context.Background(),
 		fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
 	require.NoError(s.t, err)
@@ -131,7 +131,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
 }
 
 func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
-	tableName := fmt.Sprintf("%s.add_drop_tricky_column_names", s.schema)
+	tableName := s.schema + ".add_drop_tricky_column_names"
 	_, err := s.connector.conn.Exec(context.Background(),
 		fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
 	require.NoError(s.t, err)
@@ -166,7 +166,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
 }
 
 func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
-	tableName := fmt.Sprintf("%s.add_drop_whitespace_column_names", s.schema)
+	tableName := s.schema + ".add_drop_whitespace_column_names"
 	_, err := s.connector.conn.Exec(context.Background(),
 		fmt.Sprintf("CREATE TABLE %s(\" \" INT PRIMARY KEY)", tableName))
 	require.NoError(s.t, err)
diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go
index 13b92dfcd4..df82fb5bf4 100644
--- a/flow/connectors/postgres/qrep.go
+++ b/flow/connectors/postgres/qrep.go
@@ -72,7 +72,7 @@ func (c *PostgresConnector) GetQRepPartitions(
 func (c *PostgresConnector) setTransactionSnapshot(ctx context.Context, tx pgx.Tx) error {
 	snapshot := c.config.TransactionSnapshot
 	if snapshot != "" {
-		if _, err := tx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT %s", QuoteLiteral(snapshot))); err != nil {
+		if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(snapshot)); err != nil {
 			return fmt.Errorf("failed to set transaction snapshot: %w", err)
 		}
 	}
@@ -151,7 +151,7 @@ func (c *PostgresConnector) getNumRowsPartitions(
 			quotedWatermarkColumn,
 			parsedWatermarkTable.String(),
 		)
-		c.logger.Info(fmt.Sprintf("[row_based_next] partitions query: %s", partitionsQuery))
+		c.logger.Info("[row_based_next] partitions query: " + partitionsQuery)
 		rows, err = tx.Query(ctx, partitionsQuery, minVal)
 	} else {
 		partitionsQuery := fmt.Sprintf(
@@ -166,7 +166,7 @@ func (c *PostgresConnector) getNumRowsPartitions(
 			quotedWatermarkColumn,
 			parsedWatermarkTable.String(),
 		)
-		c.logger.Info(fmt.Sprintf("[row_based] partitions query: %s", partitionsQuery))
+		c.logger.Info("[row_based] partitions query: " + partitionsQuery)
 		rows, err = tx.Query(ctx, partitionsQuery)
 	}
 	if err != nil {
@@ -500,7 +500,7 @@ func (c *PostgresConnector) SetupQRepMetadataTables(ctx context.Context, config
 	if config.WriteMode != nil &&
 		config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
 		_, err = c.conn.Exec(ctx,
-			fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
+			"TRUNCATE TABLE "+config.DestinationTableIdentifier)
 		if err != nil {
 			return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
 		}
@@ -569,7 +569,7 @@ func BuildQuery(logger log.Logger, query string, flowJobName string) (string, er
 	}
 	res := buf.String()
 
-	logger.Info(fmt.Sprintf("templated query: %s", res))
+	logger.Info("templated query: " + res)
 
 	return res, nil
 }
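
qrep.go's BuildQuery runs the user query through text/template before logging the rendered result; that is how the {{.start}} and {{.end}} placeholders in the partition test queries below get bound. A self-contained sketch of those mechanics (renderQuery is illustrative; the real function also receives the flow job name):

	package sketch

	import (
		"strings"
		"text/template"
	)

	func renderQuery(query, start, end string) (string, error) {
		tmpl, err := template.New("query").Parse(query)
		if err != nil {
			return "", err
		}
		var buf strings.Builder
		// bind the partition bounds referenced as {{.start}} and {{.end}}
		if err := tmpl.Execute(&buf, map[string]string{"start": start, "end": end}); err != nil {
			return "", err
		}
		return buf.String(), nil
	}
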
diff --git a/flow/connectors/postgres/qrep_partition_test.go b/flow/connectors/postgres/qrep_partition_test.go
index 0567136bb3..b1aabfb60b 100644
--- a/flow/connectors/postgres/qrep_partition_test.go
+++ b/flow/connectors/postgres/qrep_partition_test.go
@@ -25,7 +25,7 @@ type testCase struct {
 }
 
 func newTestCaseForNumRows(schema string, name string, rows uint32, expectedNum int) *testCase {
-	schemaQualifiedTable := fmt.Sprintf("%s.test", schema)
+	schemaQualifiedTable := schema + ".test"
 	query := fmt.Sprintf(
 		`SELECT * FROM %s WHERE "from" >= {{.start}} AND "from" < {{.end}}`,
 		schemaQualifiedTable)
@@ -44,7 +44,7 @@
 }
 
 func newTestCaseForCTID(schema string, name string, rows uint32, expectedNum int) *testCase {
-	schemaQualifiedTable := fmt.Sprintf("%s.test", schema)
+	schemaQualifiedTable := schema + ".test"
 	query := fmt.Sprintf(
 		`SELECT * FROM %s WHERE "from" >= {{.start}} AND "from" < {{.end}}`,
 		schemaQualifiedTable)
diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go
index 2b97759d0e..a4efdfc85b 100644
--- a/flow/connectors/postgres/qrep_query_executor.go
+++ b/flow/connectors/postgres/qrep_query_executor.go
@@ -362,7 +362,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
 	}()
 
 	if qe.snapshot != "" {
-		_, err = tx.Exec(ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT %s", QuoteLiteral(qe.snapshot)))
+		_, err = tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
 		if err != nil {
 			stream.Records <- model.QRecordOrError{
 				Err: fmt.Errorf("failed to set snapshot: %w", err),
diff --git a/flow/connectors/postgres/qrep_sql_sync.go b/flow/connectors/postgres/qrep_sql_sync.go
index ed88d395f1..16ed755553 100644
--- a/flow/connectors/postgres/qrep_sql_sync.go
+++ b/flow/connectors/postgres/qrep_sql_sync.go
@@ -109,7 +109,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 		}
 	} else {
 		// Step 2.1: Create a temp staging table
-		stagingTableName := fmt.Sprintf("_peerdb_staging_%s", shared.RandomString(8))
+		stagingTableName := "_peerdb_staging_" + shared.RandomString(8)
 		stagingTableIdentifier := pgx.Identifier{s.connector.metadataSchema, stagingTableName}
 		dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}
 
@@ -122,7 +122,6 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 		s.connector.logger.Info(fmt.Sprintf("Creating staging table %s - '%s'",
 			stagingTableName, createStagingTableStmt), syncLog)
 		_, err = tx.Exec(context.Background(), createStagingTableStmt)
-
 		if err != nil {
 			return -1, fmt.Errorf("failed to create staging table: %v", err)
 		}
@@ -156,7 +155,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 			selectStrArray = append(selectStrArray, quotedCol)
 		}
 		setClauseArray = append(setClauseArray,
-			fmt.Sprintf(`%s = CURRENT_TIMESTAMP`, QuoteIdentifier(syncedAtCol)))
+			QuoteIdentifier(syncedAtCol)+` = CURRENT_TIMESTAMP`)
 		setClause := strings.Join(setClauseArray, ",")
 		selectSQL := strings.Join(selectStrArray, ",")
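
qrep_sql_sync.go assembles the staging and destination names as pgx.Identifier values, which quote each part and join them with a dot, so even the randomly suffixed staging name is safe to splice into DDL. A minimal sketch of that assembly, assuming the same pgx v5 API (qualifiedName is a hypothetical helper, not the connector's code):

	package sketch

	import "github.com/jackc/pgx/v5"

	// qualifiedName renders a schema-qualified, quoted identifier such as
	// "_peerdb_internal"."_peerdb_staging_ab12cd34", suitable for direct use
	// in statements like CREATE TABLE or INSERT INTO.
	func qualifiedName(schema, table string) string {
		return pgx.Identifier{schema, table}.Sanitize()
	}
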
diff --git a/flow/connectors/postgres/ssh_wrapped_pool.go b/flow/connectors/postgres/ssh_wrapped_pool.go
index e0c5591a3e..8b3d5d067e 100644
--- a/flow/connectors/postgres/ssh_wrapped_pool.go
+++ b/flow/connectors/postgres/ssh_wrapped_pool.go
@@ -119,7 +119,6 @@ func (tunnel *SSHTunnel) NewPostgresConnFromConfig(
 		}
 		return nil
 	}, 5, 5*time.Second)
-
 	if err != nil {
 		logger.Error("Failed to create pool", slog.Any("error", err), slog.String("host", host))
 		conn.Close(ctx)
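
The `}, 5, 5*time.Second)` tail in ssh_wrapped_pool.go closes a retry wrapper around the connection attempt (five attempts, five seconds apart). The helper's actual signature is not shown in this hunk; a generic version of that shape, offered as an assumption rather than the repo's definition:

	package sketch

	import "time"

	// retryN calls fn up to attempts times, sleeping interval between failed
	// attempts, and returns the last error if every attempt fails.
	func retryN(fn func() error, attempts int, interval time.Duration) error {
		var err error
		for i := 0; i < attempts; i++ {
			if err = fn(); err == nil {
				return nil
			}
			time.Sleep(interval)
		}
		return err
	}
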