Skip to content

Commit

Permalink
fix value
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Dec 10, 2023
1 parent eebf5f6 commit 350bb86
Showing 1 changed file with 25 additions and 15 deletions.
40 changes: 25 additions & 15 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
log "github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
)
Expand Down Expand Up @@ -205,33 +206,42 @@ func (c *PostgresConnector) GetSlotInfo(slotName string) ([]*protos.SlotInfo, er
if slotName != "" {
specificSlotClause = fmt.Sprintf(" WHERE slot_name = '%s'", slotName)
}
rows, err := c.pool.Query(c.ctx, "SELECT slot_name, redo_lsn::Text,restart_lsn::text,confirmed_flush_lsn::text,active,"+
"round((pg_current_wal_lsn() - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind"+
" FROM pg_control_checkpoint(), pg_replication_slots"+specificSlotClause+";")
rows, err := c.pool.Query(c.ctx, `
SELECT
slot_name,
redo_lsn::text,
restart_lsn::text,
confirmed_flush_lsn::text,
active,
round((pg_current_wal_lsn() - confirmed_flush_lsn) / 1024 / 1024) AS MB_Behind
FROM
pg_control_checkpoint(),
pg_replication_slots`+specificSlotClause+`;`)

if err != nil {
return nil, err
}
defer rows.Close()
var slotInfoRows []*protos.SlotInfo
for rows.Next() {
var redoLSN string
var slotName string
var restartLSN string
var confirmedFlushLSN string
var active bool
var lagInMB float32
var redoLSN pgtype.Text
var slotName pgtype.Text
var restartLSN pgtype.Text
var confirmedFlushLSN pgtype.Text
var active pgtype.Bool
var lagInMB pgtype.Float4
err := rows.Scan(&slotName, &redoLSN, &restartLSN, &confirmedFlushLSN, &active, &lagInMB)
if err != nil {
return nil, err
}

slotInfoRows = append(slotInfoRows, &protos.SlotInfo{
RedoLSN: redoLSN,
RestartLSN: restartLSN,
ConfirmedFlushLSN: confirmedFlushLSN,
SlotName: slotName,
Active: active,
LagInMb: lagInMB,
RedoLSN: redoLSN.String,
RestartLSN: restartLSN.String,
ConfirmedFlushLSN: confirmedFlushLSN.String,
SlotName: slotName.String,
Active: active.Bool,
LagInMb: lagInMB.Float32,
})
}
return slotInfoRows, nil
Expand Down

0 comments on commit 350bb86

Please sign in to comment.