Skip to content

Commit

Permalink
fixed issues wrt PG compatability
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Jan 24, 2024
1 parent 8e08a74 commit bd67eff
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 12 deletions.
29 changes: 19 additions & 10 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ const (
RANK() OVER (PARTITION BY %s ORDER BY _peerdb_timestamp DESC) AS _peerdb_rank
FROM %s.%s WHERE _peerdb_batch_id>$1 AND _peerdb_batch_id<=$2 AND _peerdb_destination_table_name=$3
)
DELETE FROM %s USING %s FROM src_rank WHERE %s AND src_rank._peerdb_rank=1 AND src_rank._peerdb_record_type=2`
%s src_rank WHERE %s AND src_rank._peerdb_rank=1 AND src_rank._peerdb_record_type=2`

dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE mirror_job_name=$1"
Expand Down Expand Up @@ -254,18 +254,27 @@ func (c *PostgresConnector) checkSlotAndPublication(slot string, publication str
func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, error) {
var whereClause string
if slotName != "" {
whereClause = fmt.Sprintf(" WHERE slot_name = %s", QuoteLiteral(slotName))
whereClause = fmt.Sprintf("WHERE slot_name=%s", QuoteLiteral(slotName))
} else {
whereClause = fmt.Sprintf(" WHERE database = %s", QuoteLiteral(c.config.Database))
whereClause = fmt.Sprintf("WHERE database=%s", QuoteLiteral(c.config.Database))
}
rows, err := c.pool.Query(c.ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,wal_status,"+
"confirmed_flush_lsn::text,active,"+
"round((CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END"+
" - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind"+
" FROM pg_control_checkpoint(), pg_replication_slots"+whereClause)

hasWALStatus, _, err := c.MajorVersionCheck(POSTGRES_13)
if err != nil {
return nil, err
}
walStatusSelector := "wal_status"
if !hasWALStatus {
walStatusSelector = "'unknown'"
}
rows, err := c.pool.Query(c.ctx, fmt.Sprintf(`SELECT slot_name, redo_lsn::Text,restart_lsn::text,%s,
confirmed_flush_lsn::text,active,
round((CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END
- confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind
FROM pg_control_checkpoint(),pg_replication_slots %s`, walStatusSelector, whereClause))
if err != nil {
return nil, fmt.Errorf("failed to read information for slots: %w", err)
}
defer rows.Close()
var slotInfoRows []*protos.SlotInfo
for rows.Next() {
Expand Down Expand Up @@ -615,8 +624,8 @@ func (c *PostgresConnector) CheckSourceTables(tableNames []string, pubName strin
var pubTableCount int
err := c.pool.QueryRow(c.ctx, fmt.Sprintf(`
with source_table_components (sname, tname) as (values %s)
select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables
INNER JOIN source_table_components stc
select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables
INNER JOIN source_table_components stc
ON schemaname=stc.sname and tablename=stc.tname where pubname=$1;`, tableStr), pubName).Scan(&pubTableCount)
if err != nil {
return err
Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/postgres/normalize_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,23 @@ func (n *normalizeStmtGenerator) generateFallbackStatements() []string {
}
deleteWhereClauseSQL := strings.Join(deleteWhereClauseArray, " AND ")

deleteUpdate := ""
// make it update instead in case soft-delete is enabled
deleteUpdate := fmt.Sprintf(`DELETE FROM %s USING `, parsedDstTable.String())
if n.peerdbCols.SoftDelete {
deleteUpdate = fmt.Sprintf(`UPDATE %s SET %s=TRUE`,
parsedDstTable.String(), QuoteIdentifier(n.peerdbCols.SoftDeleteColName))
if n.peerdbCols.SyncedAtColName != "" {
deleteUpdate += fmt.Sprintf(`,%s=CURRENT_TIMESTAMP`, QuoteIdentifier(n.peerdbCols.SyncedAtColName))
}
deleteUpdate += " FROM"
}
fallbackUpsertStatement := fmt.Sprintf(fallbackUpsertStatementSQL,
strings.Join(maps.Values(primaryKeyColumnCasts), ","), n.metadataSchema,
n.rawTableName, parsedDstTable.String(), insertColumnsSQL, flattenedCastsSQL,
strings.Join(n.normalizedTableSchema.PrimaryKeyColumns, ","), updateColumnsSQL)
fallbackDeleteStatement := fmt.Sprintf(fallbackDeleteStatementSQL,
strings.Join(maps.Values(primaryKeyColumnCasts), ","), n.metadataSchema,
n.rawTableName, parsedDstTable.String(), deleteUpdate, deleteWhereClauseSQL)
n.rawTableName, deleteUpdate, deleteWhereClauseSQL)

return []string{fallbackUpsertStatement, fallbackDeleteStatement}
}
Expand Down

0 comments on commit bd67eff

Please sign in to comment.