diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index d709866239..1d7d4d5fb5 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -253,7 +253,8 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er } rows, err := c.pool.Query(c.ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,wal_status,"+ "confirmed_flush_lsn::text,active,"+ - "round((pg_current_wal_lsn() - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind"+ + "round((CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END"+ + " - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind"+ " FROM pg_control_checkpoint(), pg_replication_slots"+specificSlotClause+";") if err != nil { return nil, err @@ -775,7 +776,8 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string, } func (c *PostgresConnector) getCurrentLSN() (pglogrepl.LSN, error) { - row := c.pool.QueryRow(c.ctx, "SELECT pg_current_wal_lsn();") + row := c.pool.QueryRow(c.ctx, + "SELECT CASE WHEN pg_is_in_recovery() THEN pg_last_wal_receive_lsn() ELSE pg_current_wal_lsn() END") var result pgtype.Text err := row.Scan(&result) if err != nil {