From 6b4e0e317d557d865869f39eabdf87aabebf073e Mon Sep 17 00:00:00 2001 From: Sai Srirampur Date: Wed, 20 Dec 2023 09:33:15 -0800 Subject: [PATCH] Replication should now work from PG 16 Standbys (#858) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I tried PeerDB on Read Replicas in PG 16: 1. Initial Load worked as expected 2. SyncFlows were erroring out: ``` ERROR: recovery is in progress HINT: WAL control functions cannot be executed during recovery. ``` The reason was `pg_current_wal_lsn()` cannot be run on a standby. I changed that function to `pg_last_wal_replay_lsn()` if Replica. For now `CREATE PUBLICATION` needs to be run on primary by user Co-authored-by: Philip Dubé --- flow/connectors/postgres/client.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index d709866239..1d7d4d5fb5 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -253,7 +253,8 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er } rows, err := c.pool.Query(c.ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,wal_status,"+ "confirmed_flush_lsn::text,active,"+ - "round((pg_current_wal_lsn() - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind"+ + "round((CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END"+ + " - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind"+ " FROM pg_control_checkpoint(), pg_replication_slots"+specificSlotClause+";") if err != nil { return nil, err @@ -775,7 +776,8 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string, } func (c *PostgresConnector) getCurrentLSN() (pglogrepl.LSN, error) { - row := c.pool.QueryRow(c.ctx, "SELECT pg_current_wal_lsn();") + row := c.pool.QueryRow(c.ctx, + "SELECT CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END") var result pgtype.Text err := row.Scan(&result) if err != nil {