From 50646c9685707fe80b988197bde2858ddd48ae79 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Tue, 12 Mar 2024 21:54:57 +0530 Subject: [PATCH] SF QRep: Swap synced at update with alter table rename (#1475) --- flow/connectors/postgres/qrep.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 2dc1a5bd09..c9c271eb78 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -651,17 +651,11 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi if err != nil { return fmt.Errorf("failed to drop %s: %v", dstTableName, err) } - _, err = tx.Exec(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", - tempTableIdentifier.Sanitize(), QuoteIdentifier(dstSchemaTable.Table))) - if err != nil { - return fmt.Errorf("failed to rename %s to %s: %v", - tempTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), err) - } if config.SyncedAtColName != "" { updateSyncedAtStmt := fmt.Sprintf( `UPDATE %s SET %s = CURRENT_TIMESTAMP`, - dstTableIdentifier.Sanitize(), + tempTableIdentifier.Sanitize(), QuoteIdentifier(config.SyncedAtColName), ) _, err = tx.Exec(ctx, updateSyncedAtStmt) @@ -669,6 +663,14 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi return fmt.Errorf("failed to update synced_at column: %v", err) } } + + _, err = tx.Exec(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", + tempTableIdentifier.Sanitize(), QuoteIdentifier(dstSchemaTable.Table))) + if err != nil { + return fmt.Errorf("failed to rename %s to %s: %v", + tempTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), err) + } + } if constraintsHookExists {