diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 92a1b181c9..ba7e53a8dc 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -12,6 +12,7 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" log "github.com/sirupsen/logrus" "golang.org/x/exp/maps" ) @@ -205,33 +206,42 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er if slotName != "" { specificSlotClause = fmt.Sprintf(" WHERE slot_name = '%s'", slotName) } - rows, err := c.pool.Query(c.ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,confirmed_flush_lsn::text,active,"+ - "round((pg_current_wal_lsn() - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind"+ - " FROM pg_control_checkpoint(), pg_replication_slots"+specificSlotClause+";") + rows, err := c.pool.Query(c.ctx, ` + SELECT + slot_name, + redo_lsn::text, + restart_lsn::text, + confirmed_flush_lsn::text, + active, + round((pg_current_wal_lsn() - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind + FROM + pg_control_checkpoint(), + pg_replication_slots`+specificSlotClause+`;`) + if err != nil { return nil, err } defer rows.Close() var slotInfoRows []*protos.SlotInfo for rows.Next() { - var redoLSN string - var slotName string - var restartLSN string - var confirmedFlushLSN string - var active bool - var lagInMB float32 + var redoLSN pgtype.Text + var slotName pgtype.Text + var restartLSN pgtype.Text + var confirmedFlushLSN pgtype.Text + var active pgtype.Bool + var lagInMB pgtype.Float4 err := rows.Scan(&slotName, &redoLSN, &restartLSN, &confirmedFlushLSN, &active, &lagInMB) if err != nil { return nil, err } slotInfoRows = append(slotInfoRows, &protos.SlotInfo{ - RedoLSN: redoLSN, - RestartLSN: restartLSN, - ConfirmedFlushLSN: confirmedFlushLSN, - SlotName: slotName, - Active: active, - LagInMb: lagInMB, + RedoLSN: redoLSN.String, + RestartLSN: restartLSN.String, + ConfirmedFlushLSN: confirmedFlushLSN.String, + SlotName: slotName.String, + Active: active.Bool, + LagInMb: lagInMB.Float32, }) } return slotInfoRows, nil