Skip to content

Commit

Permalink
fix: pg12
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 18, 2023
1 parent dcad4ab commit 6252da8
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ const (
RANK() OVER (PARTITION BY %s ORDER BY _peerdb_timestamp DESC) AS _peerdb_rank
FROM %s.%s WHERE _peerdb_batch_id>$1 AND _peerdb_batch_id<=$2 AND _peerdb_destination_table_name=$3
)
%s USING src_rank WHERE %s AND src_rank._peerdb_rank=1 AND src_rank._peerdb_record_type=2`
%s src_rank WHERE %s AND src_rank._peerdb_rank=1 AND src_rank._peerdb_record_type=2`

dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE MIRROR_JOB_NAME=$1"
Expand Down Expand Up @@ -583,16 +583,17 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie
}
deleteWhereClauseSQL := strings.TrimSuffix(strings.Join(deleteWhereClauseArray, ""), "AND ")
deletePart := fmt.Sprintf(
"DELETE FROM %s",
"DELETE FROM %s USING",
parsedDstTable.String())

if peerdbCols.SoftDelete {
deletePart = fmt.Sprintf("UPDATE %s SET %s = TRUE",
deletePart = fmt.Sprintf(`UPDATE %s SET "%s" = TRUE`,
parsedDstTable.String(), peerdbCols.SoftDeleteColName)
if peerdbCols.SyncedAtColName != "" {
deletePart = fmt.Sprintf("%s, %s = CURRENT_TIMESTAMP",
deletePart = fmt.Sprintf(`%s, "%s" = CURRENT_TIMESTAMP`,
deletePart, peerdbCols.SyncedAtColName)
}
deletePart += " FROM"
}
fallbackUpsertStatement := fmt.Sprintf(fallbackUpsertStatementSQL,
strings.TrimSuffix(strings.Join(maps.Values(primaryKeyColumnCasts), ","), ","), c.metadataSchema,
Expand Down

0 comments on commit 6252da8

Please sign in to comment.