diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 2dc1a5bd09..c9c271eb78 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -651,17 +651,11 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi if err != nil { return fmt.Errorf("failed to drop %s: %v", dstTableName, err) } - _, err = tx.Exec(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", - tempTableIdentifier.Sanitize(), QuoteIdentifier(dstSchemaTable.Table))) - if err != nil { - return fmt.Errorf("failed to rename %s to %s: %v", - tempTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), err) - } if config.SyncedAtColName != "" { updateSyncedAtStmt := fmt.Sprintf( `UPDATE %s SET %s = CURRENT_TIMESTAMP`, - dstTableIdentifier.Sanitize(), + tempTableIdentifier.Sanitize(), QuoteIdentifier(config.SyncedAtColName), ) _, err = tx.Exec(ctx, updateSyncedAtStmt) @@ -669,6 +663,14 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi return fmt.Errorf("failed to update synced_at column: %v", err) } } + + _, err = tx.Exec(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", + tempTableIdentifier.Sanitize(), QuoteIdentifier(dstSchemaTable.Table))) + if err != nil { + return fmt.Errorf("failed to rename %s to %s: %v", + tempTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), err) + } + } if constraintsHookExists {