Skip to content

Commit

Permalink
Replication should now work from PG 16 Standbys (#858)
Browse files Browse the repository at this point in the history
I tried PeerDB on Read Replicas in PG 16:
1. Initial Load worked as expected
2. SyncFlows were erroring out:
```
ERROR:  recovery is in progress
HINT:  WAL control functions cannot be executed during recovery.
```
The reason was `pg_current_wal_lsn()` cannot be run on a standby. I
changed that function to `pg_last_wal_replay_lsn()` if Replica.

For now `CREATE PUBLICATION` needs to be run on primary by user

Co-authored-by: Philip Dubé <[email protected]>
  • Loading branch information
saisrirampur and serprex authored Dec 20, 2023
1 parent 9618c5e commit 6b4e0e3
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er
}
rows, err := c.pool.Query(c.ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,wal_status,"+
"confirmed_flush_lsn::text,active,"+
"round((pg_current_wal_lsn() - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind"+
"round((CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END"+
" - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind"+
" FROM pg_control_checkpoint(), pg_replication_slots"+specificSlotClause+";")
if err != nil {
return nil, err
Expand Down Expand Up @@ -775,7 +776,8 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string,
}

func (c *PostgresConnector) getCurrentLSN() (pglogrepl.LSN, error) {
row := c.pool.QueryRow(c.ctx, "SELECT pg_current_wal_lsn();")
row := c.pool.QueryRow(c.ctx,
"SELECT CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END")
var result pgtype.Text
err := row.Scan(&result)
if err != nil {
Expand Down

0 comments on commit 6b4e0e3

Please sign in to comment.